diff --git a/libshpool/src/config_watcher.rs b/libshpool/src/config_watcher.rs index f2de5fb1..ae9c365d 100644 --- a/libshpool/src/config_watcher.rs +++ b/libshpool/src/config_watcher.rs @@ -323,7 +323,7 @@ impl<Handler> ConfigWatcherInner<Handler> { }; rewatch_paths .into_iter() - .map(|(path, watched_path)| { + .any(|(path, watched_path)| { if let Err(err) = self.watcher.unwatch(&watched_path) { // error sometimes is expected if the watched_path was simply removed, in that // case notify will automatically remove the watch. @@ -335,7 +335,6 @@ impl<Handler> ConfigWatcherInner<Handler> { .map_err(|err| error!("Failed to add watch: {:?}", err)) .unwrap_or(true) }) - .any(|reload| reload) } } diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs index 1650557c..4c99b69f 100644 --- a/libshpool/src/daemon/server.rs +++ b/libshpool/src/daemon/server.rs @@ -139,7 +139,7 @@ impl Server { // advertize our protocol version to the client so that it can // warn about mismatches - protocol::encode_to( + match protocol::encode_to( &VersionHeader { // We allow fake version to be injected for ease of testing. 
// Otherwise we would have to resort to some heinous build @@ -150,8 +150,19 @@ impl Server { }, }, &mut stream, - ) - .context("writing version header")?; + ) { + Ok(_) => {} + Err(e) + if e.root_cause() + .downcast_ref::<io::Error>() + .map(|ioe| ioe.kind() == io::ErrorKind::BrokenPipe) + .unwrap_or(false) => + { + info!("broken pipe while writing version, likely just a daemon presence probe"); + return Ok(()); + } + Err(e) => return Err(e).context("while writing version"), + } let header = parse_connect_header(&mut stream).context("parsing connect header")?; @@ -239,8 +250,8 @@ impl Server { Some(exit_status) => { // the channel is closed so we know the subshell exited info!( - "stale inner={:?}, (child exited with status {}) clobbering with new subshell", - inner, exit_status + "stale inner, (child exited with status {}) clobbering with new subshell", + exit_status ); status = AttachStatus::Created { warnings }; } @@ -378,7 +389,7 @@ impl Server { error!("error shuffling bytes: {:?}", e); } } - info!("bidi stream loop finished"); + info!("bidi stream loop finished child_done={}", child_done); if child_done { info!("'{}' exited, removing from session table", header.name); @@ -732,27 +743,51 @@ impl Server { // spawn a background thread to reap the shell when it exits // and notify about the exit by closing a channel. let child_exit_notifier = Arc::new(ExitNotifier::new()); - let waitable_child = fork.clone(); + + // Just cloning the fork and directly calling wait_for_exit() on it would + // be simpler, but it would be wrong because then the destructor for the + // cloned fork object would close the pty fd earlier than we want as the + // child watcher thread exits. This can cause the shell->client thread + // to read the wrong file (for example, the config file contents if the + // config watcher reloads). 
+ let waitable_child_pid = fork.child_pid().ok_or(anyhow!("missing child pid"))?; let session_name = header.name.clone(); let notifiable_child_exit_notifier = Arc::clone(&child_exit_notifier); thread::spawn(move || { let _s = span!(Level::INFO, "child_watcher", s = session_name, cid = conn_id).entered(); - match waitable_child.wait_for_exit() { - Ok((_, Some(exit_status))) => { - info!("child exited with status {}", exit_status); - notifiable_child_exit_notifier.notify_exit(exit_status); + let mut err = None; + let mut status = 0; + let mut unpacked_status = None; + loop { + // Safety: all basic ffi, the pid is valid before this returns. + unsafe { + match libc::waitpid(waitable_child_pid, &mut status, 0) { + 0 => continue, + -1 => { + err = Some("waitpid failed"); + break; + } + _ => { + if libc::WIFEXITED(status) { + unpacked_status = Some(libc::WEXITSTATUS(status)); + } + break; + } + } } - Ok((_, None)) => { + } + if let Some(status) = unpacked_status { + info!("child exited with status {}", status); + notifiable_child_exit_notifier.notify_exit(status); + } else { + if let Some(e) = err { + info!("child exited without status, using 1: {:?}", e); + } else { info!("child exited without status, using 1"); - notifiable_child_exit_notifier.notify_exit(1); - } - Err(e) => { - info!("error waiting on child, using exit status 1: {:?}", e); - notifiable_child_exit_notifier.notify_exit(1); } + notifiable_child_exit_notifier.notify_exit(1); } - info!("reaped child shell: {:?}", waitable_child); }); // Inject the prompt prefix, if any. 
For custom commands, avoid doing this @@ -776,11 +811,16 @@ impl Server { let (tty_size_change_tx, tty_size_change_rx) = crossbeam_channel::bounded(0); let (tty_size_change_ack_tx, tty_size_change_ack_rx) = crossbeam_channel::bounded(0); + let (heartbeat_tx, heartbeat_rx) = crossbeam_channel::bounded(0); + let (heartbeat_ack_tx, heartbeat_ack_rx) = crossbeam_channel::bounded(0); + let shell_to_client_ctl = Arc::new(Mutex::new(shell::ReaderCtl { client_connection: client_connection_tx, client_connection_ack: client_connection_ack_rx, tty_size_change: tty_size_change_tx, tty_size_change_ack: tty_size_change_ack_rx, + heartbeat: heartbeat_tx, + heartbeat_ack: heartbeat_ack_rx, })); let mut session_inner = shell::SessionInner { name: header.name.clone(), @@ -813,6 +853,8 @@ impl Server { client_connection_ack: client_connection_ack_tx, tty_size_change: tty_size_change_rx, tty_size_change_ack: tty_size_change_ack_tx, + heartbeat: heartbeat_rx, + heartbeat_ack: heartbeat_ack_tx, })?); if let Some(ttl_secs) = header.ttl_secs { diff --git a/libshpool/src/daemon/shell.rs b/libshpool/src/daemon/shell.rs index 3d8d9a79..7f912eab 100644 --- a/libshpool/src/daemon/shell.rs +++ b/libshpool/src/daemon/shell.rs @@ -124,9 +124,8 @@ pub struct SessionInner { /// shell->client thread. pub struct ClientConnection { /// All output data should be written to this sink rather than - /// directly to the unix stream. The mutex makes sure that we don't - /// accidentally interleave with heartbeat frames. - sink: Arc<Mutex<io::BufWriter<UnixStream>>>, + /// directly to the unix stream. + sink: io::BufWriter<UnixStream>, /// The size of the client tty. size: TtySize, /// The raw unix socket stream. 
The shell->client thread should @@ -188,6 +187,9 @@ pub struct ReaderArgs { pub client_connection_ack: crossbeam_channel::Sender<ClientConnectionStatus>, pub tty_size_change: crossbeam_channel::Receiver<TtySize>, pub tty_size_change_ack: crossbeam_channel::Sender<()>, + pub heartbeat: crossbeam_channel::Receiver<()>, + // true if the client is still live, false if it has hung up on us + pub heartbeat_ack: crossbeam_channel::Sender<bool>, } impl SessionInner { @@ -263,7 +265,8 @@ impl SessionInner { Ok(ClientConnectionMsg::New(conn)) => { info!("got new connection (rows={}, cols={})", conn.size.rows, conn.size.cols); do_reattach = true; - let ack = if let ClientConnectionMsg::New(old_conn) = client_conn { + let ack = if let ClientConnectionMsg::New(mut old_conn) = client_conn { + Self::write_exit_chunk(&mut old_conn.sink, 0); old_conn.stream.shutdown(net::Shutdown::Both)?; ClientConnectionStatus::Replaced } else { @@ -298,7 +301,7 @@ impl SessionInner { Ok(ClientConnectionMsg::Disconnect) => { let ack = if let ClientConnectionMsg::New(mut old_conn) = client_conn { info!("disconnect, shutting down client stream"); - Self::write_exit_chunk(&mut old_conn.stream, 0); + Self::write_exit_chunk(&mut old_conn.sink, 0); old_conn.stream.shutdown(net::Shutdown::Both)?; ClientConnectionStatus::Detached } else { @@ -317,7 +320,7 @@ impl SessionInner { // write an exit status frame so the attach process // can exit with the same exit code as the child shell - Self::write_exit_chunk(&mut old_conn.stream, exit_status); + Self::write_exit_chunk(&mut old_conn.sink, exit_status); old_conn.stream.shutdown(net::Shutdown::Both)?; ClientConnectionStatus::Detached @@ -362,6 +365,30 @@ impl SessionInner { } } } + recv(args.heartbeat) -> _ => { + let client_present = if let ClientConnectionMsg::New(conn) = &mut client_conn { + let chunk = Chunk { kind: ChunkKind::Heartbeat, buf: &[] }; + match chunk.write_to(&mut conn.sink).and_then(|_| conn.sink.flush()) { + Ok(_) => { + trace!("wrote 
heartbeat"); + true + } + Err(e) if e.kind() == io::ErrorKind::BrokenPipe => { + trace!("client hangup: {:?}", e); + false + } + Err(e) => { + error!("unexpected IO error while writing heartbeat: {}", e); + return Err(e).context("writing heartbeat")?; + } + } + } else { + false + }; + + args.heartbeat_ack.send(client_present) + .context("sending heartbeat ack")?; + } // make this select non-blocking so we spend most of our time parked // in poll @@ -373,9 +400,13 @@ impl SessionInner { if resize_cmd.when.saturating_duration_since(time::Instant::now()) == time::Duration::ZERO { - resize_cmd - .size - .set_fd(pty_master.raw_fd().ok_or(anyhow!("no master fd"))?)?; + let status = pty_master + .raw_fd() + .ok_or(anyhow!("no master fd")) + .and_then(|fd| resize_cmd.size.set_fd(fd)); + if let Err(e) = status { + warn!("error resizing pty: {}", e); + } executed_resize = true; info!( "resized fd (rows={}, cols={})", @@ -411,25 +442,28 @@ impl SessionInner { (_, _) => vec![], }; if let (true, ClientConnectionMsg::New(conn)) = - (!restore_buf.is_empty(), &client_conn) + (!restore_buf.is_empty(), &mut client_conn) { trace!("restore chunk='{}'", String::from_utf8_lossy(&restore_buf[..])); // send the restore buffer, broken up into chunks so that we don't make // the client allocate too much - let mut s = conn.sink.lock().unwrap(); for block in restore_buf.as_slice().chunks(consts::BUF_SIZE) { let chunk = Chunk { kind: ChunkKind::Data, buf: block }; - if let Err(err) = chunk.write_to(&mut *s) { + if let Err(err) = chunk.write_to(&mut conn.sink) { warn!("err writing session-restore buf: {:?}", err); } } - if let Err(err) = s.flush() { + if let Err(err) = conn.sink.flush() { warn!("err flushing session-restore: {:?}", err); } } } + // TODO(ethan): what if poll times out on a tick when we have just + // set up a restore chunk? It looks like we will just drop the + // data as things are now. 
+ // Block until the shell has some data for us so we can be sure our reads // always succeed. We don't want to end up blocked forever on a read while // a client is trying to attach. @@ -483,23 +517,22 @@ impl SessionInner { } if let (ClientConnectionMsg::New(conn), true) = - (&client_conn, has_seen_prompt_sentinel) + (&mut client_conn, has_seen_prompt_sentinel) { let chunk = Chunk { kind: ChunkKind::Data, buf }; - let mut s = conn.sink.lock().unwrap(); - // If we still need to do an initial motd dump, it means we have just finished // dropping all the prompt setup stuff, we should dump the motd now before we // write the first chunk. if needs_initial_motd_dump { needs_initial_motd_dump = false; - if let Err(e) = daily_messenger.dump(&mut *s, &term_db) { + if let Err(e) = daily_messenger.dump(&mut conn.sink, &term_db) { warn!("Error handling clear: {:?}", e); } } - let write_result = chunk.write_to(&mut *s).and_then(|_| s.flush()); + let write_result = + chunk.write_to(&mut conn.sink).and_then(|_| conn.sink.flush()); if let Err(err) = write_result { info!("client_stream write err, assuming hangup: {:?}", err); reset_client_conn = true; @@ -518,10 +551,10 @@ impl SessionInner { .spawn(move || log_if_error("error in shell->client", closure()))?) 
} - fn write_exit_chunk<W: io::Write>(mut stream: W, status: i32) { + fn write_exit_chunk<W: io::Write>(mut sink: W, status: i32) { let status_buf: [u8; 4] = status.to_le_bytes(); let chunk = Chunk { kind: ChunkKind::ExitStatus, buf: status_buf.as_slice() }; - match chunk.write_to(&mut stream).and_then(|_| stream.flush()) { + match chunk.write_to(&mut sink).and_then(|_| sink.flush()) { Ok(_) => { trace!("wrote exit status chunk"); } @@ -559,9 +592,8 @@ impl SessionInner { client_stream.try_clone().context("creating client->shell client stream")?; let shell_to_client_client_stream = client_stream.try_clone().context("creating shell->client client stream handle")?; - let client_stream_m = Arc::new(Mutex::new(io::BufWriter::new( - client_stream.try_clone().context("wrapping stream in bufwriter")?, - ))); + let output_sink = + io::BufWriter::new(client_stream.try_clone().context("wrapping stream in bufwriter")?); { let _s = span!(Level::INFO, "initial_attach_lock(shell_to_client_ctl)").entered(); @@ -570,7 +602,7 @@ impl SessionInner { .client_connection .send_timeout( ClientConnectionMsg::New(ClientConnection { - sink: Arc::clone(&client_stream_m), + sink: output_sink, size: init_tty_size, stream: shell_to_client_client_stream, }), @@ -600,8 +632,7 @@ impl SessionInner { // Send a steady stream of heartbeats to the client // so that if the connection unexpectedly goes // down, we detect it immediately. - let heartbeat_h = self.spawn_heartbeat( - s, conn_id, &stop, &client_stream_m)?; + let heartbeat_h = self.spawn_heartbeat(s, conn_id, &stop)?; // poll the pty master fd to see if the child // shell has exited. 
@@ -677,7 +708,6 @@ impl SessionInner { } debug!("joined all threads"); - Ok(()) }).context("outer thread scope")?; @@ -843,7 +873,6 @@ impl SessionInner { scope: &'scope thread::Scope<'scope, '_>, conn_id: usize, stop: &'scope AtomicBool, - client_stream_m: &'scope Arc<Mutex<io::BufWriter<UnixStream>>>, ) -> anyhow::Result<thread::ScopedJoinHandle<'scope, anyhow::Result<()>>> { thread::Builder::new() .name(format!("heartbeat({})", self.name)) @@ -858,20 +887,39 @@ impl SessionInner { } thread::sleep(consts::HEARTBEAT_DURATION); - let chunk = Chunk { kind: ChunkKind::Heartbeat, buf: &[] }; { - let mut s = client_stream_m.lock().unwrap(); - match chunk.write_to(&mut *s).and_then(|_| s.flush()) { - Ok(_) => { - trace!("wrote heartbeat"); - } - Err(e) if e.kind() == io::ErrorKind::BrokenPipe => { - trace!("client hangup: {:?}", e); - return Ok(()); + let shell_to_client_ctl = self.shell_to_client_ctl.lock().unwrap(); + match shell_to_client_ctl + .heartbeat + .send_timeout((), SHELL_TO_CLIENT_CTL_TIMEOUT) + { + // If the channel is disconnected, it means that the shell exited and + // the shell->client process exited cleanly. We should not raise a + // ruckus. + Err(crossbeam_channel::SendTimeoutError::Disconnected(_)) => { + return Ok(()) } Err(e) => { - return Err(e).context("writing heartbeat")?; + return Err(e) + .context("requesting heartbeat from shell->client thread") } + _ => {} + } + let client_present = match shell_to_client_ctl + .heartbeat_ack + .recv_timeout(SHELL_TO_CLIENT_CTL_TIMEOUT) + { + // If the channel is disconnected, it means that the shell exited and + // the shell->client process exited cleanly. We should not raise a + // ruckus. + Err(crossbeam_channel::RecvTimeoutError::Disconnected) => return Ok(()), + Err(e) => return Err(e).context("waiting for heartbeat ack"), + Ok(client_present) => client_present, + }; + if !client_present { + // Bail from the thread to get the rest of the + // client threads to clean themselves up. 
+ return Ok(()); } } } @@ -966,6 +1014,13 @@ pub struct ReaderCtl { /// A control channel for the shell->client thread. Acks the completion of a /// spool resize. pub tty_size_change_ack: crossbeam_channel::Receiver<()>, + + // A control channel telling the shell->client thread to issue + // a heartbeat to check if the client is still listening. + pub heartbeat: crossbeam_channel::Sender<()>, + // True if the client is still listening, false if it has hung up + // on us. + pub heartbeat_ack: crossbeam_channel::Receiver<bool>, } /// Given a buffer, a length after which the data is not valid, a list of