Make it safe to use FDs once more
We're currently delivering PTY output to Node.js through the `onData` callback,
but we know that's slow because every chunk crosses the FFI barrier.

This change makes it safe to use the file descriptor directly instead.
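To make that concrete, here is a condensed sketch of the drain loop this commit adds for the case where no `onData` callback is supplied (see the `src/lib.rs` hunk below): the spawn thread keeps its clone of the controller FD alive after the child exits and polls it with exponential backoff until the consumer has read everything, and only then reports the exit and lets the FD be closed. The `drain_after_exit` helper is a made-up name for illustration; error handling and the pidfd-based wait are omitted.

// Condensed sketch of the new best-effort drain loop; not a drop-in copy of
// the code in src/lib.rs below.
use std::os::fd::OwnedFd;
use std::thread;
use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use rustix::event::{poll, PollFd, PollFlags};

fn drain_after_exit(controller_fd: OwnedFd) {
    let mut poll_fds = [PollFd::new(&controller_fd, PollFlags::IN)];
    let mut backoff = ExponentialBackoffBuilder::new()
        .with_initial_interval(Duration::from_millis(1))
        .with_max_interval(Duration::from_secs(1))
        .build();
    loop {
        poll_fds[0].clear_revents();
        // A zero timeout makes this a non-blocking readiness check.
        if poll(&mut poll_fds, 0).is_err() {
            break;
        }
        if poll_fds[0].revents().contains(PollFlags::IN) {
            // The consumer hasn't read everything yet; sleep a bit and retry.
            match backoff.next_backoff() {
                Some(delay) => {
                    thread::sleep(delay);
                    continue;
                }
                None => break,
            }
        }
        // No pending data: the child's output has been drained.
        break;
    }
    // `controller_fd` is only dropped here, once the drain has finished.
}

When an `onData` callback is supplied, the same clone of the controller FD is consumed by the pidfd-based poll loop instead.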
lhchavez committed May 16, 2024
1 parent 4116c43 commit 4ab9d9a
Showing 2 changed files with 163 additions and 122 deletions.
Cargo.toml (3 changes: 2 additions & 1 deletion)
@@ -7,12 +7,13 @@ version = "1.0.0"
crate-type = ["cdylib"]

[dependencies]
backoff = "0.4.0"
libc = "0.2.152"
# Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix
napi = { version = "2.12.2", default-features = false, features = ["napi4"] }
napi-derive = "2.12.2"
rustix = { version = "0.38.30", features = ["event"] }
rustix-openpty = "0.1.1"
libc = "0.2.152"

[build-dependencies]
napi-build = "2.0.1"
src/lib.rs (282 changes: 161 additions & 121 deletions)
@@ -5,7 +5,10 @@ use std::os::fd::{AsRawFd, OwnedFd};
use std::os::unix::process::CommandExt;
use std::process::{Command, Stdio};
use std::thread;
use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use libc::{self, c_int};
use napi::bindgen_prelude::{Buffer, JsFunction};
use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode};
@@ -76,7 +79,6 @@ extern crate napi_derive;
#[allow(dead_code)]
struct Pty {
controller_fd: Option<OwnedFd>,
user_fd: Option<OwnedFd>,
should_dup_fds: bool,
/// The pid of the forked process.
pub pid: u32,
@@ -236,142 +238,182 @@ impl Pty {
// they are ready to be `wait`'ed. This has the inconvenience that it consumes one FD per child.
//
// For discussion check out: https://github.com/replit/ruspty/pull/1#discussion_r1463672548
let ts_on_exit: ThreadsafeFunction<i32, ErrorStrategy::CalleeHandled> = opts
.on_exit
.create_threadsafe_function(0, |ctx| ctx.env.create_int32(ctx.value).map(|v| vec![v]))?;
let ts_on_data = opts
.on_data
.map(|on_data| {
Ok::<
(
ThreadsafeFunction<Buffer, ErrorStrategy::CalleeHandled>,
OwnedFd,
),
napi::Error,
>((
on_data.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?,
match controller_fd.try_clone() {
Ok(fd) => Ok(fd),
Err(err) => Err(napi::Error::new(
GenericFailure,
format!(
"OS error when setting up child process wait: {}",
err.raw_os_error().unwrap_or(-1)
),
)),
}?,
))
})
.transpose()?;
thread::spawn(move || {
#[cfg(target_os = "linux")]
{
// The following code only works on Linux due to the reliance on pidfd.
use rustix::process::{pidfd_open, Pid, PidfdFlags};

if let Some((ts_on_data, controller_fd)) = ts_on_data {
if let Err(err) = || -> Result<(), napi::Error> {
let pidfd = pidfd_open(
unsafe { Pid::from_raw_unchecked(child.id() as i32) },
PidfdFlags::empty(),
)
.map_err(|err| napi::Error::new(GenericFailure, format!("pidfd_open: {:#?}", err)))?;
let mut poll_fds = [
PollFd::new(&controller_fd, PollFlags::IN),
PollFd::new(&pidfd, PollFlags::IN),
];
let mut buf = [0u8; 16 * 1024];
loop {
for poll_fd in &mut poll_fds[..] {
poll_fd.clear_revents();
}
poll(&mut poll_fds, -1).map_err(|err| {
napi::Error::new(
GenericFailure,
format!("OS error when waiting for child read: {:#?}", err),
)
})?;
// Always check the controller FD first to see if it has any events.
if poll_fds[0].revents().contains(PollFlags::IN) {
match rustix::io::read(&controller_fd, &mut buf) {
Ok(n) => {
ts_on_data.call(
Ok(buf[..n as usize].into()),
ThreadsafeFunctionCallMode::Blocking,
);
}
Err(errno) => {
if errno == rustix::io::Errno::AGAIN || errno == rustix::io::Errno::INTR {
// These two errors are safe to retry.
continue;
{
let controller_fd = match controller_fd.try_clone() {
Ok(fd) => fd,
Err(err) => {
return Err(napi::Error::new(
GenericFailure,
format!(
"OS error when setting up child process wait: {}",
err.raw_os_error().unwrap_or(-1)
),
))
}
};
let ts_on_exit: ThreadsafeFunction<i32, ErrorStrategy::CalleeHandled> = opts
.on_exit
.create_threadsafe_function(0, |ctx| ctx.env.create_int32(ctx.value).map(|v| vec![v]))?;
let ts_on_data: Option<ThreadsafeFunction<Buffer, ErrorStrategy::CalleeHandled>> = opts
.on_data
.map(|on_data| {
Ok::<ThreadsafeFunction<Buffer, ErrorStrategy::CalleeHandled>, napi::Error>(
on_data.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?,
)
})
.transpose()?;
thread::spawn(move || {
let controller_fd = if let Some(ts_on_data) = ts_on_data {
// If the on_data callback was passed, we'll consume the controller_fd as part of the
// poll loop.
#[cfg(target_os = "linux")]
{
// The following code only works on Linux due to the reliance on pidfd.
use rustix::process::{pidfd_open, Pid, PidfdFlags};

if let Err(err) = || -> Result<(), napi::Error> {
let pidfd = pidfd_open(
unsafe { Pid::from_raw_unchecked(child.id() as i32) },
PidfdFlags::empty(),
)
.map_err(|err| napi::Error::new(GenericFailure, format!("pidfd_open: {:#?}", err)))?;
let mut poll_fds = [
PollFd::new(&controller_fd, PollFlags::IN),
PollFd::new(&pidfd, PollFlags::IN),
];
let mut buf = [0u8; 16 * 1024];
loop {
for poll_fd in &mut poll_fds[..] {
poll_fd.clear_revents();
}
poll(&mut poll_fds, -1).map_err(|err| {
napi::Error::new(
GenericFailure,
format!("OS error when waiting for child read: {:#?}", err),
)
})?;
// Always check the controller FD first to see if it has any events.
if poll_fds[0].revents().contains(PollFlags::IN) {
match rustix::io::read(&controller_fd, &mut buf) {
Ok(n) => {
ts_on_data.call(
Ok(buf[..n as usize].into()),
ThreadsafeFunctionCallMode::Blocking,
);
}
if errno == rustix::io::Errno::IO {
// This error happens when the child closes. We can simply break the loop.
return Ok(());
Err(errno) => {
if errno == rustix::io::Errno::AGAIN || errno == rustix::io::Errno::INTR {
// These two errors are safe to retry.
continue;
}
if errno == rustix::io::Errno::IO {
// This error happens when the child closes. We can simply break the loop.
return Ok(());
}
return Err(napi::Error::new(
GenericFailure,
format!("OS error when reading from child: {:#?}", errno,),
));
}
return Err(napi::Error::new(
GenericFailure,
format!("OS error when reading from child: {:#?}", errno,),
));
}
// If there was data, keep trying to read this FD.
continue;
}

// Now that we're sure that the controller FD doesn't have any events, we have
// successfully drained the child's output, so we can now check if the child has
// exited.
if poll_fds[1].revents().contains(PollFlags::IN) {
return Ok(());
}
// If there was data, keep trying to read this FD.
continue;
}
}() {
ts_on_data.call(Err(err), ThreadsafeFunctionCallMode::Blocking);
}
}
#[cfg(not(target_os = "linux"))]
{
ts_on_data.call(
Err(napi::Error::new(
GenericFailure,
"the data callback is only implemented in Linux",
)),
ThreadsafeFunctionCallMode::Blocking,
);
}

// Now that we're sure that the controller FD doesn't have any events, we have
// successfully drained the child's output, so we can now check if the child has
// exited.
if poll_fds[1].revents().contains(PollFlags::IN) {
return Ok(());
drop(controller_fd);
None
} else {
// If `on_data` was not passed, we'll wait for the process to go away and then keep
// polling the FD until all data has been read.
Some(controller_fd)
};
let wait_result = child.wait();
if let Some(controller_fd) = controller_fd {
// If `on_data` was not passed, we still have one end of the PTY. We'll keep polling
// the FD (and sleeping a bit) until all data has been read. That will be done in a
// best-effort fashion.
let mut poll_fds = [PollFd::new(&controller_fd, PollFlags::IN)];
let mut backoff = ExponentialBackoffBuilder::new()
.with_initial_interval(Duration::from_millis(1))
.with_max_interval(Duration::from_secs(1))
.build();
loop {
for poll_fd in &mut poll_fds[..] {
poll_fd.clear_revents();
}
if poll(&mut poll_fds, 0).is_err() {
// We tried!
break;
}
if poll_fds[0].revents().contains(PollFlags::IN) {
// There's still data to be read. We'll sit tight for a few milliseconds.
// If there was data, keep trying to read this FD.
if let Some(d) = backoff.next_backoff() {
thread::sleep(d);
continue;
} else {
break;
}
}
}() {
ts_on_data.call(Err(err), ThreadsafeFunctionCallMode::Blocking);

// Now that we're sure that the controller FD doesn't have any events, we have
// successfully drained the child's output, so we can now invoke the callback.
break;
}
}
}
#[cfg(not(target_os = "linux"))]
{
if let Some((ts_on_data, _controller_fd)) = ts_on_data {
ts_on_data.call(
Err(napi::Error::new(
GenericFailure,
"the data callback is only implemented in Linux",
)),
ThreadsafeFunctionCallMode::Blocking,
);
}
}
match child.wait() {
Ok(status) => {
if status.success() {
ts_on_exit.call(Ok(0), ThreadsafeFunctionCallMode::Blocking);
} else {
// Now we drop the user_fd. We have fully read from it.
drop(user_fd);
match wait_result {
Ok(status) => {
if status.success() {
ts_on_exit.call(Ok(0), ThreadsafeFunctionCallMode::Blocking);
} else {
ts_on_exit.call(
Ok(status.code().unwrap_or(-1)),
ThreadsafeFunctionCallMode::Blocking,
);
}
}
Err(err) => {
ts_on_exit.call(
Ok(status.code().unwrap_or(-1)),
Err(napi::Error::new(
GenericFailure,
format!(
"OS error when waiting for child process to exit: {}",
err.raw_os_error().unwrap_or(-1)
),
)),
ThreadsafeFunctionCallMode::Blocking,
);
}
}
Err(err) => {
ts_on_exit.call(
Err(napi::Error::new(
GenericFailure,
format!(
"OS error when waiting for child process to exit: {}",
err.raw_os_error().unwrap_or(-1)
),
)),
ThreadsafeFunctionCallMode::Blocking,
);
}
}
});
});
}

Ok(Pty {
controller_fd: Some(controller_fd),
user_fd: Some(user_fd),
should_dup_fds,
pid,
})
@@ -446,14 +488,12 @@ impl Pty {
#[allow(dead_code)]
pub fn close(&mut self) -> Result<(), napi::Error> {
let controller_fd = self.controller_fd.take();
let user_fd = self.user_fd.take();
if controller_fd.is_none() {
return Err(napi::Error::new(
napi::Status::GenericFailure,
format!("close failed: {}", libc::EBADF),
));
}
drop(user_fd);

Ok(())
}
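For reference, the comment near the top of the large `impl Pty` hunk above explains why this code leans on Linux's pidfd, rather than a SIGCHLD handler, to learn that the child is ready to be `wait`'ed. A minimal standalone sketch of that pattern, assuming Linux and the same rustix version and features this crate already pulls in (the `wait_for_exit` name is made up for illustration):

use rustix::event::{poll, PollFd, PollFlags};
use rustix::process::{pidfd_open, Pid, PidfdFlags};

/// Blocks until the process identified by `pid` exits, using a pidfd instead
/// of a SIGCHLD handler. Assumes `pid` is a child we spawned and have not yet
/// reaped, which is what makes the raw Pid conversion below sound.
fn wait_for_exit(pid: i32) -> rustix::io::Result<()> {
    let pidfd = pidfd_open(unsafe { Pid::from_raw_unchecked(pid) }, PidfdFlags::empty())?;
    let mut poll_fds = [PollFd::new(&pidfd, PollFlags::IN)];
    loop {
        poll_fds[0].clear_revents();
        match poll(&mut poll_fds, -1) {
            Ok(_) => {
                // The pidfd becomes readable once the process has terminated.
                if poll_fds[0].revents().contains(PollFlags::IN) {
                    return Ok(());
                }
            }
            // A signal interrupted the wait; retry.
            Err(err) if err == rustix::io::Errno::INTR => continue,
            Err(err) => return Err(err),
        }
    }
}

In the actual loop above, the pidfd is polled alongside the controller FD, and the controller FD is always checked first so that the child's output is fully drained before its exit is acted on.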