From 2f858b624d6fab9a31ba1a0a3ae1dd5bd1f7dbf6 Mon Sep 17 00:00:00 2001
From: Ethan Pailes
Date: Tue, 3 Dec 2024 18:34:28 +0000
Subject: [PATCH] fix: reduce deadlock potential in shell->client

This patch attempts to reduce the potential to deadlock in the
shell->client thread by removing the mutex lock around the
shell->client unix socket stream. While recent changes to reduce
deadlocks have successfully prevented a single locked session from
gumming up all the other sessions, the core issue seems to persist.
I don't see a smoking gun in the logs that a reporter sent me, but
reducing the number of locks in the shell->client thread, which seems
to be the one that gets locked up, might help. This should also be
slightly better for performance (not that it really matters here).

In addition to that main change, this patch contains a few little
bugfixes:

- Previously, every time the client probed for daemon presence, the
  daemon would put a distracting stack trace in its log. This wasn't a
  real issue, but it could be confusing for people not familiar with
  how shpool works. I've had a few people point to it in bug reports.
  I suppressed the stack trace and replaced it with a more civilized
  log line. Stack traces in the logs should be indicative of an issue,
  not business as usual.

- I fixed the child_watcher thread to directly call waitpid rather
  than using the shpool_pty crate to do the work. This isn't ideal,
  but it is required to avoid a use-after-close issue that was causing
  trouble. I think this bug was already present, but it just wasn't
  presenting a problem during tests for whatever reason (likely just
  timing). I had to fix this to get the tests passing.
---
 libshpool/src/config_watcher.rs |   3 +-
 libshpool/src/daemon/server.rs  |  78 ++++++++++++++-----
 libshpool/src/daemon/shell.rs   | 131 +++++++++++++++++++++++---------
 3 files changed, 154 insertions(+), 58 deletions(-)

diff --git a/libshpool/src/config_watcher.rs b/libshpool/src/config_watcher.rs
index f2de5fb1..ae9c365d 100644
--- a/libshpool/src/config_watcher.rs
+++ b/libshpool/src/config_watcher.rs
@@ -323,7 +323,7 @@ impl ConfigWatcherInner {
         };
         rewatch_paths
             .into_iter()
-            .map(|(path, watched_path)| {
+            .any(|(path, watched_path)| {
                 if let Err(err) = self.watcher.unwatch(&watched_path) {
                     // error sometimes is expected if the watched_path was simply removed, in that
                     // case notify will automatically remove the watch.
@@ -335,7 +335,6 @@ impl ConfigWatcherInner {
                     .map_err(|err| error!("Failed to add watch: {:?}", err))
                     .unwrap_or(true)
             })
-            .any(|reload| reload)
     }
 }

diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs
index 1650557c..4c99b69f 100644
--- a/libshpool/src/daemon/server.rs
+++ b/libshpool/src/daemon/server.rs
@@ -139,7 +139,7 @@ impl Server {

         // advertize our protocol version to the client so that it can
         // warn about mismatches
-        protocol::encode_to(
+        match protocol::encode_to(
             &VersionHeader {
                 // We allow fake version to be injected for ease of testing.
                 // Otherwise we would have to resort to some heinous build
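The match introduced above is completed in the next hunk, where a broken
pipe is treated as a daemon presence probe rather than an error. The
error-probing pattern it relies on, digging the concrete io::Error out of
anyhow's error chain with root_cause and downcast_ref, can be sketched
standalone roughly like this (write_version and its frame bytes are
hypothetical stand-ins here, not shpool's protocol module):

```rust
use std::io::{self, Write};

use anyhow::Context;

// Hypothetical stand-in for protocol::encode_to: any fallible write
// whose io::Error ends up wrapped in an anyhow error chain.
fn write_version<W: Write>(mut sink: W) -> anyhow::Result<()> {
    sink.write_all(b"shpool-v0").context("writing version bytes")?;
    Ok(())
}

fn handle_conn<W: Write>(sink: W) -> anyhow::Result<()> {
    match write_version(sink) {
        Ok(_) => {}
        // Walk to the root of the anyhow chain and check whether it is
        // a BrokenPipe io::Error: the peer connected and hung up without
        // reading, which is what a presence probe looks like.
        Err(e)
            if e.root_cause()
                .downcast_ref::<io::Error>()
                .map(|ioe| ioe.kind() == io::ErrorKind::BrokenPipe)
                .unwrap_or(false) =>
        {
            eprintln!("peer hung up early, treating as a presence probe");
            return Ok(());
        }
        Err(e) => return Err(e).context("while writing version"),
    }
    Ok(())
}

fn main() -> anyhow::Result<()> {
    // io::sink() always succeeds; a closed socket would yield BrokenPipe.
    handle_conn(io::sink())
}
```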
@@ -150,8 +150,19 @@ impl Server {
                 },
             },
             &mut stream,
-        )
-        .context("writing version header")?;
+        ) {
+            Ok(_) => {}
+            Err(e)
+                if e.root_cause()
+                    .downcast_ref::<io::Error>()
+                    .map(|ioe| ioe.kind() == io::ErrorKind::BrokenPipe)
+                    .unwrap_or(false) =>
+            {
+                info!("broken pipe while writing version, likely just a daemon presence probe");
+                return Ok(());
+            }
+            Err(e) => return Err(e).context("while writing version"),
+        }

         let header = parse_connect_header(&mut stream).context("parsing connect header")?;

@@ -239,8 +250,8 @@ impl Server {
             Some(exit_status) => {
                 // the channel is closed so we know the subshell exited
                 info!(
-                    "stale inner={:?}, (child exited with status {}) clobbering with new subshell",
-                    inner, exit_status
+                    "stale inner, (child exited with status {}) clobbering with new subshell",
+                    exit_status
                 );
                 status = AttachStatus::Created { warnings };
             }
@@ -378,7 +389,7 @@ impl Server {
                 error!("error shuffling bytes: {:?}", e);
             }
         }
-        info!("bidi stream loop finished");
+        info!("bidi stream loop finished child_done={}", child_done);

         if child_done {
             info!("'{}' exited, removing from session table", header.name);
@@ -732,27 +743,51 @@ impl Server {
         // spawn a background thread to reap the shell when it exits
         // and notify about the exit by closing a channel.
         let child_exit_notifier = Arc::new(ExitNotifier::new());
-        let waitable_child = fork.clone();
+
+        // Just cloning the fork and directly calling wait_for_exit() on it would
+        // be simpler, but it would be wrong because then the destructor for the
+        // cloned fork object would close the pty fd earlier than we want as the
+        // child watcher thread exits. This can cause the shell->client thread
+        // to read the wrong file (for example, the config file contents if the
+        // config watcher reloads).
+        let waitable_child_pid = fork.child_pid().ok_or(anyhow!("missing child pid"))?;
         let session_name = header.name.clone();
         let notifiable_child_exit_notifier = Arc::clone(&child_exit_notifier);
         thread::spawn(move || {
             let _s = span!(Level::INFO, "child_watcher", s = session_name, cid = conn_id).entered();
-            match waitable_child.wait_for_exit() {
-                Ok((_, Some(exit_status))) => {
-                    info!("child exited with status {}", exit_status);
-                    notifiable_child_exit_notifier.notify_exit(exit_status);
+            let mut err = None;
+            let mut status = 0;
+            let mut unpacked_status = None;
+            loop {
+                // Safety: all basic ffi, the pid is valid before this returns.
+                unsafe {
+                    match libc::waitpid(waitable_child_pid, &mut status, 0) {
+                        0 => continue,
+                        -1 => {
+                            err = Some("waitpid failed");
+                            break;
+                        }
+                        _ => {
+                            if libc::WIFEXITED(status) {
+                                unpacked_status = Some(libc::WEXITSTATUS(status));
+                            }
+                            break;
+                        }
+                    }
                 }
-                Ok((_, None)) => {
-                    info!("child exited without status, using 1");
-                    notifiable_child_exit_notifier.notify_exit(1);
-                }
-                Err(e) => {
-                    info!("error waiting on child, using exit status 1: {:?}", e);
-                    notifiable_child_exit_notifier.notify_exit(1);
+            }
+            if let Some(status) = unpacked_status {
+                info!("child exited with status {}", status);
+                notifiable_child_exit_notifier.notify_exit(status);
+            } else {
+                if let Some(e) = err {
+                    info!("child exited without status, using 1: {:?}", e);
+                } else {
+                    info!("child exited without status, using 1");
                 }
+                notifiable_child_exit_notifier.notify_exit(1);
             }
-            info!("reaped child shell: {:?}", waitable_child);
         });

         // Inject the prompt prefix, if any. For custom commands, avoid doing this
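Since the child watcher now bypasses shpool_pty and reaps the shell
itself, the loop above is just raw libc. A minimal self-contained version
of the same reaping logic, assuming only the libc crate and an ordinary
fork(2) child rather than shpool's pty fork:

```rust
fn main() {
    let pid = unsafe { libc::fork() };
    assert!(pid >= 0, "fork failed");
    if pid == 0 {
        // Child: exit immediately with a recognizable status.
        unsafe { libc::_exit(42) };
    }

    let mut status = 0;
    let exit_code = loop {
        // Safety: basic ffi; `pid` refers to our own unreaped child.
        match unsafe { libc::waitpid(pid, &mut status, 0) } {
            -1 => break None,  // waitpid failed
            0 => continue,     // only possible with WNOHANG
            _ => {
                if libc::WIFEXITED(status) {
                    break Some(libc::WEXITSTATUS(status));
                }
                break None;    // killed by a signal, no exit status
            }
        }
    };
    // Fall back to 1 when there is no clean exit status, as the patch does.
    println!("child exit status: {}", exit_code.unwrap_or(1));
}
```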
@@ -776,11 +811,16 @@ impl Server {
         let (tty_size_change_tx, tty_size_change_rx) = crossbeam_channel::bounded(0);
         let (tty_size_change_ack_tx, tty_size_change_ack_rx) = crossbeam_channel::bounded(0);

+        let (heartbeat_tx, heartbeat_rx) = crossbeam_channel::bounded(0);
+        let (heartbeat_ack_tx, heartbeat_ack_rx) = crossbeam_channel::bounded(0);
+
         let shell_to_client_ctl = Arc::new(Mutex::new(shell::ReaderCtl {
             client_connection: client_connection_tx,
             client_connection_ack: client_connection_ack_rx,
             tty_size_change: tty_size_change_tx,
             tty_size_change_ack: tty_size_change_ack_rx,
+            heartbeat: heartbeat_tx,
+            heartbeat_ack: heartbeat_ack_rx,
         }));
         let mut session_inner = shell::SessionInner {
             name: header.name.clone(),
@@ -813,6 +853,8 @@ impl Server {
             client_connection_ack: client_connection_ack_tx,
             tty_size_change: tty_size_change_rx,
             tty_size_change_ack: tty_size_change_ack_tx,
+            heartbeat: heartbeat_rx,
+            heartbeat_ack: heartbeat_ack_tx,
         })?);

         if let Some(ttl_secs) = header.ttl_secs {
diff --git a/libshpool/src/daemon/shell.rs b/libshpool/src/daemon/shell.rs
index 3d8d9a79..7f912eab 100644
--- a/libshpool/src/daemon/shell.rs
+++ b/libshpool/src/daemon/shell.rs
@@ -124,9 +124,8 @@ pub struct SessionInner {
 /// shell->client thread.
 pub struct ClientConnection {
     /// All output data should be written to this sink rather than
-    /// directly to the unix stream. The mutex makes sure that we don't
-    /// accidentally interleave with heartbeat frames.
-    sink: Arc<Mutex<io::BufWriter<UnixStream>>>,
+    /// directly to the unix stream.
+    sink: io::BufWriter<UnixStream>,
     /// The size of the client tty.
     size: TtySize,
     /// The raw unix socket stream. The shell->client thread should
@@ -188,6 +187,9 @@ pub struct ReaderArgs {
     pub client_connection_ack: crossbeam_channel::Sender<ClientConnectionStatus>,
     pub tty_size_change: crossbeam_channel::Receiver<TtySize>,
     pub tty_size_change_ack: crossbeam_channel::Sender<()>,
+    pub heartbeat: crossbeam_channel::Receiver<()>,
+    // true if the client is still live, false if it has hung up on us
+    pub heartbeat_ack: crossbeam_channel::Sender<bool>,
 }

 impl SessionInner {
@@ -263,7 +265,8 @@ impl SessionInner {
                 Ok(ClientConnectionMsg::New(conn)) => {
                     info!("got new connection (rows={}, cols={})", conn.size.rows, conn.size.cols);
                     do_reattach = true;
-                    let ack = if let ClientConnectionMsg::New(old_conn) = client_conn {
+                    let ack = if let ClientConnectionMsg::New(mut old_conn) = client_conn {
+                        Self::write_exit_chunk(&mut old_conn.sink, 0);
                         old_conn.stream.shutdown(net::Shutdown::Both)?;
                         ClientConnectionStatus::Replaced
                     } else {
@@ -298,7 +301,7 @@ impl SessionInner {
                 Ok(ClientConnectionMsg::Disconnect) => {
                     let ack = if let ClientConnectionMsg::New(mut old_conn) = client_conn {
                         info!("disconnect, shutting down client stream");
-                        Self::write_exit_chunk(&mut old_conn.stream, 0);
+                        Self::write_exit_chunk(&mut old_conn.sink, 0);
                         old_conn.stream.shutdown(net::Shutdown::Both)?;
                         ClientConnectionStatus::Detached
                     } else {
@@ -317,7 +320,7 @@ impl SessionInner {

                     // write an exit status frame so the attach process
                     // can exit with the same exit code as the child shell
-                    Self::write_exit_chunk(&mut old_conn.stream, exit_status);
+                    Self::write_exit_chunk(&mut old_conn.sink, exit_status);
                     old_conn.stream.shutdown(net::Shutdown::Both)?;

                     ClientConnectionStatus::Detached
@@ -362,6 +365,30 @@ impl SessionInner {
                         }
                     }
                 }
+                recv(args.heartbeat) -> _ => {
+                    let client_present = if let ClientConnectionMsg::New(conn) = &mut client_conn {
+                        let chunk = Chunk { kind: ChunkKind::Heartbeat, buf: &[] };
+                        match chunk.write_to(&mut conn.sink).and_then(|_| conn.sink.flush()) {
+                            Ok(_) => {
+                                trace!("wrote heartbeat");
+                                true
+                            }
+                            Err(e) if e.kind() == io::ErrorKind::BrokenPipe => {
+                                trace!("client hangup: {:?}", e);
+                                false
+                            }
+                            Err(e) => {
+                                error!("unexpected IO error while writing heartbeat: {}", e);
+                                return Err(e).context("writing heartbeat")?;
+                            }
+                        }
+                    } else {
+                        false
+                    };
+
+                    args.heartbeat_ack.send(client_present)
+                        .context("sending heartbeat ack")?;
+                }

                 // make this select non-blocking so we spend most of our time parked
                 // in poll
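With the mutex gone, only the shell->client thread ever touches the sink;
the heartbeat thread asks it to probe the client through the pair of
zero-capacity channels wired up above. The rendezvous request/ack shape
of that control channel looks like this in isolation (the names and the
trivial `client_present` computation are illustrative, and the real loop
has several more select arms):

```rust
use std::{thread, time::Duration};

use crossbeam_channel::select;

fn main() {
    // Zero-capacity channels rendezvous: a send blocks until the other
    // side is actively receiving, so an ack can never be stale.
    let (heartbeat_tx, heartbeat_rx) = crossbeam_channel::bounded::<()>(0);
    let (ack_tx, ack_rx) = crossbeam_channel::bounded::<bool>(0);

    let worker = thread::spawn(move || {
        // Stand-in for the shell->client loop: it owns the output sink
        // exclusively, so heartbeat writes cannot interleave with data.
        loop {
            select! {
                recv(heartbeat_rx) -> msg => {
                    if msg.is_err() {
                        return; // requester hung up
                    }
                    let client_present = true; // would be a write attempt
                    if ack_tx.send(client_present).is_err() {
                        return;
                    }
                }
            }
        }
    });

    // Stand-in for the heartbeat thread: ask the owning thread to probe.
    for _ in 0..3 {
        heartbeat_tx.send(()).unwrap();
        let present = ack_rx.recv().unwrap();
        println!("client present: {}", present);
        thread::sleep(Duration::from_millis(10));
    }
    drop(heartbeat_tx); // lets the worker's recv fail and the loop exit
    worker.join().unwrap();
}
```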
@@ -373,9 +400,13 @@ impl SessionInner {
                     if resize_cmd.when.saturating_duration_since(time::Instant::now())
                         == time::Duration::ZERO
                     {
-                        resize_cmd
-                            .size
-                            .set_fd(pty_master.raw_fd().ok_or(anyhow!("no master fd"))?)?;
+                        let status = pty_master
+                            .raw_fd()
+                            .ok_or(anyhow!("no master fd"))
+                            .and_then(|fd| resize_cmd.size.set_fd(fd));
+                        if let Err(e) = status {
+                            warn!("error resizing pty: {}", e);
+                        }
                         executed_resize = true;
                         info!(
                             "resized fd (rows={}, cols={})",
@@ -411,25 +442,28 @@ impl SessionInner {
                 (_, _) => vec![],
             };
             if let (true, ClientConnectionMsg::New(conn)) =
-                (!restore_buf.is_empty(), &client_conn)
+                (!restore_buf.is_empty(), &mut client_conn)
             {
                 trace!("restore chunk='{}'", String::from_utf8_lossy(&restore_buf[..]));
                 // send the restore buffer, broken up into chunks so that we don't make
                 // the client allocate too much
-                let mut s = conn.sink.lock().unwrap();
                 for block in restore_buf.as_slice().chunks(consts::BUF_SIZE) {
                     let chunk = Chunk { kind: ChunkKind::Data, buf: block };
-                    if let Err(err) = chunk.write_to(&mut *s) {
+                    if let Err(err) = chunk.write_to(&mut conn.sink) {
                         warn!("err writing session-restore buf: {:?}", err);
                     }
                 }
-                if let Err(err) = s.flush() {
+                if let Err(err) = conn.sink.flush() {
                     warn!("err flushing session-restore: {:?}", err);
                 }
             }

+            // TODO(ethan): what if poll times out on a tick when we have just
+            // set up a restore chunk? It looks like we will just drop the
+            // data as things are now.
+
             // Block until the shell has some data for us so we can be sure our reads
             // always succeed. We don't want to end up blocked forever on a read while
             // a client is trying to attach.
@@ -483,23 +517,22 @@ impl SessionInner {
             }

             if let (ClientConnectionMsg::New(conn), true) =
-                (&client_conn, has_seen_prompt_sentinel)
+                (&mut client_conn, has_seen_prompt_sentinel)
             {
                 let chunk = Chunk { kind: ChunkKind::Data, buf };

-                let mut s = conn.sink.lock().unwrap();
-
                 // If we still need to do an initial motd dump, it means we have just finished
                 // dropping all the prompt setup stuff, we should dump the motd now before we
                 // write the first chunk.
                 if needs_initial_motd_dump {
                     needs_initial_motd_dump = false;
-                    if let Err(e) = daily_messenger.dump(&mut *s, &term_db) {
+                    if let Err(e) = daily_messenger.dump(&mut conn.sink, &term_db) {
                         warn!("Error handling clear: {:?}", e);
                     }
                 }

-                let write_result = chunk.write_to(&mut *s).and_then(|_| s.flush());
+                let write_result =
+                    chunk.write_to(&mut conn.sink).and_then(|_| conn.sink.flush());
                 if let Err(err) = write_result {
                     info!("client_stream write err, assuming hangup: {:?}", err);
                     reset_client_conn = true;
@@ -518,10 +551,10 @@ impl SessionInner {
             .spawn(move || log_if_error("error in shell->client", closure()))?)
     }

-    fn write_exit_chunk<W: Write>(mut stream: W, status: i32) {
+    fn write_exit_chunk<W: Write>(mut sink: W, status: i32) {
         let status_buf: [u8; 4] = status.to_le_bytes();
         let chunk = Chunk { kind: ChunkKind::ExitStatus, buf: status_buf.as_slice() };
-        match chunk.write_to(&mut stream).and_then(|_| stream.flush()) {
+        match chunk.write_to(&mut sink).and_then(|_| sink.flush()) {
             Ok(_) => {
                 trace!("wrote exit status chunk");
             }
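write_exit_chunk is generic over any Write sink, so the same helper can
aim at the BufWriter that the shell->client thread now owns outright. A
sketch of the framing it performs, with an assumed one-byte kind tag
standing in for shpool's real Chunk encoding (which may differ):

```rust
use std::io::{self, Write};

// Assumed frame layout for illustration only: one kind byte followed by
// a 4-byte little-endian exit status. shpool's actual wire format may
// differ.
const KIND_EXIT_STATUS: u8 = 4;

fn write_exit_chunk<W: Write>(mut sink: W, status: i32) -> io::Result<()> {
    let status_buf: [u8; 4] = status.to_le_bytes();
    sink.write_all(&[KIND_EXIT_STATUS])?;
    sink.write_all(&status_buf)?;
    // A BufWriter only hands bytes to the socket on flush, so an exit
    // frame must be flushed before the stream is shut down.
    sink.flush()
}

fn main() -> io::Result<()> {
    let mut out = io::BufWriter::new(Vec::new());
    write_exit_chunk(&mut out, 0)?;
    println!("encoded frame: {:?}", out.into_inner()?);
    Ok(())
}
```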
@@ -559,9 +592,8 @@ impl SessionInner {
             client_stream.try_clone().context("creating client->shell client stream")?;
         let shell_to_client_client_stream =
             client_stream.try_clone().context("creating shell->client client stream handle")?;
-        let client_stream_m = Arc::new(Mutex::new(io::BufWriter::new(
-            client_stream.try_clone().context("wrapping stream in bufwriter")?,
-        )));
+        let output_sink =
+            io::BufWriter::new(client_stream.try_clone().context("wrapping stream in bufwriter")?);

         {
             let _s = span!(Level::INFO, "initial_attach_lock(shell_to_client_ctl)").entered();
@@ -570,7 +602,7 @@ impl SessionInner {
                 .client_connection
                 .send_timeout(
                     ClientConnectionMsg::New(ClientConnection {
-                        sink: Arc::clone(&client_stream_m),
+                        sink: output_sink,
                         size: init_tty_size,
                         stream: shell_to_client_client_stream,
                     }),
@@ -600,8 +632,7 @@ impl SessionInner {
                 // Send a steady stream of heartbeats to the client
                 // so that if the connection unexpectedly goes
                 // down, we detect it immediately.
-                let heartbeat_h = self.spawn_heartbeat(
-                    s, conn_id, &stop, &client_stream_m)?;
+                let heartbeat_h = self.spawn_heartbeat(s, conn_id, &stop)?;

                 // poll the pty master fd to see if the child
                 // shell has exited.
@@ -677,7 +708,6 @@ impl SessionInner {
             }

             debug!("joined all threads");
-
             Ok(())
         }).context("outer thread scope")?;

@@ -843,7 +873,6 @@ impl SessionInner {
         scope: &'scope thread::Scope<'scope, '_>,
         conn_id: usize,
         stop: &'scope AtomicBool,
-        client_stream_m: &'scope Arc<Mutex<io::BufWriter<UnixStream>>>,
     ) -> anyhow::Result<thread::ScopedJoinHandle<'scope, anyhow::Result<()>>> {
         thread::Builder::new()
             .name(format!("heartbeat({})", self.name))
@@ -858,20 +887,39 @@ impl SessionInner {
                 }
                 thread::sleep(consts::HEARTBEAT_DURATION);

-                let chunk = Chunk { kind: ChunkKind::Heartbeat, buf: &[] };
                 {
-                    let mut s = client_stream_m.lock().unwrap();
-                    match chunk.write_to(&mut *s).and_then(|_| s.flush()) {
-                        Ok(_) => {
-                            trace!("wrote heartbeat");
-                        }
-                        Err(e) if e.kind() == io::ErrorKind::BrokenPipe => {
-                            trace!("client hangup: {:?}", e);
-                            return Ok(());
+                    let shell_to_client_ctl = self.shell_to_client_ctl.lock().unwrap();
+                    match shell_to_client_ctl
+                        .heartbeat
+                        .send_timeout((), SHELL_TO_CLIENT_CTL_TIMEOUT)
+                    {
+                        // If the channel is disconnected, it means that the shell exited and
+                        // the shell->client thread exited cleanly. We should not raise a
+                        // ruckus.
+                        Err(crossbeam_channel::SendTimeoutError::Disconnected(_)) => {
+                            return Ok(())
                         }
                         Err(e) => {
-                            return Err(e).context("writing heartbeat")?;
+                            return Err(e)
+                                .context("requesting heartbeat from shell->client thread")
                         }
+                        _ => {}
+                    }
+                    let client_present = match shell_to_client_ctl
+                        .heartbeat_ack
+                        .recv_timeout(SHELL_TO_CLIENT_CTL_TIMEOUT)
+                    {
+                        // If the channel is disconnected, it means that the shell exited and
+                        // the shell->client thread exited cleanly. We should not raise a
+                        // ruckus.
+                        Err(crossbeam_channel::RecvTimeoutError::Disconnected) => return Ok(()),
+                        Err(e) => return Err(e).context("waiting for heartbeat ack"),
+                        Ok(client_present) => client_present,
+                    };
+                    if !client_present {
+                        // Bail from the thread to get the rest of the
+                        // client threads to clean themselves up.
+                        return Ok(());
+                    }
                 }
             }
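The heartbeat thread now distinguishes a disconnected control channel
(the shell->client thread wound down cleanly, so return quietly) from a
timeout (the thread exists but is stuck, which is exactly the deadlock
being hunted). That send_timeout discrimination in isolation, with
illustrative names:

```rust
use std::{thread, time::Duration};

use anyhow::Context;
use crossbeam_channel::SendTimeoutError;

const CTL_TIMEOUT: Duration = Duration::from_secs(1);

fn main() -> anyhow::Result<()> {
    let (tx, rx) = crossbeam_channel::bounded::<()>(0);

    // Stand-in for the shell->client thread: serve one heartbeat
    // request, then exit, dropping the receiver, as if the shell died
    // and the loop wound down cleanly.
    let handle = thread::spawn(move || {
        let _ = rx.recv();
    });

    match tx.send_timeout((), CTL_TIMEOUT) {
        // Disconnected: the peer exited cleanly; no ruckus.
        Err(SendTimeoutError::Disconnected(_)) => return Ok(()),
        // Timeout: the peer is alive but wedged; surface a real error.
        Err(e) => return Err(e).context("requesting heartbeat"),
        Ok(()) => {}
    }
    handle.join().unwrap();

    // The receiver is gone now, so this reliably hits the Disconnected
    // arm rather than blocking or timing out.
    match tx.send_timeout((), CTL_TIMEOUT) {
        Err(SendTimeoutError::Disconnected(_)) => Ok(()),
        Err(e) => Err(e).context("requesting heartbeat"),
        Ok(()) => Ok(()),
    }
}
```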
@@ -966,6 +1014,13 @@ pub struct ReaderCtl {
     /// A control channel for the shell->client thread. Acks the completion of a
     /// spool resize.
     pub tty_size_change_ack: crossbeam_channel::Receiver<()>,
+
+    /// A control channel telling the shell->client thread to issue
+    /// a heartbeat to check if the client is still listening.
+    pub heartbeat: crossbeam_channel::Sender<()>,
+    /// True if the client is still listening, false if it has hung up
+    /// on us.
+    pub heartbeat_ack: crossbeam_channel::Receiver<bool>,
 }

 /// Given a buffer, a length after which the data is not valid, a list of