Skip to content

Commit

Permalink
3.3.0 remove a layer of indirection (unnecessary passthrough) (#49)
Browse files Browse the repository at this point in the history
we shouldnt use passthrough if we dont need it, we got wrecked by the
default watermark on the passthrough stream
  • Loading branch information
jackyzha0 authored Oct 24, 2024
1 parent 40b2568 commit b6f230b
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 41 deletions.
2 changes: 1 addition & 1 deletion npm/darwin-arm64/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty-darwin-arm64",
"version": "3.2.4",
"version": "3.3.0",
"os": [
"darwin"
],
Expand Down
2 changes: 1 addition & 1 deletion npm/darwin-x64/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty-darwin-x64",
"version": "3.2.4",
"version": "3.3.0",
"os": [
"darwin"
],
Expand Down
2 changes: 1 addition & 1 deletion npm/linux-x64-gnu/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty-linux-x64-gnu",
"version": "3.2.4",
"version": "3.3.0",
"os": [
"linux"
],
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty",
"version": "3.2.4",
"version": "3.3.0",
"main": "dist/wrapper.js",
"types": "dist/wrapper.d.ts",
"author": "Szymon Kaliski <[email protected]>",
Expand Down
44 changes: 38 additions & 6 deletions tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ describe(
test('captures an exit code', () =>
new Promise<void>((done) => {
const oldFds = getOpenFds();
new Pty({
const pty = new Pty({
command: '/bin/sh',
args: ['-c', 'exit 17'],
onExit: (err, exitCode) => {
Expand All @@ -86,6 +86,9 @@ describe(
done();
},
});

// set a pty reader so it can flow
pty.read.on('data', () => { });
}));

test('can be written to', () =>
Expand Down Expand Up @@ -272,6 +275,8 @@ describe(
}
},
});

pty.read.on('data', () => { });
}));

test('resize after close shouldn\'t throw', () => new Promise<void>((done, reject) => {
Expand All @@ -287,6 +292,8 @@ describe(
},
});

pty.read.on('data', () => { });

pty.close();
expect(() => {
pty.resize({ rows: 60, cols: 100 });
Expand All @@ -305,13 +312,13 @@ describe(
command: '/bin/sh',
args: [
'-c',
`for i in $(seq 0 ${n}); do /bin/echo $i; done && exit`,
'seq 0 1024'
],
onExit: (err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
expect(buffer.toString().trim()).toBe(
[...Array(n + 1).keys()].join('\r\n'),
expect(buffer.toString().trim().split('\n').map(Number)).toStrictEqual(

Check failure on line 320 in tests/index.test.ts

View workflow job for this annotation

GitHub Actions / Build and test on x86_64-unknown-linux-gnu

Unhandled error

AssertionError: expected [ +0, 1, 2, 3, 4, 5, 6, 7, 8, …(832) ] to strictly equal [ +0, 1, 2, 3, 4, 5, 6, 7, 8, …(1016) ] - Expected + Received Array [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311, 312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 330, 331, 332, 333, 334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 344, 345, 346, 347, 348, 349, 350, 351, 352, 353, 354, 355, 356, 357, 358, 359, 360, 361, 362, 363, 364, 365, 366, 367, 368, 369, 370, 371, 372, 373, 374, 375, 376, 377, 378, 379, 380, 381, 382, 383, 384, 385, 386, 387, 388, 389, 390, 391, 392, 393, 394, 395, 396, 397, 398, 399, 400, 401, 402, 403, 404, 405, 406, 407, 408, 409, 410, 411, 412, 413, 414, 415, 416, 417, 418, 419, 420, 421, 422, 423, 424, 425, 426, 427, 428, 429, 430, 431, 432, 433, 434, 435, 436, 437, 438, 439, 440, 441, 442, 443, 444, 445, 446, 447, 448, 449,
Array.from({ length: n + 1 }, (_, i) => i),
);
expect(getOpenFds()).toStrictEqual(oldFds);
done();
Expand All @@ -325,9 +332,35 @@ describe(
}),
);

test('doesnt miss large output from fast commands',
() =>
new Promise<void>((done) => {
const payload = `hello`.repeat(4096);
let buffer = Buffer.from('');
const pty = new Pty({
command: '/bin/echo',
args: [
'-n',
payload
],
onExit: (err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
// account for the newline
expect(buffer.toString().length).toBe(payload.length);
done();
},
});

const readStream = pty.read;
readStream.on('data', (data) => {
buffer = Buffer.concat([buffer, data]);
});
})
);

testSkipOnDarwin(
'does not leak files',
{ repeats: 4 },
() =>
new Promise<void>((done) => {
const oldFds = getOpenFds();
Expand Down Expand Up @@ -373,7 +406,6 @@ describe(

test(
'can run concurrent shells',
{ repeats: 4 },
() =>
new Promise<void>((done) => {
const oldFds = getOpenFds();
Expand Down
70 changes: 41 additions & 29 deletions wrapper.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PassThrough, type Readable, type Writable } from 'node:stream';
import { type Readable, Writable } from 'node:stream';
import { ReadStream } from 'node:tty';
import {
Pty as RawPty,
Expand Down Expand Up @@ -45,21 +45,30 @@ type ExitResult = {
export class Pty {
#pty: RawPty;
#fd: number;
#fdEnded: boolean = false;

#handledClose: boolean = false;
#handledEndOfData: boolean = false;

#socket: ReadStream;
get read(): Readable {
return this.#socket;
}

read: Readable;
write: Writable;

constructor(options: PtyOptions) {
const realExit = options.onExit;

let resolve: (value: ExitResult) => void;
let exitResult: Promise<ExitResult> = new Promise((res) => {
resolve = res;
let markExited: (value: ExitResult) => void;
let exitResult: Promise<ExitResult> = new Promise((resolve) => {
markExited = resolve;
});
let markFdClosed: () => void;
let fdClosed = new Promise<void>((resolve) => {
markFdClosed = resolve;
});
const mockedExit = (error: NodeJS.ErrnoException | null, code: number) => {
resolve({ error, code });
markExited({ error, code });
};

// when pty exits, we should wait until the fd actually ends (end OR error)
Expand All @@ -70,27 +79,29 @@ export class Pty {
// Transfer ownership of the FD to us.
this.#fd = this.#pty.takeFd();

this.#socket = new ReadStream(this.#fd);
const userFacingRead = new PassThrough();
const userFacingWrite = new PassThrough();
this.#socket.pipe(userFacingRead);
userFacingWrite.pipe(this.#socket);
this.read = userFacingRead;
this.write = userFacingWrite;
this.#socket = new ReadStream(this.#fd)
this.write = new Writable({
write: this.#socket.write.bind(this.#socket),
});

// catch end events
const handleClose = () => {
if (this.#fdEnded) {
const handleEnd = async () => {
if (this.#handledEndOfData) {
return;
}

this.#fdEnded = true;
exitResult.then((result) => {
realExit(result.error, result.code)
});
userFacingRead.end();
};
this.#socket.on('close', handleClose);
this.#handledEndOfData = true;

// must wait for fd close and exit result before calling real exit
await fdClosed;
const result = await exitResult;
realExit(result.error, result.code)
}

this.read.on('end', handleEnd);
this.read.on('close', () => {
markFdClosed();
});

// PTYs signal their done-ness with an EIO error. we therefore need to filter them out (as well as
// cleaning up other spurious errors) so that the user doesn't need to handle them and be in
Expand All @@ -108,25 +119,26 @@ export class Pty {
// EIO only happens when the child dies. It is therefore our only true signal that there
// is nothing left to read and we can start tearing things down. If we hadn't received an
// error so far, we are considered to be in good standing.
this.#socket.off('error', handleError);
this.#socket.end();
this.read.off('error', handleError);
handleEnd();
return;
}
}

this.read.emit('error', err);
};
this.#socket.on('error', handleError);

this.read.on('error', handleError);
}

close() {
this.#handledClose = true;

// end instead of destroy so that the user can read the last bits of data
// and allow graceful close event to mark the fd as ended
this.#socket.end();
}

resize(size: Size) {
if (this.#fdEnded) {
if (this.#handledClose || this.#handledEndOfData) {
return;
}

Expand Down

0 comments on commit b6f230b

Please sign in to comment.