Skip to content

Commit

Permalink
fix: reduce deadlock potential in shell->client
Browse files Browse the repository at this point in the history
This patch attempts to reduce the potential to deadlock
in the shell->client thread by removing the mutex lock
around the shell->client unix socket stream. While recent
changes to reduce deadlocks have successfully prevented
a single locked session from gumming up all the other sessions,
the core issue seems to be persisting. I don't see a smoking
gun in the logs that a reporter sent me, but reducing the
number of locks in the shell->client thread, which seems to
be the one that gets locked up, might help. This should also
be slight better for performance (not that it really matters here).

In addition to that main change, this patch contains a few
little bugfixes:

- Previously, every time the client probed for daemon presence,
  the daemon would put a distracting stack trace in its log. This
  wasn't a real issue, but it could be confusing for people not
  familiar with how shpool works. I've had a few people point to
  it in bug reports. I suppressed the stack trace and replaced it
  with a more civilized log line. Stack traces in the logs should
  be indicative of an issue, not business as usual.

- I fixed the child_watcher thread to directly call waitpid rather
  than using the shpool_pty crate to do the work. This isn't ideal,
  but is required to avoid a use-after-close issue that was causing
  trouble. I think this bug was already present, but it just wasn't
  presenting a problem during tests for whatever reason (likely just
  timing). I had to fix this to get the tests passing.
  • Loading branch information
ethanpailes committed Jan 22, 2025
1 parent a2c6de1 commit 8b6f250
Show file tree
Hide file tree
Showing 4 changed files with 166 additions and 72 deletions.
27 changes: 12 additions & 15 deletions libshpool/src/config_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,21 +321,18 @@ impl<Handler> ConfigWatcherInner<Handler> {
self.paths.drain().map(|(path, (watched_path, _))| (path, watched_path)).collect()
}
};
rewatch_paths
.into_iter()
.map(|(path, watched_path)| {
if let Err(err) = self.watcher.unwatch(&watched_path) {
// error sometimes is expected if the watched_path was simply removed, in that
// case notify will automatically remove the watch.
error!("error unwatch {:?}", err);
} else {
debug!("unwatched {}", watched_path.display());
}
watch_and_add(&mut self.watcher, self.paths.entry(path))
.map_err(|err| error!("Failed to add watch: {:?}", err))
.unwrap_or(true)
})
.any(|reload| reload)
rewatch_paths.into_iter().any(|(path, watched_path)| {
if let Err(err) = self.watcher.unwatch(&watched_path) {
// error sometimes is expected if the watched_path was simply removed, in that
// case notify will automatically remove the watch.
error!("error unwatch {:?}", err);
} else {
debug!("unwatched {}", watched_path.display());
}
watch_and_add(&mut self.watcher, self.paths.entry(path))
.map_err(|err| error!("Failed to add watch: {:?}", err))
.unwrap_or(true)
})
}
}

Expand Down
78 changes: 60 additions & 18 deletions libshpool/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl Server {

// advertize our protocol version to the client so that it can
// warn about mismatches
protocol::encode_to(
match protocol::encode_to(
&VersionHeader {
// We allow fake version to be injected for ease of testing.
// Otherwise we would have to resort to some heinous build
Expand All @@ -150,8 +150,19 @@ impl Server {
},
},
&mut stream,
)
.context("writing version header")?;
) {
Ok(_) => {}
Err(e)
if e.root_cause()
.downcast_ref::<io::Error>()
.map(|ioe| ioe.kind() == io::ErrorKind::BrokenPipe)
.unwrap_or(false) =>
{
info!("broken pipe while writing version, likely just a daemon presence probe");
return Ok(());
}
Err(e) => return Err(e).context("while writing version"),
}

let header = parse_connect_header(&mut stream).context("parsing connect header")?;

Expand Down Expand Up @@ -239,8 +250,8 @@ impl Server {
Some(exit_status) => {
// the channel is closed so we know the subshell exited
info!(
"stale inner={:?}, (child exited with status {}) clobbering with new subshell",
inner, exit_status
"stale inner, (child exited with status {}) clobbering with new subshell",
exit_status
);
status = AttachStatus::Created { warnings };
}
Expand Down Expand Up @@ -378,7 +389,7 @@ impl Server {
error!("error shuffling bytes: {:?}", e);
}
}
info!("bidi stream loop finished");
info!("bidi stream loop finished child_done={}", child_done);

if child_done {
info!("'{}' exited, removing from session table", header.name);
Expand Down Expand Up @@ -732,27 +743,51 @@ impl Server {
// spawn a background thread to reap the shell when it exits
// and notify about the exit by closing a channel.
let child_exit_notifier = Arc::new(ExitNotifier::new());
let waitable_child = fork.clone();

// Just cloning the fork and directly calling wait_for_exit() on it would
// be simpler, but it would be wrong because then the destructor for the
// cloned fork object would close the pty fd earlier than we want as the
// child watcher thread exits. This can cause the shell->client thread
// to read the wrong file (for example, the config file contents if the
// config watcher reloads).
let waitable_child_pid = fork.child_pid().ok_or(anyhow!("missing child pid"))?;
let session_name = header.name.clone();
let notifiable_child_exit_notifier = Arc::clone(&child_exit_notifier);
thread::spawn(move || {
let _s = span!(Level::INFO, "child_watcher", s = session_name, cid = conn_id).entered();

match waitable_child.wait_for_exit() {
Ok((_, Some(exit_status))) => {
info!("child exited with status {}", exit_status);
notifiable_child_exit_notifier.notify_exit(exit_status);
let mut err = None;
let mut status = 0;
let mut unpacked_status = None;
loop {
// Saftey: all basic ffi, the pid is valid before this returns.
unsafe {
match libc::waitpid(waitable_child_pid, &mut status, 0) {
0 => continue,
-1 => {
err = Some("waitpid failed");
break;
}
_ => {
if libc::WIFEXITED(status) {
unpacked_status = Some(libc::WEXITSTATUS(status));
}
break;
}
}
}
Ok((_, None)) => {
}
if let Some(status) = unpacked_status {
info!("child exited with status {}", status);
notifiable_child_exit_notifier.notify_exit(status);
} else {
if let Some(e) = err {
info!("child exited without status, using 1: {:?}", e);
} else {
info!("child exited without status, using 1");
notifiable_child_exit_notifier.notify_exit(1);
}
Err(e) => {
info!("error waiting on child, using exit status 1: {:?}", e);
notifiable_child_exit_notifier.notify_exit(1);
}
notifiable_child_exit_notifier.notify_exit(1);
}
info!("reaped child shell: {:?}", waitable_child);
});

// Inject the prompt prefix, if any. For custom commands, avoid doing this
Expand All @@ -776,11 +811,16 @@ impl Server {
let (tty_size_change_tx, tty_size_change_rx) = crossbeam_channel::bounded(0);
let (tty_size_change_ack_tx, tty_size_change_ack_rx) = crossbeam_channel::bounded(0);

let (heartbeat_tx, heartbeat_rx) = crossbeam_channel::bounded(0);
let (heartbeat_ack_tx, heartbeat_ack_rx) = crossbeam_channel::bounded(0);

let shell_to_client_ctl = Arc::new(Mutex::new(shell::ReaderCtl {
client_connection: client_connection_tx,
client_connection_ack: client_connection_ack_rx,
tty_size_change: tty_size_change_tx,
tty_size_change_ack: tty_size_change_ack_rx,
heartbeat: heartbeat_tx,
heartbeat_ack: heartbeat_ack_rx,
}));
let mut session_inner = shell::SessionInner {
name: header.name.clone(),
Expand Down Expand Up @@ -813,6 +853,8 @@ impl Server {
client_connection_ack: client_connection_ack_tx,
tty_size_change: tty_size_change_rx,
tty_size_change_ack: tty_size_change_ack_tx,
heartbeat: heartbeat_rx,
heartbeat_ack: heartbeat_ack_tx,
})?);

if let Some(ttl_secs) = header.ttl_secs {
Expand Down
Loading

0 comments on commit 8b6f250

Please sign in to comment.