diff --git a/npm/darwin-arm64/package.json b/npm/darwin-arm64/package.json index 725d4b1..75de279 100644 --- a/npm/darwin-arm64/package.json +++ b/npm/darwin-arm64/package.json @@ -1,6 +1,6 @@ { "name": "@replit/ruspty-darwin-arm64", - "version": "3.3.0", + "version": "3.4.0", "os": [ "darwin" ], diff --git a/npm/darwin-x64/package.json b/npm/darwin-x64/package.json index 4cc36e7..886abe1 100644 --- a/npm/darwin-x64/package.json +++ b/npm/darwin-x64/package.json @@ -1,6 +1,6 @@ { "name": "@replit/ruspty-darwin-x64", - "version": "3.3.0", + "version": "3.4.0", "os": [ "darwin" ], diff --git a/npm/linux-x64-gnu/package.json b/npm/linux-x64-gnu/package.json index cf1560f..122c20d 100644 --- a/npm/linux-x64-gnu/package.json +++ b/npm/linux-x64-gnu/package.json @@ -1,6 +1,6 @@ { "name": "@replit/ruspty-linux-x64-gnu", - "version": "3.3.0", + "version": "3.4.0", "os": [ "linux" ], diff --git a/package-lock.json b/package-lock.json index a968647..6cea6de 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@replit/ruspty", - "version": "3.3.0", + "version": "3.4.0", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@replit/ruspty", - "version": "3.3.0", + "version": "3.4.0", "license": "MIT", "devDependencies": { "@napi-rs/cli": "^2.18.2", diff --git a/package.json b/package.json index 2f1b96c..124f422 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@replit/ruspty", - "version": "3.3.0", + "version": "3.4.0", "main": "dist/wrapper.js", "types": "dist/wrapper.d.ts", "author": "Szymon Kaliski ", @@ -36,7 +36,7 @@ }, "scripts": { "artifacts": "napi artifacts", - "build": "napi build --platform --release && npm run build:wrapper", + "build": "napi build --platform --release && npm run build:wrapper && npm run format", "build:wrapper": "tsup", "prepublishOnly": "napi prepublish -t npm", "test": "vitest run", diff --git a/src/lib.rs b/src/lib.rs index beb2ab2..59ca4ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,9 @@ use std::collections::HashMap; use std::fs::File; -use std::io::Error; use std::io::ErrorKind; -use std::io::Write; +use std::io::{Error, Write}; use std::os::fd::{AsRawFd, OwnedFd}; -use std::os::fd::{BorrowedFd, FromRawFd, IntoRawFd, RawFd}; +use std::os::fd::{FromRawFd, IntoRawFd, RawFd}; use std::os::unix::process::CommandExt; use std::process::{Command, Stdio}; use std::thread; @@ -12,14 +11,13 @@ use std::time::Duration; use backoff::backoff::Backoff; use backoff::ExponentialBackoffBuilder; -use libc::{self, c_int}; use napi::bindgen_prelude::JsFunction; use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode}; use napi::Status::GenericFailure; use napi::{self, Env}; use nix::errno::Errno; use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag}; -use nix::poll::{poll, PollFd, PollFlags, PollTimeout}; +use nix::libc::{self, c_int, ioctl, FIONREAD, TIOCOUTQ, TIOCSCTTY, TIOCSWINSZ}; use nix::pty::{openpty, Winsize}; use nix::sys::termios::{self, SetArg}; @@ -59,14 +57,11 @@ fn cast_to_napi_error(err: Errno) -> napi::Error { napi::Error::new(GenericFailure, err) } -// if the child process exits before the controller fd is fully read, we might accidentally -// end in a case where onExit is called but js hasn't had the chance to fully read the controller fd +// if the child process exits before the controller fd is fully read or the user fd is fully +// flushed, we might accidentally end in a case where onExit is called but js hasn't had +// the chance to fully read the controller fd // let's wait until the controller fd is fully read before we call onExit -fn poll_controller_fd_until_read(raw_fd: RawFd) { - // wait until fd is fully read (i.e. POLLIN no longer set) - let borrowed_fd = unsafe { BorrowedFd::borrow_raw(raw_fd) }; - let poll_fd = PollFd::new(borrowed_fd, PollFlags::POLLIN); - +fn poll_pty_fds_until_read(controller_fd: RawFd, user_fd: RawFd) { let mut backoff = ExponentialBackoffBuilder::default() .with_initial_interval(Duration::from_millis(1)) .with_max_interval(Duration::from_millis(100)) @@ -74,30 +69,42 @@ fn poll_controller_fd_until_read(raw_fd: RawFd) { .build(); loop { - if let Err(err) = poll(&mut [poll_fd], PollTimeout::ZERO) { - if err == Errno::EINTR || err == Errno::EAGAIN { - // we were interrupted, so we should just try again - continue; - } + // check both input and output queues for both FDs + let mut controller_inq: i32 = 0; + let mut controller_outq: i32 = 0; + let mut user_inq: i32 = 0; + let mut user_outq: i32 = 0; - // we should almost never hit this, but if we do, we should just break out of the loop. this - // can happen if Node destroys the terminal before waiting for the child process to go away. - break; - } + // safe because we're passing valid file descriptors and properly sized integers + unsafe { + // check bytes waiting to be read (FIONREAD, equivalent to TIOCINQ on Linux) + if ioctl(controller_fd, FIONREAD, &mut controller_inq) == -1 + || ioctl(user_fd, FIONREAD, &mut user_inq) == -1 + { + // break if we can't read + break; + } - // check if POLLIN is no longer set (i.e. there is no more data to read) - if let Some(flags) = poll_fd.revents() { - if !flags.contains(PollFlags::POLLIN) { + // check bytes waiting to be written (TIOCOUTQ) + if ioctl(controller_fd, TIOCOUTQ, &mut controller_outq) == -1 + || ioctl(user_fd, TIOCOUTQ, &mut user_outq) == -1 + { + // break if we can't read break; } } - // wait for a bit before trying again + // if all queues are empty, we're done + if controller_inq == 0 && controller_outq == 0 && user_inq == 0 && user_outq == 0 { + break; + } + + // apply backoff strategy if let Some(d) = backoff.next_backoff() { thread::sleep(d); continue; } else { - // we have exhausted our attempts, its joever + // we have exhausted our attempts break; } } @@ -181,7 +188,7 @@ impl Pty { } // become the controlling tty for the program - let err = libc::ioctl(raw_user_fd, libc::TIOCSCTTY.into(), 0); + let err = libc::ioctl(raw_user_fd, TIOCSCTTY.into(), 0); if err == -1 { return Err(Error::new(ErrorKind::Other, "ioctl-TIOCSCTTY")); } @@ -244,10 +251,7 @@ impl Pty { let wait_result = child.wait(); // try to wait for the controller fd to be fully read - poll_controller_fd_until_read(raw_controller_fd); - - // we don't drop the controller fd immediately - // let pty.close() be responsible for closing it + poll_pty_fds_until_read(raw_controller_fd, raw_user_fd); drop(user_fd); match wait_result { @@ -310,7 +314,7 @@ fn pty_resize(fd: i32, size: Size) -> Result<(), napi::Error> { ws_ypixel: 0, }; - let res = unsafe { libc::ioctl(fd, libc::TIOCSWINSZ, &window_size as *const _) }; + let res = unsafe { libc::ioctl(fd, TIOCSWINSZ, &window_size as *const _) }; if res == -1 { return Err(napi::Error::new( napi::Status::GenericFailure, diff --git a/tests/index.test.ts b/tests/index.test.ts index 3fde299..6ad01be 100644 --- a/tests/index.test.ts +++ b/tests/index.test.ts @@ -1,7 +1,7 @@ import { Pty, getCloseOnExec, setCloseOnExec } from '../wrapper'; import { type Writable } from 'stream'; import { readdirSync, readlinkSync } from 'fs'; -import { describe, test, expect, beforeEach, afterEach } from 'vitest'; +import { describe, test, expect, beforeEach, afterEach, vi, type Mock } from 'vitest'; import { exec as execAsync } from 'child_process'; import { promisify } from 'util'; const exec = promisify(execAsync); @@ -47,138 +47,123 @@ function getOpenFds(): FdRecord { describe( 'PTY', - { repeats: 50 }, + { repeats: 500 }, () => { - test('spawns and exits', () => - new Promise((done) => { - const oldFds = getOpenFds(); - const message = 'hello from a pty'; - let buffer = ''; + test('spawns and exits', async () => { + const oldFds = getOpenFds(); + const message = 'hello from a pty'; + let buffer = ''; - const pty = new Pty({ - command: '/bin/echo', - args: [message], - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect(buffer.trim()).toBe(message); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + const onExit = vi.fn(); + const pty = new Pty({ + command: '/bin/echo', + args: [message], + onExit, + }); - const readStream = pty.read; - readStream.on('data', (chunk) => { - buffer = chunk.toString(); - }); - })); + const readStream = pty.read; + readStream.on('data', (chunk) => { + buffer = chunk.toString(); + }); - test('captures an exit code', () => - new Promise((done) => { - const oldFds = getOpenFds(); - const pty = new Pty({ - command: '/bin/sh', - args: ['-c', 'exit 17'], - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(17); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(buffer.trim()).toBe(message); + expect(getOpenFds()).toStrictEqual(oldFds); + }); - // set a pty reader so it can flow - pty.read.on('data', () => { }); - })); + test('captures an exit code', async () => { + const oldFds = getOpenFds(); + const onExit = vi.fn(); + const pty = new Pty({ + command: '/bin/sh', + args: ['-c', 'exit 17'], + onExit, + }); - test('can be written to', () => - new Promise((done) => { - const oldFds = getOpenFds(); + // set a pty reader so it can flow + pty.read.on('data', () => {}); - // The message should end in newline so that the EOT can signal that the input has ended and not - // just the line. - const message = 'hello cat\n'; - let buffer = ''; + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 17); + expect(getOpenFds()).toStrictEqual(oldFds); + }); - // We have local echo enabled, so we'll read the message twice. - const expectedResult = 'hello cat\r\nhello cat\r\n'; + test('can be written to', async () => { + const oldFds = getOpenFds(); + const message = 'hello cat\n'; + let buffer = ''; + const onExit = vi.fn(); - const pty = new Pty({ - command: '/bin/cat', - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - let result = buffer.toString(); - if (IS_DARWIN) { - // Darwin adds the visible EOT to the stream. - result = result.replace('^D\b\b', ''); - } - expect(result.trim()).toStrictEqual(expectedResult.trim()); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + const pty = new Pty({ + command: '/bin/cat', + onExit, + }); - const writeStream = pty.write; - const readStream = pty.read; + const writeStream = pty.write; + const readStream = pty.read; - readStream.on('data', (data) => { - buffer += data.toString(); - }); - writeStream.write(message); - writeStream.end(EOT); - })); + readStream.on('data', (data) => { + buffer += data.toString(); + }); + + writeStream.write(message); + writeStream.end(EOT); + + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + + let result = buffer.toString(); + if (IS_DARWIN) { + // Darwin adds the visible EOT to the stream. + result = result.replace('^D\b\b', ''); + } - test('can be started in non-interactive fashion', () => - new Promise((done) => { - const oldFds = getOpenFds(); + const expectedResult = 'hello cat\r\nhello cat\r\n'; + expect(result.trim()).toStrictEqual(expectedResult.trim()); + expect(getOpenFds()).toStrictEqual(oldFds); + }); - let buffer = ''; + test('can be started in non-interactive fashion', async () => { + const oldFds = getOpenFds(); + let buffer = ''; + const onExit = vi.fn(); - const expectedResult = '\r\n'; + const pty = new Pty({ + command: '/bin/cat', + interactive: false, + onExit, + }); - const pty = new Pty({ - command: '/bin/cat', - interactive: false, - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - let result = buffer.toString(); - expect(result.trim()).toStrictEqual(expectedResult.trim()); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + const readStream = pty.read; + readStream.on('data', (data) => { + buffer += data.toString(); + }); - const readStream = pty.read; + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + let result = buffer.toString(); + const expectedResult = '\r\n'; + expect(result.trim()).toStrictEqual(expectedResult.trim()); + expect(getOpenFds()).toStrictEqual(oldFds); + }); - readStream.on('data', (data) => { - buffer += data.toString(); - }); - })); + test('can be resized', async () => { + const oldFds = getOpenFds(); + let buffer = ''; + let state: 'expectPrompt' | 'expectDone1' | 'expectDone2' | 'done' = 'expectPrompt'; + const onExit = vi.fn(); - test('can be resized', () => - new Promise((done) => { - const oldFds = getOpenFds(); - let buffer = ''; - let state: 'expectPrompt' | 'expectDone1' | 'expectDone2' | 'done' = - 'expectPrompt'; - const pty = new Pty({ - command: '/bin/sh', - size: { rows: 24, cols: 80 }, - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - - expect(state).toBe('done'); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + const pty = new Pty({ + command: '/bin/sh', + size: { rows: 24, cols: 80 }, + onExit, + }); - const writeStream = pty.write; - const readStream = pty.read; + const writeStream = pty.write; + const readStream = pty.read; + const statePromise = new Promise((resolve) => { readStream.on('data', (data) => { buffer += data.toString(); @@ -200,291 +185,271 @@ describe( if (state === 'expectDone2' && buffer.includes('done2\r\n')) { expect(buffer).toContain('60 100'); state = 'done'; - writeStream.write(EOT); - return; + resolve(); } }); - })); + }); - test('respects working directory', () => - new Promise((done) => { - const oldFds = getOpenFds(); - const cwd = process.cwd(); - let buffer = ''; + await statePromise; + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(state).toBe('done'); + expect(getOpenFds()).toStrictEqual(oldFds); + }); - const pty = new Pty({ - command: '/bin/pwd', - dir: cwd, - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect(buffer.trim()).toBe(cwd); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + test('respects working directory', async () => { + const oldFds = getOpenFds(); + const cwd = process.cwd(); + let buffer = ''; + const onExit = vi.fn(); - const readStream = pty.read; - readStream.on('data', (data) => { - buffer += data.toString(); - }); - })); + const pty = new Pty({ + command: '/bin/pwd', + dir: cwd, + onExit, + }); - test('respects env', () => - new Promise((done) => { - const oldFds = getOpenFds(); - const message = 'hello from env'; - let buffer = ''; + const readStream = pty.read; + readStream.on('data', (data) => { + buffer += data.toString(); + }); - const pty = new Pty({ - command: '/bin/sh', - args: ['-c', 'echo $ENV_VARIABLE && exit'], - envs: { - ENV_VARIABLE: message, - }, - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect(buffer.trim()).toBe(message); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(buffer.trim()).toBe(cwd); + expect(getOpenFds()).toStrictEqual(oldFds); + }); - const readStream = pty.read; - readStream.on('data', (data) => { - buffer += data.toString(); - }); - })); + test('respects env', async () => { + const oldFds = getOpenFds(); + const message = 'hello from env'; + let buffer = ''; + const onExit = vi.fn(); - test('resize after exit shouldn\'t throw', () => new Promise((done, reject) => { + const pty = new Pty({ + command: '/bin/sh', + args: ['-c', 'echo $ENV_VARIABLE && exit'], + envs: { + ENV_VARIABLE: message, + }, + onExit, + }); + + const readStream = pty.read; + readStream.on('data', (data) => { + buffer += data.toString(); + }); + + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(buffer.trim()).toBe(message); + expect(getOpenFds()).toStrictEqual(oldFds); + }); + + test('resize after exit shouldn\'t throw', async () => { + const onExit = vi.fn(); const pty = new Pty({ command: '/bin/echo', args: ['hello'], - onExit: (err, exitCode) => { - try { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect(() => { - pty.resize({ rows: 60, cols: 100 }); - }).not.toThrow(); - done(); - } catch (e) { - reject(e) - } - }, + onExit, }); - pty.read.on('data', () => { }); - })); + pty.read.on('data', () => {}); - test('resize after close shouldn\'t throw', () => new Promise((done, reject) => { + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(() => { + pty.resize({ rows: 60, cols: 100 }); + }).not.toThrow(); + }); + + test('resize after close shouldn\'t throw', async () => { + const onExit = vi.fn(); const pty = new Pty({ command: '/bin/sh', - onExit: (err, exitCode) => { - try { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - } catch (e) { - reject(e) - } - }, + onExit, }); - pty.read.on('data', () => { }); + pty.read.on('data', () => {}); pty.close(); expect(() => { pty.resize({ rows: 60, cols: 100 }); }).not.toThrow(); - done(); - })); + + process.kill(pty.pid, 'SIGKILL'); + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, -1); + }); test( 'ordering is correct', - () => - new Promise((done) => { - const oldFds = getOpenFds(); - let buffer = Buffer.from(''); - const n = 1024; - const pty = new Pty({ - command: '/bin/sh', - args: [ - '-c', - 'seq 0 1024' - ], - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect(buffer.toString().trim().split('\n').map(Number)).toStrictEqual( - Array.from({ length: n + 1 }, (_, i) => i), - ); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, - }); - - const readStream = pty.read; - readStream.on('data', (data) => { - buffer = Buffer.concat([buffer, data]); - }); - }), - ); + async () => { + const oldFds = getOpenFds(); + let buffer = Buffer.from(''); + const n = 1024; + const onExit = vi.fn(); - test('doesnt miss large output from fast commands', - () => - new Promise((done) => { - const payload = `hello`.repeat(4096); - let buffer = Buffer.from(''); - const pty = new Pty({ - command: '/bin/echo', - args: [ - '-n', - payload - ], - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - // account for the newline - expect(buffer.toString().length).toBe(payload.length); - done(); - }, - }); - - const readStream = pty.read; - readStream.on('data', (data) => { - buffer = Buffer.concat([buffer, data]); - }); - }) - ); + const pty = new Pty({ + command: '/bin/sh', + args: [ + '-c', + `seq 0 ${n}` + ], + onExit, + }); - testSkipOnDarwin( - 'does not leak files', - () => - new Promise((done) => { - const oldFds = getOpenFds(); - const promises = []; - for (let i = 0; i < 10; i++) { - promises.push( - new Promise((accept) => { - let buffer = Buffer.from(''); - const pty = new Pty({ - command: '/bin/sh', - args: ['-c', 'sleep 0.1 ; ls /proc/$$/fd'], - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect( - buffer - .toString() - .trim() - .split(/\s+/) - .filter((fd) => { - // Some shells dup stdio to fd 255 for reasons. - return fd !== '255'; - }) - .toSorted(), - ).toStrictEqual(['0', '1', '2']); - accept(); - }, - }); - - const readStream = pty.read; - readStream.on('data', (data) => { - buffer = Buffer.concat([buffer, data]); - }); - }), - ); - } - Promise.allSettled(promises).then(() => { - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }); - }), - ); + const readStream = pty.read; + readStream.on('data', (data) => { + buffer = Buffer.concat([buffer, data]); + }); - test( - 'can run concurrent shells', - () => - new Promise((done) => { - const oldFds = getOpenFds(); - const donePromises: Array> = []; - const readyPromises: Array> = []; - const writeStreams: Array = []; - - // We have local echo enabled, so we'll read the message twice. - const expectedResult = 'ready\r\nhello cat\r\nhello cat\r\n'; - - for (let i = 0; i < 10; i++) { - donePromises.push( - new Promise((accept) => { - let buffer = Buffer.from(''); - const pty = new Pty({ - command: '/bin/sh', - args: ['-c', 'echo ready ; exec cat'], - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - let result = buffer.toString(); - if (IS_DARWIN) { - // Darwin adds the visible EOT to the stream. - result = result.replace('^D\b\b', ''); - } - expect(result).toStrictEqual(expectedResult); - accept(); - }, - }); - - readyPromises.push( - new Promise((ready) => { - let readyMessageReceived = false; - const readStream = pty.read; - readStream.on('data', (data) => { - buffer = Buffer.concat([buffer, data]); - if (!readyMessageReceived) { - readyMessageReceived = true; - ready(); - } - }); - }), - ); - writeStreams.push(pty.write); - }), - ); - } - Promise.allSettled(readyPromises).then(() => { - // The message should end in newline so that the EOT can signal that the input has ended and not - // just the line. - const message = 'hello cat\n'; - for (const writeStream of writeStreams) { - writeStream.write(message); - writeStream.end(EOT); - } - }); - Promise.allSettled(donePromises).then(() => { - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }); - }), + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + + const lines = buffer.toString().trim().split('\n'); + expect(lines.length).toBe(n + 1); + for (let i = 0; i < n + 1; i++) { + expect(Number(lines[i]), `expected line ${i} to contain ${i} but got ${lines[i]}`).toBe(i); + } + + expect(getOpenFds()).toStrictEqual(oldFds); + } ); - test("doesn't break when executing non-existing binary", () => - new Promise((done) => { - const oldFds = getOpenFds(); - try { - new Pty({ - command: '/bin/this-does-not-exist', - onExit: () => { - expect(getOpenFds()).toStrictEqual(oldFds); - }, - }); - } catch (e: any) { - expect(e.message).toContain('No such file or directory'); - - done(); + test('doesnt miss large output from fast commands', async () => { + const payload = `hello`.repeat(4096); + let buffer = Buffer.from(''); + const onExit = vi.fn(); + + const pty = new Pty({ + command: '/bin/echo', + args: [ + '-n', + payload + ], + onExit, + }); + + const readStream = pty.read; + readStream.on('data', (data) => { + buffer = Buffer.concat([buffer, data]); + }); + + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(buffer.toString().length).toBe(payload.length); + }); + + testSkipOnDarwin('does not leak files', async () => { + const oldFds = getOpenFds(); + const promises = []; + + for (let i = 0; i < 10; i++) { + const onExit = vi.fn(); + let buffer = Buffer.from(''); + + const pty = new Pty({ + command: '/bin/sh', + args: ['-c', 'sleep 0.1 ; ls /proc/$$/fd'], + onExit, + }); + + const readStream = pty.read; + readStream.on('data', (data) => { + buffer = Buffer.concat([buffer, data]); + }); + + promises.push( + vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)).then(() => { + expect(onExit).toHaveBeenCalledWith(null, 0); + expect( + buffer + .toString() + .trim() + .split(/\s+/) + .filter((fd) => { + // Some shells dup stdio to fd 255 for reasons. + return fd !== '255'; + }) + .toSorted(), + ).toStrictEqual(['0', '1', '2']); + }) + ); + } + + await Promise.all(promises); + expect(getOpenFds()).toStrictEqual(oldFds); + }); + + test('can run concurrent shells', async () => { + const oldFds = getOpenFds(); + const writeStreams: Array = []; + const buffers: Array = []; + const onExits: Array = []; + const expectedResult = 'hello cat\r\nhello cat\r\n'; + + // Create 10 concurrent shells + for (let i = 0; i < 10; i++) { + const onExit = vi.fn(); + onExits.push(onExit); + buffers[i] = Buffer.from(''); + + const pty = new Pty({ + command: '/bin/cat', + onExit, + }); + + const readStream = pty.read; + readStream.on('data', (data) => { + buffers[i] = Buffer.concat([buffers[i], data]); + }); + + writeStreams.push(pty.write); + pty.write.write('hello cat\n'); + } + + // Wait for initial output + await vi.waitFor(() => + buffers.every(buffer => buffer.toString().includes('hello cat\r\n')) + ); + + // Send EOT to all shells + for (const writeStream of writeStreams) { + writeStream.end(EOT); + } + + // Wait for all shells to exit + await Promise.all(onExits.map(onExit => + vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)) + )); + + // Verify results + for (let i = 0; i < 10; i++) { + expect(onExits[i]).toHaveBeenCalledWith(null, 0); + let result = buffers[i].toString(); + if (IS_DARWIN) { + result = result.replace('^D\b\b', ''); } - })); + expect(result).toStrictEqual(expectedResult); + } + + expect(getOpenFds()).toStrictEqual(oldFds); + }); + + test("doesn't break when executing non-existing binary", async () => { + const oldFds = getOpenFds(); + + await expect(async () => { + new Pty({ + command: '/bin/this-does-not-exist', + onExit: () => {}, + }); + }).rejects.toThrow('No such file or directory'); + + expect(getOpenFds()).toStrictEqual(oldFds); + }); }, ); @@ -492,52 +457,49 @@ describe('cgroup opts', () => { beforeEach(async () => { if (!IS_DARWIN) { // create a new cgroup with the right permissions - await exec("sudo cgcreate -g 'cpu:/test.slice'") - await exec("sudo chown -R $(id -u):$(id -g) /sys/fs/cgroup/cpu/test.slice") + await exec("sudo cgcreate -g 'cpu:/test.slice'"); + await exec("sudo chown -R $(id -u):$(id -g) /sys/fs/cgroup/cpu/test.slice"); } }); afterEach(async () => { if (!IS_DARWIN) { // remove the cgroup - await exec("sudo cgdelete cpu:/test.slice") + await exec("sudo cgdelete cpu:/test.slice"); } }); - testSkipOnDarwin('basic cgroup', () => new Promise((done) => { + testSkipOnDarwin('basic cgroup', async () => { const oldFds = getOpenFds(); let buffer = ''; + const onExit = vi.fn(); + const pty = new Pty({ command: '/bin/cat', args: ['/proc/self/cgroup'], cgroupPath: '/sys/fs/cgroup/cpu/test.slice', - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - expect(buffer).toContain('/test.slice'); - expect(getOpenFds()).toStrictEqual(oldFds); - done(); - }, + onExit, }); const readStream = pty.read; readStream.on('data', (data) => { buffer = data.toString(); }); - }) - ); - testOnlyOnDarwin('cgroup is not supported on darwin', () => { + await vi.waitFor(() => expect(onExit).toHaveBeenCalledTimes(1)); + expect(onExit).toHaveBeenCalledWith(null, 0); + expect(buffer).toContain('/test.slice'); + expect(getOpenFds()).toStrictEqual(oldFds); + }); + + testOnlyOnDarwin('cgroup is not supported on darwin', async () => { expect(() => { new Pty({ command: '/bin/cat', args: ['/proc/self/cgroup'], cgroupPath: '/sys/fs/cgroup/cpu/test.slice', - onExit: (err, exitCode) => { - expect(err).toBeNull(); - expect(exitCode).toBe(0); - }, - }) + onExit: vi.fn(), + }); }).toThrowError(); }); }); diff --git a/wrapper.ts b/wrapper.ts index e47f63f..6babfc5 100644 --- a/wrapper.ts +++ b/wrapper.ts @@ -47,7 +47,7 @@ export class Pty { #fd: number; #handledClose: boolean = false; - #handledEndOfData: boolean = false; + #fdClosed: boolean = false; #socket: ReadStream; #writable: Writable; @@ -63,13 +63,14 @@ export class Pty { constructor(options: PtyOptions) { const realExit = options.onExit; - let markExited: (value: ExitResult) => void; + let markExited!: (value: ExitResult) => void; let exitResult: Promise = new Promise((resolve) => { markExited = resolve; }); - let markFdClosed: () => void; - let fdClosed = new Promise((resolve) => { - markFdClosed = resolve; + + let markReadFinished!: () => void; + let readFinished = new Promise((resolve) => { + markReadFinished = resolve; }); const mockedExit = (error: NodeJS.ErrnoException | null, code: number) => { markExited({ error, code }); @@ -89,23 +90,21 @@ export class Pty { }); // catch end events - const handleEnd = async () => { - if (this.#handledEndOfData) { + const handleClose = async () => { + if (this.#fdClosed) { return; } - this.#handledEndOfData = true; + this.#fdClosed = true; // must wait for fd close and exit result before calling real exit - await fdClosed; + await readFinished; const result = await exitResult; realExit(result.error, result.code); }; - this.read.on('end', handleEnd); - this.read.on('close', () => { - markFdClosed(); - }); + this.read.once('end', markReadFinished); + this.read.once('close', handleClose); // PTYs signal their done-ness with an EIO error. we therefore need to filter them out (as well as // cleaning up other spurious errors) so that the user doesn't need to handle them and be in @@ -123,7 +122,9 @@ export class Pty { // is nothing left to read and we can start tearing things down. If we hadn't received an // error so far, we are considered to be in good standing. this.read.off('error', handleError); - handleEnd(); + // emit 'end' to signal no more data + // this will trigger our 'end' handler which marks readFinished + this.read.emit('end'); return; } } @@ -144,7 +145,7 @@ export class Pty { } resize(size: Size) { - if (this.#handledClose || this.#handledEndOfData) { + if (this.#handledClose || this.#fdClosed) { return; }