Skip to content

Commit

Permalink
3.1.2 Set the PTY fd to be non-blocking (#36)
Browse files Browse the repository at this point in the history
We have gone back and forth a couple of times, but we _do_ need to set
this FD to be non-blocking. Otherwise node will fully consume one I/O
thread for each PTY, and node only has four of those.

This change makes the controller fd (the one the parent process reads
from / writes to) as non-blocking for node.
  • Loading branch information
lhchavez authored Jul 15, 2024
1 parent 6acb8f2 commit f77eab0
Show file tree
Hide file tree
Showing 7 changed files with 189 additions and 116 deletions.
22 changes: 7 additions & 15 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ export interface Size {
cols: number
rows: number
}
/** Resize the terminal. */
export function ptyResize(fd: number, size: Size): void
/**
* Set the close-on-exec flag on a file descriptor. This is `fcntl(fd, F_SETFD, FD_CLOEXEC)` under
* the covers.
Expand All @@ -27,24 +29,14 @@ export function setCloseOnExec(fd: number, closeOnExec: boolean): void
*_CLOEXEC` under the covers.
*/
export function getCloseOnExec(fd: number): boolean
export declare class Pty {
export class Pty {
/** The pid of the forked process. */
pid: number
constructor(opts: PtyOptions)
/** Resize the terminal. */
resize(size: Size): void
/**
* Returns a file descriptor for the PTY controller.
* See the docstring of the class for an usage example.
* Transfers ownership of the file descriptor for the PTY controller. This can only be called
* once (it will error the second time). The caller is responsible for closing the file
* descriptor.
*/
fd(): c_int
/**
* Close the PTY file descriptor. This must be called when the readers / writers of the PTY have
* been closed, otherwise we will leak file descriptors!
*
* In an ideal world, this would be automatically called after the wait loop is done, but Node
* doesn't like that one bit, since it implies that the file is closed outside of the main
* event loop.
*/
close(): void
takeFd(): c_int
}
3 changes: 2 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,9 @@ if (!nativeBinding) {
throw new Error(`Failed to load native binding`)
}

const { Pty, setCloseOnExec, getCloseOnExec } = nativeBinding
const { Pty, ptyResize, setCloseOnExec, getCloseOnExec } = nativeBinding

module.exports.Pty = Pty
module.exports.ptyResize = ptyResize
module.exports.setCloseOnExec = setCloseOnExec
module.exports.getCloseOnExec = getCloseOnExec
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty",
"version": "3.1.1",
"version": "3.1.2",
"main": "dist/wrapper.js",
"types": "dist/wrapper.d.ts",
"author": "Szymon Kaliski <[email protected]>",
Expand Down
112 changes: 58 additions & 54 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::io::Error;
use std::io::ErrorKind;
use std::os::fd::{AsRawFd, OwnedFd};
use std::os::fd::{BorrowedFd, FromRawFd, RawFd};
use std::os::fd::{BorrowedFd, FromRawFd, IntoRawFd, RawFd};
use std::os::unix::process::CommandExt;
use std::process::{Command, Stdio};
use std::thread;
Expand All @@ -16,7 +16,7 @@ use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFun
use napi::Status::GenericFailure;
use napi::{self, Env};
use nix::errno::Errno;
use nix::fcntl::{fcntl, FcntlArg, FdFlag};
use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag};
use nix::poll::{poll, PollFd, PollFlags, PollTimeout};
use nix::pty::{openpty, Winsize};
use nix::sys::termios::{self, SetArg};
Expand Down Expand Up @@ -76,7 +76,8 @@ fn poll_controller_fd_until_read(raw_fd: RawFd) {
continue;
}

// we should never hit this, but if we do, we should just break out of the loop
// we should almost never hit this, but if we do, we should just break out of the loop. this
// can happen if Node destroys the terminal before waiting for the child process to go away.
break;
}

Expand Down Expand Up @@ -117,12 +118,14 @@ impl Pty {
}

// open pty pair, and set close-on-exec to avoid unwanted copies of the FDs from finding their
// way into subprocesses.
// way into subprocesses. Also set the nonblocking flag to avoid Node from consuming a full I/O
// thread for this.
let pty_res = openpty(&window_size, None).map_err(cast_to_napi_error)?;
let controller_fd = pty_res.master;
let user_fd = pty_res.slave;
set_close_on_exec(controller_fd.as_raw_fd(), true)?;
set_close_on_exec(user_fd.as_raw_fd(), true)?;
set_nonblocking(controller_fd.as_raw_fd())?;

// duplicate pty user_fd to be the child's stdin, stdout, and stderr
cmd.stdin(Stdio::from(user_fd.try_clone()?));
Expand Down Expand Up @@ -254,68 +257,43 @@ impl Pty {
})
}

/// Resize the terminal.
/// Transfers ownership of the file descriptor for the PTY controller. This can only be called
/// once (it will error the second time). The caller is responsible for closing the file
/// descriptor.
#[napi]
#[allow(dead_code)]
pub fn resize(&mut self, size: Size) -> Result<(), napi::Error> {
let window_size = Winsize {
ws_col: size.cols,
ws_row: size.rows,
ws_xpixel: 0,
ws_ypixel: 0,
};

if let Some(fd) = &self.controller_fd {
let res = unsafe { libc::ioctl(fd.as_raw_fd(), libc::TIOCSWINSZ, &window_size as *const _) };
if res == -1 {
return Err(napi::Error::new(
napi::Status::GenericFailure,
format!("ioctl TIOCSWINSZ failed: {}", Error::last_os_error()),
));
}

Ok(())
} else {
Err(napi::Error::new(
napi::Status::GenericFailure,
"ioctl TIOCSWINSZ failed: bad file descriptor (os error 9)",
))
}
}

/// Returns a file descriptor for the PTY controller.
/// See the docstring of the class for an usage example.
#[napi]
#[allow(dead_code)]
pub fn fd(&mut self) -> Result<c_int, napi::Error> {
if let Some(fd) = &self.controller_fd {
Ok(fd.as_raw_fd())
pub fn take_fd(&mut self) -> Result<c_int, napi::Error> {
if let Some(fd) = self.controller_fd.take() {
Ok(fd.into_raw_fd())
} else {
Err(napi::Error::new(
napi::Status::GenericFailure,
"fd failed: bad file descriptor (os error 9)",
))
}
}
}

/// Close the PTY file descriptor. This must be called when the readers / writers of the PTY have
/// been closed, otherwise we will leak file descriptors!
///
/// In an ideal world, this would be automatically called after the wait loop is done, but Node
/// doesn't like that one bit, since it implies that the file is closed outside of the main
/// event loop.
#[napi]
#[allow(dead_code)]
pub fn close(&mut self) -> Result<(), napi::Error> {
if let Some(fd) = self.controller_fd.take() {
unsafe {
// ok to best-effort close as node can also close this via autoClose
libc::close(fd.as_raw_fd());
};
}
/// Resize the terminal.
#[napi]
#[allow(dead_code)]
fn pty_resize(fd: i32, size: Size) -> Result<(), napi::Error> {
let window_size = Winsize {
ws_col: size.cols,
ws_row: size.rows,
ws_xpixel: 0,
ws_ypixel: 0,
};

Ok(())
let res = unsafe { libc::ioctl(fd, libc::TIOCSWINSZ, &window_size as *const _) };
if res == -1 {
return Err(napi::Error::new(
napi::Status::GenericFailure,
format!("ioctl TIOCSWINSZ failed: {}", Error::last_os_error()),
));
}

Ok(())
}

/// Set the close-on-exec flag on a file descriptor. This is `fcntl(fd, F_SETFD, FD_CLOEXEC)` under
Expand Down Expand Up @@ -362,3 +340,29 @@ fn get_close_on_exec(fd: i32) -> Result<bool, napi::Error> {
)),
}
}

/// Set the file descriptor to be non-blocking.
#[allow(dead_code)]
fn set_nonblocking(fd: i32) -> Result<(), napi::Error> {
let old_flags = match fcntl(fd, FcntlArg::F_GETFL) {
Ok(flags) => OFlag::from_bits_truncate(flags),
Err(err) => {
return Err(napi::Error::new(
GenericFailure,
format!("fcntl F_GETFL: {}", err),
));
}
};

let mut new_flags = old_flags;
new_flags.set(OFlag::O_NONBLOCK, true);
if old_flags != new_flags {
if let Err(err) = fcntl(fd, FcntlArg::F_SETFL(new_flags)) {
return Err(napi::Error::new(
GenericFailure,
format!("fcntl F_SETFL: {}", err),
));
}
}
Ok(())
}
88 changes: 78 additions & 10 deletions tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Pty, getCloseOnExec, setCloseOnExec } from '../wrapper';
import { type Writable } from 'stream';
import { readdirSync, readlinkSync } from 'fs';
import { describe, test, expect } from 'vitest';

Expand All @@ -19,10 +20,11 @@ function getOpenFds(): FdRecord {
try {
const linkTarget = readlinkSync(procSelfFd + filename);
if (
linkTarget === 'anon_inode:[timerfd]' ||
linkTarget.startsWith('anon_inode:[') ||
linkTarget.startsWith('socket:[') ||
// node likes to asynchronously read stuff mid-test.
linkTarget.includes('/rustpy/')
linkTarget.includes('/ruspty/') ||
linkTarget === '/dev/null'
) {
continue;
}
Expand Down Expand Up @@ -91,16 +93,19 @@ describe(
let buffer = '';

// We have local echo enabled, so we'll read the message twice.
const result = IS_DARWIN
? 'hello cat\r\n^D\b\bhello cat\r\n'
: 'hello cat\r\nhello cat\r\n';
const expectedResult = 'hello cat\r\nhello cat\r\n';

const pty = new Pty({
command: '/bin/cat',
onExit: (err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
expect(buffer.trim()).toBe(result.trim());
let result = buffer.toString();
if (IS_DARWIN) {
// Darwin adds the visible EOT to the stream.
result = result.replace('^D\b\b', '');
}
expect(result.trim()).toStrictEqual(expectedResult.trim());
expect(getOpenFds()).toStrictEqual(oldFds);
done();
},
Expand Down Expand Up @@ -261,10 +266,7 @@ describe(
let buffer = Buffer.from('');
const pty = new Pty({
command: '/bin/sh',
args: [
'-c',
'sleep 0.1 ; ls /proc/$$/fd',
],
args: ['-c', 'sleep 0.1 ; ls /proc/$$/fd'],
onExit: (err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
Expand Down Expand Up @@ -298,6 +300,72 @@ describe(
{ repeats: 4 },
);

test(
'can run concurrent shells',
() =>
new Promise<void>((done) => {
const oldFds = getOpenFds();
const donePromises: Array<Promise<void>> = [];
const readyPromises: Array<Promise<void>> = [];
const writeStreams: Array<Writable> = [];

// We have local echo enabled, so we'll read the message twice.
const expectedResult = 'ready\r\nhello cat\r\nhello cat\r\n';

for (let i = 0; i < 10; i++) {
donePromises.push(
new Promise<void>((accept) => {
let buffer = Buffer.from('');
const pty = new Pty({
command: '/bin/sh',
args: ['-c', 'echo ready ; exec cat'],
onExit: (err, exitCode) => {
expect(err).toBeNull();
expect(exitCode).toBe(0);
let result = buffer.toString();
if (IS_DARWIN) {
// Darwin adds the visible EOT to the stream.
result = result.replace('^D\b\b', '');
}
expect(result).toStrictEqual(expectedResult);
accept();
},
});

readyPromises.push(
new Promise<void>((ready) => {
let readyMessageReceived = false;
const readStream = pty.read;
readStream.on('data', (data) => {
buffer = Buffer.concat([buffer, data]);
if (!readyMessageReceived) {
readyMessageReceived = true;
ready();
}
});
}),
);
writeStreams.push(pty.write);
}),
);
}
Promise.allSettled(readyPromises).then(() => {
// The message should end in newline so that the EOT can signal that the input has ended and not
// just the line.
const message = 'hello cat\n';
for (const writeStream of writeStreams) {
writeStream.write(message);
writeStream.end(EOT);
}
});
Promise.allSettled(donePromises).then(() => {
expect(getOpenFds()).toStrictEqual(oldFds);
done();
});
}),
{ repeats: 4 },
);

test("doesn't break when executing non-existing binary", () =>
new Promise<void>((done) => {
const oldFds = getOpenFds();
Expand Down
Loading

0 comments on commit f77eab0

Please sign in to comment.