From 4ab9d9ae8a1c6a0eb1057c6b36e124514dc8a66e Mon Sep 17 00:00:00 2001 From: lhchavez Date: Thu, 16 May 2024 13:50:48 +0000 Subject: [PATCH] Make it safe to use FDs once more We're currently using nodejs with the `onData` callback. But we know that's slow because it crosses the FFI barrier. This change now makes it safe to use the file descriptor. --- Cargo.toml | 3 +- src/lib.rs | 282 ++++++++++++++++++++++++++++++----------------------- 2 files changed, 163 insertions(+), 122 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e17c816..a7b00d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,12 +7,13 @@ version = "1.0.0" crate-type = ["cdylib"] [dependencies] +backoff = "0.4.0" +libc = "0.2.152" # Default enable napi4 feature, see https://nodejs.org/api/n-api.html#node-api-version-matrix napi = { version = "2.12.2", default-features = false, features = ["napi4"] } napi-derive = "2.12.2" rustix = { version = "0.38.30", features = ["event"] } rustix-openpty = "0.1.1" -libc = "0.2.152" [build-dependencies] napi-build = "2.0.1" diff --git a/src/lib.rs b/src/lib.rs index 8510abb..e858c77 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,10 @@ use std::os::fd::{AsRawFd, OwnedFd}; use std::os::unix::process::CommandExt; use std::process::{Command, Stdio}; use std::thread; +use std::time::Duration; +use backoff::backoff::Backoff; +use backoff::ExponentialBackoffBuilder; use libc::{self, c_int}; use napi::bindgen_prelude::{Buffer, JsFunction}; use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode}; @@ -76,7 +79,6 @@ extern crate napi_derive; #[allow(dead_code)] struct Pty { controller_fd: Option, - user_fd: Option, should_dup_fds: bool, /// The pid of the forked process. pub pid: u32, @@ -236,142 +238,182 @@ impl Pty { // they are ready to be `wait`'ed. This has the inconvenience that it consumes one FD per child. // // For discussion check out: https://github.com/replit/ruspty/pull/1#discussion_r1463672548 - let ts_on_exit: ThreadsafeFunction = opts - .on_exit - .create_threadsafe_function(0, |ctx| ctx.env.create_int32(ctx.value).map(|v| vec![v]))?; - let ts_on_data = opts - .on_data - .map(|on_data| { - Ok::< - ( - ThreadsafeFunction, - OwnedFd, - ), - napi::Error, - >(( - on_data.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?, - match controller_fd.try_clone() { - Ok(fd) => Ok(fd), - Err(err) => Err(napi::Error::new( - GenericFailure, - format!( - "OS error when setting up child process wait: {}", - err.raw_os_error().unwrap_or(-1) - ), - )), - }?, - )) - }) - .transpose()?; - thread::spawn(move || { - #[cfg(target_os = "linux")] - { - // The following code only works on Linux due to the reliance on pidfd. - use rustix::process::{pidfd_open, Pid, PidfdFlags}; - - if let Some((ts_on_data, controller_fd)) = ts_on_data { - if let Err(err) = || -> Result<(), napi::Error> { - let pidfd = pidfd_open( - unsafe { Pid::from_raw_unchecked(child.id() as i32) }, - PidfdFlags::empty(), - ) - .map_err(|err| napi::Error::new(GenericFailure, format!("pidfd_open: {:#?}", err)))?; - let mut poll_fds = [ - PollFd::new(&controller_fd, PollFlags::IN), - PollFd::new(&pidfd, PollFlags::IN), - ]; - let mut buf = [0u8; 16 * 1024]; - loop { - for poll_fd in &mut poll_fds[..] { - poll_fd.clear_revents(); - } - poll(&mut poll_fds, -1).map_err(|err| { - napi::Error::new( - GenericFailure, - format!("OS error when waiting for child read: {:#?}", err), - ) - })?; - // Always check the controller FD first to see if it has any events. - if poll_fds[0].revents().contains(PollFlags::IN) { - match rustix::io::read(&controller_fd, &mut buf) { - Ok(n) => { - ts_on_data.call( - Ok(buf[..n as usize].into()), - ThreadsafeFunctionCallMode::Blocking, - ); - } - Err(errno) => { - if errno == rustix::io::Errno::AGAIN || errno == rustix::io::Errno::INTR { - // These two errors are safe to retry. - continue; + { + let controller_fd = match controller_fd.try_clone() { + Ok(fd) => fd, + Err(err) => { + return Err(napi::Error::new( + GenericFailure, + format!( + "OS error when setting up child process wait: {}", + err.raw_os_error().unwrap_or(-1) + ), + )) + } + }; + let ts_on_exit: ThreadsafeFunction = opts + .on_exit + .create_threadsafe_function(0, |ctx| ctx.env.create_int32(ctx.value).map(|v| vec![v]))?; + let ts_on_data: Option> = opts + .on_data + .map(|on_data| { + Ok::, napi::Error>( + on_data.create_threadsafe_function(0, |ctx| Ok(vec![ctx.value]))?, + ) + }) + .transpose()?; + thread::spawn(move || { + let controller_fd = if let Some(ts_on_data) = ts_on_data { + // If the on_data callback was passed, we'll consume the controller_fd as part of the + // poll loop. + #[cfg(target_os = "linux")] + { + // The following code only works on Linux due to the reliance on pidfd. + use rustix::process::{pidfd_open, Pid, PidfdFlags}; + + if let Err(err) = || -> Result<(), napi::Error> { + let pidfd = pidfd_open( + unsafe { Pid::from_raw_unchecked(child.id() as i32) }, + PidfdFlags::empty(), + ) + .map_err(|err| napi::Error::new(GenericFailure, format!("pidfd_open: {:#?}", err)))?; + let mut poll_fds = [ + PollFd::new(&controller_fd, PollFlags::IN), + PollFd::new(&pidfd, PollFlags::IN), + ]; + let mut buf = [0u8; 16 * 1024]; + loop { + for poll_fd in &mut poll_fds[..] { + poll_fd.clear_revents(); + } + poll(&mut poll_fds, -1).map_err(|err| { + napi::Error::new( + GenericFailure, + format!("OS error when waiting for child read: {:#?}", err), + ) + })?; + // Always check the controller FD first to see if it has any events. + if poll_fds[0].revents().contains(PollFlags::IN) { + match rustix::io::read(&controller_fd, &mut buf) { + Ok(n) => { + ts_on_data.call( + Ok(buf[..n as usize].into()), + ThreadsafeFunctionCallMode::Blocking, + ); } - if errno == rustix::io::Errno::IO { - // This error happens when the child closes. We can simply break the loop. - return Ok(()); + Err(errno) => { + if errno == rustix::io::Errno::AGAIN || errno == rustix::io::Errno::INTR { + // These two errors are safe to retry. + continue; + } + if errno == rustix::io::Errno::IO { + // This error happens when the child closes. We can simply break the loop. + return Ok(()); + } + return Err(napi::Error::new( + GenericFailure, + format!("OS error when reading from child: {:#?}", errno,), + )); } - return Err(napi::Error::new( - GenericFailure, - format!("OS error when reading from child: {:#?}", errno,), - )); } + // If there was data, keep trying to read this FD. + continue; + } + + // Now that we're sure that the controller FD doesn't have any events, we have + // successfully drained the child's output, so we can now check if the child has + // exited. + if poll_fds[1].revents().contains(PollFlags::IN) { + return Ok(()); } - // If there was data, keep trying to read this FD. - continue; } + }() { + ts_on_data.call(Err(err), ThreadsafeFunctionCallMode::Blocking); + } + } + #[cfg(not(target_os = "linux"))] + { + ts_on_data.call( + Err(napi::Error::new( + GenericFailure, + "the data callback is only implemented in Linux", + )), + ThreadsafeFunctionCallMode::Blocking, + ); + } - // Now that we're sure that the controller FD doesn't have any events, we have - // successfully drained the child's output, so we can now check if the child has - // exited. - if poll_fds[1].revents().contains(PollFlags::IN) { - return Ok(()); + drop(controller_fd); + None + } else { + // If `on_data` was not passed, we'll wait for the process to go away and then keep + // polling the FD until all data has been read. + Some(controller_fd) + }; + let wait_result = child.wait(); + if let Some(controller_fd) = controller_fd { + // If `on_data` was not passed, we still have one end of the PTY. We'll keep polling + // the FD (and sleeping a bit) until all data has been read. That will be done in a + // best-effort fashion. + let mut poll_fds = [PollFd::new(&controller_fd, PollFlags::IN)]; + let mut backoff = ExponentialBackoffBuilder::new() + .with_initial_interval(Duration::from_millis(1)) + .with_max_interval(Duration::from_secs(1)) + .build(); + loop { + for poll_fd in &mut poll_fds[..] { + poll_fd.clear_revents(); + } + if poll(&mut poll_fds, 0).is_err() { + // We tried! + break; + } + if poll_fds[0].revents().contains(PollFlags::IN) { + // There's still data to be read. We'll sit tight for a few milliseconds. + // If there was data, keep trying to read this FD. + if let Some(d) = backoff.next_backoff() { + thread::sleep(d); + continue; + } else { + break; } } - }() { - ts_on_data.call(Err(err), ThreadsafeFunctionCallMode::Blocking); + + // Now that we're sure that the controller FD doesn't have any events, we have + // successfully drained the child's output, so we can now invoke the callback. + break; } } - } - #[cfg(not(target_os = "linux"))] - { - if let Some((ts_on_data, _controller_fd)) = ts_on_data { - ts_on_data.call( - Err(napi::Error::new( - GenericFailure, - "the data callback is only implemented in Linux", - )), - ThreadsafeFunctionCallMode::Blocking, - ); - } - } - match child.wait() { - Ok(status) => { - if status.success() { - ts_on_exit.call(Ok(0), ThreadsafeFunctionCallMode::Blocking); - } else { + // Now we drop the user_fd. We have fully read from it. + drop(user_fd); + match wait_result { + Ok(status) => { + if status.success() { + ts_on_exit.call(Ok(0), ThreadsafeFunctionCallMode::Blocking); + } else { + ts_on_exit.call( + Ok(status.code().unwrap_or(-1)), + ThreadsafeFunctionCallMode::Blocking, + ); + } + } + Err(err) => { ts_on_exit.call( - Ok(status.code().unwrap_or(-1)), + Err(napi::Error::new( + GenericFailure, + format!( + "OS error when waiting for child process to exit: {}", + err.raw_os_error().unwrap_or(-1) + ), + )), ThreadsafeFunctionCallMode::Blocking, ); } } - Err(err) => { - ts_on_exit.call( - Err(napi::Error::new( - GenericFailure, - format!( - "OS error when waiting for child process to exit: {}", - err.raw_os_error().unwrap_or(-1) - ), - )), - ThreadsafeFunctionCallMode::Blocking, - ); - } - } - }); + }); + } Ok(Pty { controller_fd: Some(controller_fd), - user_fd: Some(user_fd), should_dup_fds, pid, }) @@ -446,14 +488,12 @@ impl Pty { #[allow(dead_code)] pub fn close(&mut self) -> Result<(), napi::Error> { let controller_fd = self.controller_fd.take(); - let user_fd = self.user_fd.take(); if controller_fd.is_none() { return Err(napi::Error::new( napi::Status::GenericFailure, format!("close failed: {}", libc::EBADF), )); } - drop(user_fd); Ok(()) }