Skip to content

Commit

Permalink
Avoid the Bun 1.1.7 bug
Browse files Browse the repository at this point in the history
Bun currently has a bug where raw FDs don't interact well with
`fs.createReadStream`, and the result is that not all data is seen by
the `'data'` callback: oven-sh/bun#9907

This change is an egregious workaround that makes Rust read the file
until the process closes it. It is suboptimal because of all the data
copying that happens, and crossing the FFI barrier also takes some time,
but at least this should let us not be completely blocked. Now there is
one more (optional) argument to `Pty` that is a replacement of the
`'data'` callback.
  • Loading branch information
lhchavez committed May 5, 2024
1 parent dcf208b commit 3baa570
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 21 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ crate-type = ["cdylib"]
# Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix
napi = { version = "2.12.2", default-features = false, features = ["napi4"] }
napi-derive = "2.12.2"
rustix = "0.38.30"
rustix = { version = "0.38.30", features = ["event"] }
rustix-openpty = "0.1.1"
libc = "0.2.152"

Expand Down
2 changes: 1 addition & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export interface Size {
export class Pty {
/** The pid of the forked process. */
pid: number
constructor(command: string, args: Array<string>, envs: Record<string, string>, dir: string, size: Size, onExit: (err: null | Error, exitCode: number) => void)
constructor(command: string, args: Array<string>, envs: Record<string, string>, dir: string, size: Size, onExit: (err: null | Error, exitCode: number) => void, onData?: (err: null | Error, data: Buffer) => void)
/** Resize the terminal. */
resize(size: Size): void
/**
Expand Down
26 changes: 26 additions & 0 deletions index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,32 @@ describe('PTY', () => {
Bun.write(pty.fd(), message + EOT + EOT);
});

test('works with data callback', (done) => {
const message = 'hello bun\n';
let buffer = Buffer.from('');

const pty = new Pty(
'/bin/cat',
[],
{},
CWD,
{ rows: 24, cols: 80 },
() => {
expect(Buffer.from(buffer).toString()).toBe(
'hello bun\r\nhello bun\r\n',
);
pty.close();
done();
},
(err: Error | null, chunk: Buffer) => {
expect(err).toBeNull();
buffer = Buffer.concat([buffer, chunk]);
},
);

Bun.write(pty.fd(), message + EOT + EOT);
});

test("doesn't break when executing non-existing binary", (done) => {
try {
new Pty(
Expand Down
113 changes: 94 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use std::process::{Command, Stdio};
use std::thread;

use libc::{self, c_int};
use napi::bindgen_prelude::JsFunction;
use napi::bindgen_prelude::{Buffer, JsFunction};
use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi::Status::GenericFailure;
use napi::{self, Env};
use rustix::event::epoll;
use rustix_openpty::openpty;
use rustix_openpty::rustix::termios::{self, InputModes, OptionalActions, Winsize};

Expand Down Expand Up @@ -63,6 +64,9 @@ extern crate napi_derive;
/// // TODO: Handle the error.
/// });
/// ```
///
/// The last parameter (a callback that gets stdin chunks) is optional and is only there for
/// compatibility with bun 1.1.7.
#[napi]
#[allow(dead_code)]
struct Pty {
Expand Down Expand Up @@ -130,6 +134,7 @@ impl Pty {
dir: String,
size: Size,
#[napi(ts_arg_type = "(err: null | Error, exitCode: number) => void")] on_exit: JsFunction,
#[napi(ts_arg_type = "(err: null | Error, data: Buffer) => void")] on_data: Option<JsFunction>,
) -> Result<Self, napi::Error> {
let is_node = env.get_node_version()?.release == "node";
let window_size = Winsize {
Expand Down Expand Up @@ -215,29 +220,99 @@ impl Pty {
// For discussion check out: https://github.com/replit/ruspty/pull/1#discussion_r1463672548
let ts_on_exit: ThreadsafeFunction<i32, ErrorStrategy::CalleeHandled> = on_exit
.create_threadsafe_function(0, |ctx| ctx.env.create_int32(ctx.value).map(|v| vec![v]))?;
thread::spawn(move || match child.wait() {
Ok(status) => {
if status.success() {
ts_on_exit.call(Ok(0), ThreadsafeFunctionCallMode::Blocking);
} else {
let ts_on_data = on_data
.map(|on_data| {
Ok::<
(
ThreadsafeFunction<Buffer, ErrorStrategy::CalleeHandled>,
OwnedFd,
),
napi::Error,
>((
on_data.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?,
match controller_fd.try_clone() {
Ok(fd) => Ok(fd),
Err(err) => Err(napi::Error::new(
GenericFailure,
format!(
"OS error when waiting for child process to exit: {}",
err.raw_os_error().unwrap_or(-1)
),
)),
}?,
))
})
.transpose()?;
thread::spawn(move || {
if let Some((ts_on_data, controller_fd)) = ts_on_data {
if let Err(err) = || -> Result<(), napi::Error> {
let epoll = epoll::create(epoll::CreateFlags::CLOEXEC).map_err(|err| {
napi::Error::new(GenericFailure, format!("epoll::create: {:#?}", err))
})?;
epoll::add(
&epoll,
&controller_fd,
epoll::EventData::new_u64(1),
epoll::EventFlags::IN,
)
.map_err(|err| napi::Error::new(GenericFailure, format!("epoll::add: {:#?}", err)))?;
let mut buf = [0u8; 16 * 1024];
let mut event_list = epoll::EventVec::with_capacity(4);
loop {
epoll::wait(&epoll, &mut event_list, -1).map_err(|err| {
napi::Error::new(GenericFailure, format!("epoll::wair: {:#?}", err))
})?;
match rustix::io::read(&controller_fd, &mut buf) {
Ok(n) => {
ts_on_data.call(
Ok(buf[..n as usize].into()),
ThreadsafeFunctionCallMode::Blocking,
);
}
Err(errno) => {
if errno == rustix::io::Errno::AGAIN || errno == rustix::io::Errno::INTR {
// These two errors are safe to retry.
continue;
}
if errno == rustix::io::Errno::IO {
// This error happens when the child closes. We can simply break the loop.
return Ok(());
}
return Err(napi::Error::new(
GenericFailure,
format!("OS error when reading from child: {:#?}", errno,),
));
}
}
}
}() {
ts_on_data.call(Err(err), ThreadsafeFunctionCallMode::Blocking);
}
}
match child.wait() {
Ok(status) => {
if status.success() {
ts_on_exit.call(Ok(0), ThreadsafeFunctionCallMode::Blocking);
} else {
ts_on_exit.call(
Ok(status.code().unwrap_or(-1)),
ThreadsafeFunctionCallMode::Blocking,
);
}
}
Err(err) => {
ts_on_exit.call(
Ok(status.code().unwrap_or(-1)),
Err(napi::Error::new(
GenericFailure,
format!(
"OS error when waiting for child process to exit: {}",
err.raw_os_error().unwrap_or(-1)
),
)),
ThreadsafeFunctionCallMode::Blocking,
);
}
}
Err(err) => {
ts_on_exit.call(
Err(napi::Error::new(
GenericFailure,
format!(
"OS error when waiting for child process to exit: {}",
err.raw_os_error().unwrap_or(-1)
),
)),
ThreadsafeFunctionCallMode::Blocking,
);
}
});

Ok(Pty {
Expand Down

0 comments on commit 3baa570

Please sign in to comment.