Skip to content

Commit

Permalink
poll for TIOCINQ and TIOCOUTQ instead of POLLIN, bump repeats (#51)
Browse files Browse the repository at this point in the history
we noticed a few cases where if a program exited really quickly but
dumped a lot of output (e.g. `env` in the replit shell) we could
sometimes miss output

we used to poll the controller side fd for POLLIN but that actually
isn't _fully_ correct.

the tty pipes look something like:

```
controlling program <> controller fd <> user fd <> user program
|---user space------||--------kernel space------||-user space-|
```

we can sometimes enter cases where the controller side _thinks_ it has
no more data to give to the controlling program (nodejs when using
@replit/ruspty) but the kernel has decided to block on passing some user
fd data to the controller fd (on linux for example, the pipe in the user
fd -> controller fd direction has about 4kb of capacity)

for example, if node isn't processing data events quickly enough, the
controller-side queue can fill up and the user fd write will block until
it has space again. in rare cases we could hit a race where nodejs reads
an entire 4kb block, completely emptying the controller fd queue, and
then the POLLIN check returns with no more data left (even though
the user side still has a pending write!)

this would drop data :(

a few changes:
- rust-side
  - poll TIOCINQ and TIOCOUTQ on controller and user instead of just
    POLLIN on controller
- wrapper level
  - trigger 'end' event on the read stream on EIO instead of just calling
    the cb (that way, other things with handles to the socket can also
    handle end appropriately)
  - exit condition is now
    - fd closed && fd fully read && program actually exited
- test level
  - bump repeats from 50 -> 500
  - rewrite it to a more standard async format so errors during the test
    actually are associated with that test
  - added a test for fast output (>4kb output and fast exit)
  • Loading branch information
jackyzha0 authored Oct 28, 2024
1 parent 0fe8f04 commit b81def4
Show file tree
Hide file tree
Showing 8 changed files with 404 additions and 437 deletions.
2 changes: 1 addition & 1 deletion npm/darwin-arm64/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty-darwin-arm64",
"version": "3.3.0",
"version": "3.4.0",
"os": [
"darwin"
],
Expand Down
2 changes: 1 addition & 1 deletion npm/darwin-x64/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty-darwin-x64",
"version": "3.3.0",
"version": "3.4.0",
"os": [
"darwin"
],
Expand Down
2 changes: 1 addition & 1 deletion npm/linux-x64-gnu/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty-linux-x64-gnu",
"version": "3.3.0",
"version": "3.4.0",
"os": [
"linux"
],
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@replit/ruspty",
"version": "3.3.0",
"version": "3.4.0",
"main": "dist/wrapper.js",
"types": "dist/wrapper.d.ts",
"author": "Szymon Kaliski <[email protected]>",
Expand Down Expand Up @@ -36,7 +36,7 @@
},
"scripts": {
"artifacts": "napi artifacts",
"build": "napi build --platform --release && npm run build:wrapper",
"build": "napi build --platform --release && npm run build:wrapper && npm run format",
"build:wrapper": "tsup",
"prepublishOnly": "napi prepublish -t npm",
"test": "vitest run",
Expand Down
68 changes: 36 additions & 32 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
use std::collections::HashMap;
use std::fs::File;
use std::io::Error;
use std::io::ErrorKind;
use std::io::Write;
use std::io::{Error, Write};
use std::os::fd::{AsRawFd, OwnedFd};
use std::os::fd::{BorrowedFd, FromRawFd, IntoRawFd, RawFd};
use std::os::fd::{FromRawFd, IntoRawFd, RawFd};
use std::os::unix::process::CommandExt;
use std::process::{Command, Stdio};
use std::thread;
use std::time::Duration;

use backoff::backoff::Backoff;
use backoff::ExponentialBackoffBuilder;
use libc::{self, c_int};
use napi::bindgen_prelude::JsFunction;
use napi::threadsafe_function::{ErrorStrategy, ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi::Status::GenericFailure;
use napi::{self, Env};
use nix::errno::Errno;
use nix::fcntl::{fcntl, FcntlArg, FdFlag, OFlag};
use nix::poll::{poll, PollFd, PollFlags, PollTimeout};
use nix::libc::{self, c_int, ioctl, FIONREAD, TIOCOUTQ, TIOCSCTTY, TIOCSWINSZ};
use nix::pty::{openpty, Winsize};
use nix::sys::termios::{self, SetArg};

Expand Down Expand Up @@ -59,45 +57,54 @@ fn cast_to_napi_error(err: Errno) -> napi::Error {
napi::Error::new(GenericFailure, err)
}

// if the child process exits before the controller fd is fully read, we might accidentally
// end in a case where onExit is called but js hasn't had the chance to fully read the controller fd
// if the child process exits before the controller fd is fully read or the user fd is fully
// flushed, we might accidentally end up in a case where onExit is called but js hasn't had
// the chance to fully read the controller fd
// let's wait until the controller fd is fully read before we call onExit
fn poll_controller_fd_until_read(raw_fd: RawFd) {
// wait until fd is fully read (i.e. POLLIN no longer set)
let borrowed_fd = unsafe { BorrowedFd::borrow_raw(raw_fd) };
let poll_fd = PollFd::new(borrowed_fd, PollFlags::POLLIN);

fn poll_pty_fds_until_read(controller_fd: RawFd, user_fd: RawFd) {
let mut backoff = ExponentialBackoffBuilder::default()
.with_initial_interval(Duration::from_millis(1))
.with_max_interval(Duration::from_millis(100))
.with_max_elapsed_time(Some(Duration::from_secs(1)))
.build();

loop {
if let Err(err) = poll(&mut [poll_fd], PollTimeout::ZERO) {
if err == Errno::EINTR || err == Errno::EAGAIN {
// we were interrupted, so we should just try again
continue;
}
// check both input and output queues for both FDs
let mut controller_inq: i32 = 0;
let mut controller_outq: i32 = 0;
let mut user_inq: i32 = 0;
let mut user_outq: i32 = 0;

// we should almost never hit this, but if we do, we should just break out of the loop. this
// can happen if Node destroys the terminal before waiting for the child process to go away.
break;
}
// safe because we're passing valid file descriptors and properly sized integers
unsafe {
// check bytes waiting to be read (FIONREAD, equivalent to TIOCINQ on Linux)
if ioctl(controller_fd, FIONREAD, &mut controller_inq) == -1
|| ioctl(user_fd, FIONREAD, &mut user_inq) == -1
{
// break if we can't read
break;
}

// check if POLLIN is no longer set (i.e. there is no more data to read)
if let Some(flags) = poll_fd.revents() {
if !flags.contains(PollFlags::POLLIN) {
// check bytes waiting to be written (TIOCOUTQ)
if ioctl(controller_fd, TIOCOUTQ, &mut controller_outq) == -1
|| ioctl(user_fd, TIOCOUTQ, &mut user_outq) == -1
{
// break if we can't read
break;
}
}

// wait for a bit before trying again
// if all queues are empty, we're done
if controller_inq == 0 && controller_outq == 0 && user_inq == 0 && user_outq == 0 {
break;
}

// apply backoff strategy
if let Some(d) = backoff.next_backoff() {
thread::sleep(d);
continue;
} else {
// we have exhausted our attempts, its joever
// we have exhausted our attempts
break;
}
}
Expand Down Expand Up @@ -181,7 +188,7 @@ impl Pty {
}

// become the controlling tty for the program
let err = libc::ioctl(raw_user_fd, libc::TIOCSCTTY.into(), 0);
let err = libc::ioctl(raw_user_fd, TIOCSCTTY.into(), 0);
if err == -1 {
return Err(Error::new(ErrorKind::Other, "ioctl-TIOCSCTTY"));
}
Expand Down Expand Up @@ -244,10 +251,7 @@ impl Pty {
let wait_result = child.wait();

// try to wait for the controller fd to be fully read
poll_controller_fd_until_read(raw_controller_fd);

// we don't drop the controller fd immediately
// let pty.close() be responsible for closing it
poll_pty_fds_until_read(raw_controller_fd, raw_user_fd);
drop(user_fd);

match wait_result {
Expand Down Expand Up @@ -310,7 +314,7 @@ fn pty_resize(fd: i32, size: Size) -> Result<(), napi::Error> {
ws_ypixel: 0,
};

let res = unsafe { libc::ioctl(fd, libc::TIOCSWINSZ, &window_size as *const _) };
let res = unsafe { libc::ioctl(fd, TIOCSWINSZ, &window_size as *const _) };
if res == -1 {
return Err(napi::Error::new(
napi::Status::GenericFailure,
Expand Down
Loading

0 comments on commit b81def4

Please sign in to comment.