diff --git a/Cargo.lock b/Cargo.lock index 8cf5c69be..7983eac4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4483,6 +4483,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "slab", "slog", "slog-async", "slog-bunyan", diff --git a/Cargo.toml b/Cargo.toml index a3abc64e5..39cf86091 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -150,6 +150,7 @@ serde_derive = "1.0" serde_json = "1.0" serde_test = "1.0.138" serde_with = "3.11.0" +slab = "0.4.9" slog = "2.7" slog-async = "2.8" slog-bunyan = "2.4.0" diff --git a/bin/propolis-cli/src/main.rs b/bin/propolis-cli/src/main.rs index 47e72c847..bfe3f7f52 100644 --- a/bin/propolis-cli/src/main.rs +++ b/bin/propolis-cli/src/main.rs @@ -125,6 +125,10 @@ enum Command { /// Defaults to the most recent 16 KiB of console output (-16384). #[clap(long, short)] byte_offset: Option, + + /// True if the serial console connection should be read-only. + #[clap(long, action)] + readonly: bool, }, /// Migrate instance to new propolis-server @@ -604,9 +608,11 @@ async fn stdin_to_websockets_task( async fn serial( addr: SocketAddr, byte_offset: Option, + readonly: bool, log: Logger, ) -> anyhow::Result<()> { - let mut ws_console = serial_connect(addr, byte_offset, log).await?; + let mut ws_console = + serial_connect(addr, byte_offset, readonly, log).await?; let _raw_guard = RawTermiosGuard::stdio_guard() .with_context(|| anyhow!("failed to set raw mode"))?; @@ -699,6 +705,7 @@ async fn serial( async fn serial_connect( addr: SocketAddr, byte_offset: Option, + readonly: bool, log: Logger, ) -> anyhow::Result { let offset = match byte_offset { @@ -707,7 +714,8 @@ async fn serial_connect( None => WSClientOffset::MostRecent(16384), }; - Ok(InstanceSerialConsoleHelper::new(addr, offset, Some(log)).await?) + Ok(InstanceSerialConsoleHelper::new(addr, offset, readonly, Some(log)) + .await?) } async fn migrate_instance( @@ -914,8 +922,8 @@ async fn main() -> anyhow::Result<()> { } Command::Get => get_instance(&client).await?, Command::State { state } => put_instance(&client, state).await?, - Command::Serial { byte_offset } => { - serial(addr, byte_offset, log).await? + Command::Serial { byte_offset, readonly } => { + serial(addr, byte_offset, readonly, log).await? } Command::Migrate { dst_server, dst_port, dst_uuid, crucible_disks } => { let dst_addr = SocketAddr::new(dst_server, dst_port); diff --git a/bin/propolis-server/Cargo.toml b/bin/propolis-server/Cargo.toml index c748d4dfb..f9cf9b29c 100644 --- a/bin/propolis-server/Cargo.toml +++ b/bin/propolis-server/Cargo.toml @@ -50,6 +50,7 @@ toml.workspace = true serde.workspace = true serde_derive.workspace = true serde_json.workspace = true +slab.workspace = true slog.workspace = true slog-async.workspace = true slog-bunyan.workspace = true diff --git a/bin/propolis-server/src/lib/initializer.rs b/bin/propolis-server/src/lib/initializer.rs index 094e7cc3c..1a502b7f5 100644 --- a/bin/propolis-server/src/lib/initializer.rs +++ b/bin/propolis-server/src/lib/initializer.rs @@ -9,7 +9,7 @@ use std::os::unix::fs::FileTypeExt; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use crate::serial::Serial; +use crate::serial::backend::ConsoleBackend; use crate::spec::{self, Spec, StorageBackend, StorageDevice}; use crate::stats::{ track_network_interface_kstats, track_vcpu_kstats, VirtualDiskProducer, @@ -377,7 +377,7 @@ impl MachineInitializer<'_> { pub fn initialize_uart( &mut self, chipset: &RegisteredChipset, - ) -> Serial { + ) -> Arc { let mut com1 = None; for (name, desc) in self.spec.serial.iter() { if desc.device != spec::SerialPortDevice::Uart { @@ -401,9 +401,8 @@ impl MachineInitializer<'_> { } } - let sink_size = NonZeroUsize::new(64).unwrap(); - let source_size = NonZeroUsize::new(1024).unwrap(); - Serial::new(com1.unwrap(), sink_size, source_size) + const COM1_HISTORY_BYTES: usize = 1024 * 1024; + ConsoleBackend::new(com1.unwrap().clone(), COM1_HISTORY_BYTES) } pub fn initialize_ps2( diff --git a/bin/propolis-server/src/lib/migrate/destination.rs b/bin/propolis-server/src/lib/migrate/destination.rs index 49bb96b1f..7a9eefb49 100644 --- a/bin/propolis-server/src/lib/migrate/destination.rs +++ b/bin/propolis-server/src/lib/migrate/destination.rs @@ -6,7 +6,8 @@ use bitvec::prelude as bv; use futures::{SinkExt, StreamExt}; use propolis::common::{GuestAddr, Lifecycle, PAGE_SIZE}; use propolis::migrate::{ - MigrateCtx, MigrateStateError, Migrator, PayloadOffer, PayloadOffers, + MigrateCtx, MigrateSingle, MigrateStateError, Migrator, PayloadOffer, + PayloadOffers, }; use propolis::vmm; use propolis_api_types::instance_spec::SpecKey; @@ -22,10 +23,10 @@ use tokio_tungstenite::tungstenite::protocol::CloseFrame; use tokio_tungstenite::{tungstenite, WebSocketStream}; use uuid::Uuid; -use crate::migrate::codec; use crate::migrate::memx; use crate::migrate::preamble::Preamble; use crate::migrate::probes; +use crate::migrate::{codec, DevicePayload}; use crate::migrate::{ Device, MigrateError, MigratePhase, MigrateRole, MigrationState, PageIter, }; @@ -511,6 +512,21 @@ impl RonV0 { return Err(MigrateError::UnexpectedMessage); } }; + + let com1_payload: DevicePayload = match self.read_msg().await? { + codec::Message::Serialized(encoded) => { + ron::de::from_reader(encoded.as_bytes()) + .map_err(codec::ProtocolError::from)? + } + msg => { + error!( + self.log(), + "device_state: unexpected COM1 history message: {msg:?}" + ); + return Err(MigrateError::UnexpectedMessage); + } + }; + self.read_ok().await?; info!(self.log(), "Devices: {devices:#?}"); @@ -529,6 +545,18 @@ impl RonV0 { })?; self.import_device(&target, &device, &migrate_ctx)?; } + + let com1_data = + &mut ron::Deserializer::from_str(&com1_payload.data) + .map_err(codec::ProtocolError::from)?; + let com1_offer = PayloadOffer { + kind: &com1_payload.kind, + version: com1_payload.version, + payload: Box::new(::erase( + com1_data, + )), + }; + vm_objects.com1().import(com1_offer, &migrate_ctx)?; } self.send_msg(codec::Message::Okay).await @@ -762,24 +790,8 @@ impl RonV0 { .map_err(codec::ProtocolError::from)?, )) .await?; - let com1_history = match self.read_msg().await? { - codec::Message::Serialized(encoded) => encoded, - msg => { - error!(self.log(), "server_state: unexpected message: {msg:?}"); - return Err(MigrateError::UnexpectedMessage); - } - }; - - ensure_ctx - .vm_objects() - .lock_shared() - .await - .com1() - .import(&com1_history) - .await - .map_err(|e| MigrateError::Codec(e.to_string()))?; - self.send_msg(codec::Message::Okay).await + Ok(()) } async fn finish( diff --git a/bin/propolis-server/src/lib/migrate/source.rs b/bin/propolis-server/src/lib/migrate/source.rs index 49328c763..8f8c128ff 100644 --- a/bin/propolis-server/src/lib/migrate/source.rs +++ b/bin/propolis-server/src/lib/migrate/source.rs @@ -6,7 +6,7 @@ use bitvec::prelude::{BitSlice, Lsb0}; use futures::{SinkExt, StreamExt}; use propolis::common::{GuestAddr, GuestData, PAGE_SIZE}; use propolis::migrate::{ - MigrateCtx, MigrateStateError, Migrator, PayloadOutputs, + MigrateCtx, MigrateSingle, MigrateStateError, Migrator, PayloadOutputs, }; use propolis::vmm; use propolis_api_types::instance_spec::VersionedInstanceSpec; @@ -32,6 +32,7 @@ use crate::migrate::{ }; use crate::vm::objects::VmObjects; +use crate::vm::services::VmServices; use crate::vm::state_publisher::{ ExternalStateUpdate, MigrationStateUpdate, StatePublisher, }; @@ -133,6 +134,7 @@ pub(crate) trait SourceProtocol { async fn run( self, vm_objects: &VmObjects, + vm_services: &VmServices, publisher: &mut StatePublisher, persistent_state: &mut PersistentState, ) -> Result<(), MigrateError>; @@ -318,6 +320,7 @@ impl SourceProtocol for RonV0 { async fn run( self, vm_objects: &VmObjects, + vm_services: &VmServices, publisher: &mut StatePublisher, persistent_state: &mut PersistentState, ) -> Result<(), MigrateError> { @@ -327,6 +330,7 @@ impl SourceProtocol for RonV0 { conn: self.conn, dirt: self.dirt, vm: vm_objects, + vm_services, state_publisher: publisher, persistent_state, paused: false, @@ -342,6 +346,7 @@ struct RonV0Runner<'vm, T: MigrateConn> { conn: WebSocketStream, dirt: Option>, vm: &'vm VmObjects, + vm_services: &'vm VmServices, state_publisher: &'vm mut StatePublisher, persistent_state: &'vm mut PersistentState, paused: bool, @@ -680,6 +685,7 @@ impl RonV0Runner<'_, T> { async fn device_state(&mut self) -> Result<(), MigrateError> { self.update_state(MigrationState::Device); let mut device_states = vec![]; + let com1_payload; { let objects = self.vm.lock_shared().await; let migrate_ctx = @@ -728,6 +734,14 @@ impl RonV0Runner<'_, T> { } Ok(()) })?; + + let com1_state = objects.com1().export(&migrate_ctx)?; + com1_payload = DevicePayload { + kind: com1_state.kind.to_owned(), + version: com1_state.version, + data: ron::ser::to_string(&com1_state.payload) + .map_err(codec::ProtocolError::from)?, + }; } info!(self.log(), "Device States: {device_states:#?}"); @@ -738,7 +752,14 @@ impl RonV0Runner<'_, T> { )) .await?; + self.send_msg(codec::Message::Serialized( + ron::ser::to_string(&com1_payload) + .map_err(codec::ProtocolError::from)?, + )) + .await?; + self.send_msg(codec::Message::Okay).await?; + self.read_ok().await } @@ -782,15 +803,15 @@ impl RonV0Runner<'_, T> { } _ => return Err(MigrateError::UnexpectedMessage), }; - let com1_history = self - .vm - .lock_shared() - .await - .com1() - .export_history(remote_addr) - .await?; - self.send_msg(codec::Message::Serialized(com1_history)).await?; - self.read_ok().await + + { + let mgr = self.vm_services.serial_mgr.lock().await; + if let Some(mgr) = mgr.as_ref() { + mgr.notify_migration(remote_addr).await; + } + } + + Ok(()) } async fn finish(&mut self) -> Result<(), MigrateError> { diff --git a/bin/propolis-server/src/lib/serial/backend.rs b/bin/propolis-server/src/lib/serial/backend.rs new file mode 100644 index 000000000..7ab958f22 --- /dev/null +++ b/bin/propolis-server/src/lib/serial/backend.rs @@ -0,0 +1,498 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! A backend that provides external clients with access to a Propolis character +//! device and tracks the history of bytes the guest has written to that device. +//! +//! The [`ConsoleBackend`] type provides an interface to read and write a +//! character device. The backend itself holds a reference to the device and +//! manages a task that reads bytes from the device and issues them to any +//! registered readers. +//! +//! Users of the backend call [`ConsoleBackend::attach_read_only_client`] or +//! [`ConsoleBackend::attach_read_write_client`] to obtain a [`Client`] +//! representing the new connection. Each client has a corresponding [`Reader`] +//! owned by the backend's [`read_task`]; each `Reader` contains a tokio +//! [`SimplexStream`] to which the read task sends bytes written by the guest. +//! Read-write clients own a reference to a [`Writer`] that can write bytes to a +//! sink associated with the backend's character device. +//! +//! Clients may be disconnected in one of two ways: +//! +//! - A client's owner can just drop its `Client` struct. +//! - If the client was configured to use the "close on full channel" read +//! discipline, and the read task is unable to send some bytes to a reader's +//! stream because the channel is full, the client is invalidated. +//! +//! To avoid circular references, clients and their readers don't refer to each +//! other directly. Instead, they share both sides of a `tokio::watch` to which +//! they publish `true` when a connection has been invalidated, regardless of +//! who invalidated it. The receiver end of this channel is available to users +//! through [`Client::get_defunct_rx`] to allow clients' users to learn when +//! their clients have been closed. + +use std::{ + future::Future, + num::NonZeroUsize, + sync::{Arc, Mutex}, + task::{Context, Poll}, +}; + +use propolis::chardev::{ + pollers::{SinkBuffer, SourceBuffer}, + Sink, Source, +}; +use tokio::{ + io::{AsyncWriteExt, SimplexStream, WriteHalf}, + select, + sync::{mpsc, oneshot, watch}, +}; + +use super::history_buffer::{HistoryBuffer, SerialHistoryOffset}; + +/// A client's rights when accessing a character backend. +#[derive(Clone, Copy)] +enum Permissions { + /// The client may both read and write to the backend. + ReadWrite, + + /// The client may only read from the backend. + ReadOnly, +} + +/// Determines what happens when the backend is unable to send a guest byte back +/// to a client because the client's channel is full. +#[derive(Clone, Copy)] +pub(super) enum FullReadChannelDiscipline { + /// The backend should block until it can send to this client. + Block, + + /// The backend should close this client's connection. + Close, +} + +/// An entity that receives bytes written by the guest. +/// +/// Readers are instantiated in [`ConsoleBackend::attach_client`] and passed via +/// channel to the backend's [`read_task`]. +struct Reader { + /// Bytes received from the guest are sent into this stream. + tx: WriteHalf, + + /// Determines what happens if a read fails to accept all of the bytes the + /// task wanted to write. + discipline: FullReadChannelDiscipline, + + /// Set to `true` when this reader is dropped. + defunct_tx: watch::Sender, + + /// Set to `true` when this reader's associated [`Client`] is dropped. + defunct_rx: watch::Receiver, +} + +impl Drop for Reader { + fn drop(&mut self) { + let _ = self.defunct_tx.send(true); + } +} + +/// An entity that can attempt to write bytes to the guest. +struct Writer { + /// The backend to which this writer directs its writes. + backend: Arc, +} + +impl Writer { + /// Sends the bytes in `buf` to the sink associated with this writer's + /// backend. + async fn write(&mut self, buf: &[u8]) -> Result { + assert!(!buf.is_empty()); + + Ok(self + .backend + .sink_buffer + .write(buf, self.backend.sink.as_ref()) + .await + .expect("chardev sink writes always succeed")) + } +} + +/// A client of this backend. +pub(super) struct Client { + /// The client's associated [`Writer`], if it was instantiated as a + /// read-write client. + writer: Option, + + /// The client writes `true` to this channel when it is dropped. + defunct_tx: watch::Sender, + + /// The read task writes `true` to this channel if this client's associated + /// [`Reader`] is closed. + defunct_rx: watch::Receiver, +} + +impl Client { + pub(super) fn can_write(&self) -> bool { + self.writer.is_some() + } + + pub(super) fn get_defunct_rx(&self) -> watch::Receiver { + self.defunct_rx.clone() + } + + /// Attempts to write the bytes in `buf` to the backend. + /// + /// The backend may buffer some of the written bytes before sending them to + /// its device, so when this function returns, it is not guaranteed that all + /// of the written bytes have actually reached the guest. + /// + /// # Return value + /// + /// - `Ok(bytes)` if the write succeeded; `bytes` is the number of bytes + /// that were written. + /// - `Err(std::io::Error::PermissionDenied)` if this client does not have + /// write access to the device. + /// - `Err(std::io::Error::ConnectionAborted)` if this client lost its + /// ability to write to the device because it stopped servicing reads, + /// i.e., its read channel was full and it uses the close-on-full-channel + /// read discipline. + /// + /// # Cancel safety + /// + /// The future returned by this function is cancel-safe. If it is dropped, + /// it is guaranteed that no bytes were written to the backend or its + /// device. + pub(super) async fn write( + &mut self, + buf: &[u8], + ) -> Result { + if buf.is_empty() { + return Ok(0); + } + + let Some(writer) = self.writer.as_mut() else { + return Err(std::io::Error::from( + std::io::ErrorKind::PermissionDenied, + )); + }; + + if *self.defunct_rx.borrow_and_update() { + return Err(std::io::Error::from( + std::io::ErrorKind::ConnectionAborted, + )); + } + + writer.write(buf).await + } +} + +impl Drop for Client { + fn drop(&mut self) { + let _ = self.defunct_tx.send(true); + } +} + +/// Character backend state that's protected by a lock. +struct Inner { + /// A history of the bytes read from this backend's device. + buffer: HistoryBuffer, +} + +impl Inner { + fn new(history_size: usize) -> Self { + Self { buffer: HistoryBuffer::new(history_size) } + } +} + +/// A backend for a Propolis serial console that allows multiple clients to +/// access a single serial device. +pub struct ConsoleBackend { + inner: Arc>, + + /// The character [`Sink`] that should receive writes to this backend. + /// Writes should not access the sink directly; instead, they should be + /// directed to [`Self::sink_buffer`]. This reference is needed because + /// [`SinkBuffer`]'s current API requires a sink to be passed into each + /// attempt to write (buffers don't own references to their sinks). + sink: Arc, + + /// The buffer that sits in front of this backend's sink. Writes to the + /// backend should be directed at the buffer, not at [`Self::sink`]. + sink_buffer: Arc, + + /// Receives the [`Reader`]s generated when new clients connect to this + /// backend. + reader_tx: mpsc::UnboundedSender, + + /// A channel used to tell the backend's reader task that the backend has + /// been closed. + done_tx: oneshot::Sender<()>, +} + +impl ConsoleBackend { + pub fn new( + device: Arc, + history_bytes: usize, + ) -> Arc { + const SINK_BUFFER_BYTES: usize = 64; + + let sink_buffer = + SinkBuffer::new(NonZeroUsize::new(SINK_BUFFER_BYTES).unwrap()); + sink_buffer.attach(device.as_ref()); + + let sink = device.clone(); + let source = device.clone(); + + let (reader_tx, reader_rx) = mpsc::unbounded_channel(); + let (done_tx, done_rx) = oneshot::channel(); + + let this = Arc::new(Self { + inner: Arc::new(Mutex::new(Inner::new(history_bytes))), + sink, + sink_buffer, + reader_tx, + done_tx, + }); + + let inner = this.inner.clone(); + tokio::spawn(read_task(inner, source, reader_rx, done_rx)); + + this + } + + /// Attaches a new read-only client to this backend. Incoming bytes from the + /// guest are sent to `read_tx`. `discipline` specifies what happens if the + /// channel is full when new bytes are available to read. + /// + /// The caller may disconnect its session by dropping the returned + /// [`Client`]. + pub(super) fn attach_read_only_client( + self: &Arc, + read_tx: WriteHalf, + discipline: FullReadChannelDiscipline, + ) -> Client { + self.attach_client(read_tx, Permissions::ReadOnly, discipline) + } + + /// Attaches a new read-write client to this backend. Incoming bytes from + /// the guest are sent to `read_tx`. `discipline` specifies what happens if + /// the channel is full when new bytes are available to read. + /// + /// The caller may disconnect its session by dropping the returned + /// [`Client`]. + pub(super) fn attach_read_write_client( + self: &Arc, + read_tx: WriteHalf, + discipline: FullReadChannelDiscipline, + ) -> Client { + self.attach_client(read_tx, Permissions::ReadWrite, discipline) + } + + /// Attaches a new client to this backend, returning a [`Client`] that + /// represents the caller's connection to the backend. + fn attach_client( + self: &Arc, + read_tx: WriteHalf, + permissions: Permissions, + discipline: FullReadChannelDiscipline, + ) -> Client { + let (defunct_tx, defunct_rx) = watch::channel(false); + let writer = match permissions { + Permissions::ReadWrite => Some(Writer { backend: self.clone() }), + Permissions::ReadOnly => None, + }; + + let reader = Reader { + tx: read_tx, + discipline, + defunct_tx: defunct_tx.clone(), + defunct_rx: defunct_rx.clone(), + }; + + // Unwrapping is safe here because `read_task` is only allowed to exit + // after the backend signals `done_tx`, which only happens when the + // backend is dropped. + self.reader_tx.send(reader).unwrap(); + + Client { writer, defunct_tx, defunct_rx } + } + + /// Returns the contents of this backend's history buffer. See + /// [`HistoryBuffer::contents_vec`]. + pub fn history_vec( + &self, + byte_offset: SerialHistoryOffset, + max_bytes: Option, + ) -> Result<(Vec, usize), super::history_buffer::Error> { + let inner = self.inner.lock().unwrap(); + inner.buffer.contents_vec(byte_offset, max_bytes) + } + + /// Returns the number of bytes that have ever been sent to this backend's + /// history buffer. + pub fn bytes_since_start(&self) -> usize { + self.inner.lock().unwrap().buffer.bytes_from_start() + } +} + +impl Drop for ConsoleBackend { + fn drop(&mut self) { + let (tx, _rx) = oneshot::channel(); + let done_tx = std::mem::replace(&mut self.done_tx, tx); + let _ = done_tx.send(()); + } +} + +mod migrate { + use propolis::migrate::{ + MigrateCtx, MigrateSingle, MigrateStateError, PayloadOffer, + }; + + use crate::serial::history_buffer::migrate::HistoryBufferContentsV1; + + use super::ConsoleBackend; + + impl MigrateSingle for ConsoleBackend { + fn export( + &self, + _ctx: &MigrateCtx, + ) -> Result + { + Ok(self.inner.lock().unwrap().buffer.export().into()) + } + + fn import( + &self, + mut offer: PayloadOffer, + _ctx: &MigrateCtx, + ) -> Result<(), MigrateStateError> { + let contents: HistoryBufferContentsV1 = offer.parse()?; + self.inner.lock().unwrap().buffer.import(contents); + Ok(()) + } + } +} + +/// Reads bytes from the supplied `source` and dispatches them to the clients in +/// `inner`. Each backend is expected to spin up one task that runs this +/// function. +async fn read_task( + inner: Arc>, + source: Arc, + mut reader_rx: mpsc::UnboundedReceiver, + mut done_rx: oneshot::Receiver<()>, +) { + let buf = SourceBuffer::new(propolis::chardev::pollers::Params { + poll_interval: std::time::Duration::from_millis(10), + poll_miss_thresh: 5, + buf_size: NonZeroUsize::new(super::SERIAL_READ_BUFFER_SIZE).unwrap(), + }); + buf.attach(source.as_ref()); + + enum Event { + NewReader(Reader), + BytesRead(usize), + } + + let mut readers = vec![]; + let mut bytes = vec![0; super::SERIAL_READ_BUFFER_SIZE]; + loop { + let event = select! { + biased; + + _ = &mut done_rx => { + return; + } + + new_reader = reader_rx.recv() => { + let Some(reader) = new_reader else { + return; + }; + + Event::NewReader(reader) + } + + bytes_read = buf.read(bytes.as_mut_slice(), source.as_ref()) => { + Event::BytesRead( + bytes_read.expect("SourceBuffer reads are infallible") + ) + } + }; + + // Returns `true` if it was possible to send the entirety of `to_send` + // to `reader` without violating the reader's full-channel discipline. + async fn send_ok(reader: &mut Reader, to_send: &[u8]) -> bool { + match reader.discipline { + // Reads and writes to simplex streams (unlike channels) do not + // resolve to errors if the other half of the stream is dropped + // while the read or write is outstanding; instead, the future + // stays pending forever. + // + // To handle cases where the reader's client handle was dropped + // mid-read, select over the attempt to write and the "defunct" + // watcher and retire the client if the watcher fires first. + FullReadChannelDiscipline::Block => { + select! { + res = reader.tx.write_all(to_send) => { + res.is_ok() + } + + _ = reader.defunct_rx.changed() => { + false + } + } + } + // In the close-on-full-channel case it suffices to poll the + // future exactly once and see if this manages to write all of + // the data. No selection is needed here: if `write` returns + // `Poll::Pending`, `poll_once` will return an error, + // irrespective of the reason the write didn't complete. + FullReadChannelDiscipline::Close => { + matches!( + poll_once(reader.tx.write(to_send)), + Some(Ok(len)) if len == to_send.len() + ) + } + } + } + + match event { + Event::NewReader(r) => readers.push(r), + Event::BytesRead(bytes_read) => { + let to_send = &bytes[0..bytes_read]; + + // Send the bytes to each reader, dropping readers who fail to + // accept all the bytes on offer. + // + // It would be nice to use `Vec::retain_mut` here, but async + // closures aren't quite stable yet, so hand-roll a while loop + // instead. + let mut idx = 0; + while idx < readers.len() { + let ok = send_ok(&mut readers[idx], to_send).await; + if ok { + idx += 1; + } else { + readers.swap_remove(idx); + } + } + + inner.lock().unwrap().buffer.consume(to_send); + } + } + } +} + +/// A helper function to poll a future `f` exactly once, returning its output +/// `Some(R)` if it is immediately ready and `None` otherwise. +fn poll_once(f: impl Future) -> Option { + tokio::pin!(f); + + let waker = futures::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + match f.as_mut().poll(&mut cx) { + Poll::Ready(result) => Some(result), + Poll::Pending => None, + } +} diff --git a/bin/propolis-server/src/lib/serial/history_buffer.rs b/bin/propolis-server/src/lib/serial/history_buffer.rs index f688e71ed..0055358d1 100644 --- a/bin/propolis-server/src/lib/serial/history_buffer.rs +++ b/bin/propolis-server/src/lib/serial/history_buffer.rs @@ -7,7 +7,6 @@ use dropshot::HttpError; use propolis_api_types as api; -use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::convert::TryFrom; @@ -29,7 +28,7 @@ const DEFAULT_MAX_LENGTH: isize = 16 * 1024; /// An abstraction for storing the contents of the instance's serial console /// output, intended for retrieval by the web console or other monitoring or /// troubleshooting tools. -#[derive(Deserialize, Serialize, Clone)] +#[derive(Clone)] pub(crate) struct HistoryBuffer { beginning: Vec, rolling: VecDeque, @@ -54,10 +53,12 @@ impl TryFrom<&api::InstanceSerialConsoleStreamRequest> for SerialHistoryOffset { api::InstanceSerialConsoleStreamRequest { from_start: Some(offset), most_recent: None, + .. } => Ok(SerialHistoryOffset::FromStart(*offset as usize)), api::InstanceSerialConsoleStreamRequest { from_start: None, most_recent: Some(offset), + .. } => Ok(SerialHistoryOffset::MostRecent(*offset as usize)), _ => Err(()), } @@ -223,6 +224,48 @@ impl HistoryBuffer { pub fn bytes_from_start(&self) -> usize { self.total_bytes } + + pub(super) fn export(&self) -> migrate::HistoryBufferContentsV1 { + let slices = self.rolling.as_slices(); + let mut most_recent = Vec::with_capacity(self.rolling.len()); + most_recent.extend(slices.0); + most_recent.extend(slices.1); + migrate::HistoryBufferContentsV1 { + beginning: self.beginning.clone(), + most_recent, + total_bytes: self.total_bytes, + buffer_size: self.buffer_size, + } + } + + pub(super) fn import( + &mut self, + contents: migrate::HistoryBufferContentsV1, + ) { + self.beginning = contents.beginning; + self.rolling.clear(); + self.rolling.extend(contents.most_recent.iter()); + self.total_bytes = contents.total_bytes; + self.buffer_size = contents.buffer_size; + } +} + +pub(crate) mod migrate { + use serde::{Deserialize, Serialize}; + + #[derive(Deserialize, Serialize, Clone)] + pub(crate) struct HistoryBufferContentsV1 { + pub(super) beginning: Vec, + pub(super) most_recent: Vec, + pub(super) total_bytes: usize, + pub(super) buffer_size: usize, + } + + impl propolis::migrate::Schema<'_> for HistoryBufferContentsV1 { + fn id() -> propolis::migrate::SchemaId { + ("serial-console-backend", 1) + } + } } #[cfg(test)] diff --git a/bin/propolis-server/src/lib/serial/mod.rs b/bin/propolis-server/src/lib/serial/mod.rs index 82062af73..1f2f881da 100644 --- a/bin/propolis-server/src/lib/serial/mod.rs +++ b/bin/propolis-server/src/lib/serial/mod.rs @@ -2,400 +2,618 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! Routines to expose a connection to an instance's serial port. - -use crate::migrate::MigrateError; - -use std::collections::HashMap; -use std::net::SocketAddr; -use std::num::NonZeroUsize; -use std::ops::Range; -use std::sync::Arc; -use std::time::Duration; - -use crate::serial::history_buffer::{HistoryBuffer, SerialHistoryOffset}; -use futures::future::Fuse; -use futures::stream::SplitSink; -use futures::{FutureExt, SinkExt, StreamExt}; -use propolis::chardev::{pollers, Sink, Source}; +//! Defines the [`SerialConsoleManager`], which manages client websocket +//! connections to one of an instance's serial ports. +//! +//! Each `SerialConsoleManager` brokers connections to one serial device. Each +//! websocket connection to the device creates a [`ClientTask`] tracked by the +//! manager. This task largely plumbs incoming bytes from the websocket into the +//! backend and vice-versa, but it can also be used to send control messages to +//! clients, e.g. to notify them that an instance is migrating. +//! +//! The connection manager implements the connection policies described in RFD +//! 491: +//! +//! - A single serial device can have only one read-write client at a time (but +//! can have multiple read-only clients). +//! - The connection manager configures its connections to the backend so that +//! - The backend will block when sending bytes to read-write clients that +//! can't receive them immediately, but +//! - The backend will disconnect read-only clients who can't immediately +//! receive new bytes. + +use std::{ + collections::VecDeque, + net::SocketAddr, + sync::{Arc, Mutex}, +}; + +use backend::ConsoleBackend; +use dropshot::WebsocketConnectionRaw; +use futures::{SinkExt, StreamExt}; use propolis_api_types::InstanceSerialConsoleControlMessage; -use slog::{info, warn, Logger}; -use thiserror::Error; -use tokio::sync::{mpsc, oneshot, Mutex, RwLock as AsyncRwLock}; -use tokio::task::JoinHandle; -use tokio_tungstenite::tungstenite::protocol::{ - frame::coding::CloseCode, CloseFrame, +use slab::Slab; +use slog::{info, warn}; +use tokio::{ + io::{AsyncReadExt, ReadHalf, SimplexStream}, + select, + sync::{mpsc, oneshot}, + task::JoinHandle, +}; +use tokio_tungstenite::{ + tungstenite::{ + protocol::{frame::coding::CloseCode, CloseFrame}, + Message, + }, + WebSocketStream, }; -use tokio_tungstenite::tungstenite::Message; -use tokio_tungstenite::{tungstenite, WebSocketStream}; +pub(crate) mod backend; pub(crate) mod history_buffer; #[usdt::provider(provider = "propolis")] mod probes { - fn serial_close_recv() {} - fn serial_new_ws() {} - fn serial_uart_write(n: usize) {} - fn serial_uart_out() {} - fn serial_uart_read(n: usize) {} - fn serial_inject_uart() {} - fn serial_ws_recv() {} + fn serial_task_done() {} + fn serial_task_loop(read_from_ws: bool, has_bytes_to_write: bool) {} + fn serial_task_backend_read(len: usize) {} + fn serial_task_backend_write(len: usize) {} + fn serial_task_console_disconnect() {} + fn serial_task_ws_recv(len: usize) {} + fn serial_task_ws_error() {} + fn serial_task_ws_disconnect() {} fn serial_buffer_size(n: usize) {} } -/// Errors which may occur during the course of a serial connection. -#[derive(Error, Debug)] -pub enum SerialTaskError { - #[error("Cannot upgrade HTTP request to WebSockets: {0}")] - Upgrade(#[from] hyper::Error), +/// The size, in bytes, of the intermediate buffers used to store bytes as they +/// move from the guest character device to the individual reader tasks. +const SERIAL_READ_BUFFER_SIZE: usize = 512; - #[error("WebSocket Error: {0}")] - WebSocket(#[from] tungstenite::Error), +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +struct ClientId(usize); - #[error("IO error: {0}")] - Io(#[from] std::io::Error), +/// Specifies whether a client should be a read-write or read-only client. +#[derive(Clone, Copy, PartialEq, Eq)] +pub enum ClientKind { + ReadWrite, + ReadOnly, +} - #[error("Mismatched websocket streams while closing")] - MismatchedStreams, +/// The possible events that can be sent to a serial task's control channel. +enum ControlEvent { + /// The task has been registered into the task set and can start its main + /// loop. + Start(ClientId, backend::Client), + /// Another part of the server has dispatched a console session control + /// message to this task. + Message(InstanceSerialConsoleControlMessage), +} - #[error("Error while waiting for notification: {0}")] - OneshotRecv(#[from] oneshot::error::RecvError), +impl std::fmt::Debug for ControlEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Start(arg0, _arg1) => { + f.debug_tuple("Start").field(arg0).finish() + } + Self::Message(arg0) => { + f.debug_tuple("Message").field(arg0).finish() + } + } + } +} + +/// Tracks an individual client of this console and its associated tokio task. +struct ClientTask { + /// The join handle for this client's task. + hdl: JoinHandle<()>, + + /// Receives server-level commands and notifications about this connection. + control_tx: mpsc::Sender, + + /// Triggered when the serial console manager shuts down. + done_tx: oneshot::Sender<()>, +} + +/// The collection of [`ClientTask`]s belonging to a console manager. +#[derive(Default)] +struct ClientTasks { + /// The set of registered tasks. + tasks: Slab, - #[error("JSON marshalling error while processing control message: {0}")] - Json(#[from] serde_json::Error), + /// The ID of the current read-write client, if there is one. + rw_client_id: Option, } -pub enum SerialTaskControlMessage { - Stopping, - Migration { destination: SocketAddr, from_start: u64 }, +impl ClientTasks { + fn add(&mut self, task: ClientTask) -> ClientId { + ClientId(self.tasks.insert(task)) + } + + /// Ensures the task with ID `id` is removed from this task set. + fn ensure_removed_by_id(&mut self, id: ClientId) { + // `slab` will panic if asked to remove an ID that isn't present in the + // collection. This can happen if a read-write task is evicted by a new + // read-write task, and then the old task exits. Handling this case here + // allows new client connections to take ownership of a retired + // read-write task and await its completion while holding no locks. + if self.tasks.contains(id.0) { + self.tasks.remove(id.0); + match self.rw_client_id { + Some(existing_id) if existing_id == id => { + self.rw_client_id = None; + } + _ => {} + } + } + } + + /// If there is a read-write client in the task collection, removes it (from + /// both the read-write client position and the task table) and returns it. + fn remove_rw_client(&mut self) -> Option { + if let Some(id) = self.rw_client_id.take() { + Some(self.tasks.remove(id.0)) + } else { + None + } + } } -pub struct SerialTask { - /// Handle to attached serial session - pub task: JoinHandle<()>, - /// Channel used to signal the task to terminate gracefully or notify - /// clients of a migration - pub control_ch: mpsc::Sender, - /// Channel used to send new client connections to the streaming task - pub websocks_ch: - mpsc::Sender>, +/// Manages individual websocket client connections to a single serial device. +pub struct SerialConsoleManager { + log: slog::Logger, + backend: Arc, + client_tasks: Arc>, } -pub async fn instance_serial_task( - mut websocks_recv: mpsc::Receiver< - WebSocketStream, - >, - mut control_recv: mpsc::Receiver, - serial: Arc>, - log: Logger, -) -> Result<(), SerialTaskError> { - info!(log, "Entered serial task"); - let mut output = [0u8; 1024]; - let mut cur_output: Option> = None; - let mut cur_input: Option<(Vec, usize)> = None; - - let mut ws_sinks: HashMap< - usize, - SplitSink, Message>, - > = HashMap::new(); - let mut ws_streams: HashMap< - usize, - futures::stream::SplitStream< - WebSocketStream, - >, - > = HashMap::new(); - - let (send_ch, mut recv_ch) = mpsc::channel(4); - - let mut next_stream_id = 0usize; +impl SerialConsoleManager { + pub fn new(log: slog::Logger, backend: Arc) -> Self { + Self { + log, + backend, + client_tasks: Arc::new(Mutex::new(ClientTasks::default())), + } + } - loop { - let (uart_read, ws_send) = - if ws_sinks.is_empty() || cur_output.is_none() { - (serial.read_source(&mut output).fuse(), Fuse::terminated()) + /// Directs all client tasks to stop and waits for them all to finish. + pub async fn stop(self) { + let tasks = { + let mut guard = self.client_tasks.lock().unwrap(); + std::mem::take(&mut guard.tasks) + }; + + let mut tasks: Vec = + tasks.into_iter().map(|e| e.1).collect(); + + for task in &mut tasks { + let (tx, _rx) = oneshot::channel(); + let done_tx = std::mem::replace(&mut task.done_tx, tx); + let _ = done_tx.send(()); + } + + futures::future::join_all(tasks.into_iter().map(|task| task.hdl)).await; + } + + /// Creates a new task that connects a websocket stream to this manager's + /// serial console. + pub async fn connect( + &self, + ws: WebSocketStream, + kind: ClientKind, + ) { + let (console_rx, console_tx) = + tokio::io::simplex(SERIAL_READ_BUFFER_SIZE); + let (control_tx, control_rx) = mpsc::channel(1); + let (task_done_tx, task_done_rx) = oneshot::channel(); + let discipline = match kind { + ClientKind::ReadWrite => backend::FullReadChannelDiscipline::Block, + ClientKind::ReadOnly => backend::FullReadChannelDiscipline::Close, + }; + + let prev_rw_task; + let new_id; + { + let mut client_tasks = self.client_tasks.lock().unwrap(); + + // There can only ever be one read-write client at a time. If one + // already exists, remove it from the task tracker. It will be + // replaced with this registrant's ID before the lock is dropped. + prev_rw_task = if kind == ClientKind::ReadWrite { + client_tasks.remove_rw_client() } else { - let range = cur_output.clone().unwrap(); - ( - Fuse::terminated(), - if !ws_sinks.is_empty() { - futures::stream::iter( - ws_sinks.iter_mut().zip(std::iter::repeat( - Vec::from(&output[range]), - )), - ) - .for_each_concurrent(4, |((_i, ws), bin)| { - ws.send(Message::binary(bin)).map(|_| ()) - }) - .fuse() - } else { - Fuse::terminated() + None + }; + + let ctx = SerialTaskContext { + log: self.log.clone(), + ws, + console_rx, + control_rx, + done_rx: task_done_rx, + client_tasks: self.client_tasks.clone(), + }; + + let task = ClientTask { + hdl: tokio::spawn(serial_task(ctx)), + control_tx: control_tx.clone(), + done_tx: task_done_tx, + }; + + new_id = client_tasks.add(task); + if kind == ClientKind::ReadWrite { + assert!(client_tasks.rw_client_id.is_none()); + client_tasks.rw_client_id = Some(new_id); + } + }; + + // Register with the backend to get a client handle to pass back to the + // new client task. + // + // Note that if the read-full discipline is Close and the guest is very + // busily writing data, it is possible for the backend to decide the + // channel is unresponsive before the client task is ever told to enter + // its main loop. While not ideal, this is legal because it is also + // possible for the channel to have filled between the time the task + // processed its "start" message and the time it actually began to poll + // its read channel for data. In either case the client will be + // disconnected immediately and not get to read anything. + let backend_hdl = match kind { + ClientKind::ReadWrite => { + self.backend.attach_read_write_client(console_tx, discipline) + } + ClientKind::ReadOnly => { + self.backend.attach_read_only_client(console_tx, discipline) + } + }; + + // If this task displaced a previous read-write task, make sure that + // task has exited before allowing the new one to enter its main loop + // to avoid accidentally interleaving multiple tasks' writes. + if let Some(task) = prev_rw_task { + let _ = task.done_tx.send(()); + let _ = task.hdl.await; + } + + let _ = control_tx.send(ControlEvent::Start(new_id, backend_hdl)).await; + } + + /// Notifies all active clients that the instance is migrating to a new + /// host. + /// + /// This function reads state from the console backend and relays it to + /// clients. The caller should ensure that the VM is paused before calling + /// this function so that backend's state doesn't change after this function + /// captures it. + pub async fn notify_migration(&self, destination: SocketAddr) { + let from_start = self.backend.bytes_since_start() as u64; + let clients: Vec<_> = { + let client_tasks = self.client_tasks.lock().unwrap(); + client_tasks + .tasks + .iter() + .map(|(_, client)| client.control_tx.clone()) + .collect() + }; + + for client in clients { + let _ = client + .send(ControlEvent::Message( + InstanceSerialConsoleControlMessage::Migrating { + destination, + from_start, }, + )) + .await; + } + } +} + +/// Context passed to every serial console task. +struct SerialTaskContext { + /// The logger this task should use. + log: slog::Logger, + + /// The websocket connection this task uses to communicate with its client. + ws: WebSocketStream, + + /// Receives output bytes from the guest. + console_rx: ReadHalf, + + /// Receives commands and notifications from the console manager. + control_rx: mpsc::Receiver, + + /// Signaled to tell a task to shut down. + done_rx: oneshot::Receiver<()>, + + /// A reference to the manager's client task map, used to deregister this + /// task on disconnection. + client_tasks: Arc>, +} + +async fn serial_task( + SerialTaskContext { + log, + ws, + mut console_rx, + mut control_rx, + mut done_rx, + client_tasks, + }: SerialTaskContext, +) { + enum Event { + Done, + ReadFromBackend(usize), + ConsoleDisconnected, + WroteToBackend(Result), + Control(ControlEvent), + WebsocketMessage(Message), + WebsocketError(tokio_tungstenite::tungstenite::Error), + WebsocketDisconnected, + } + + let (client_id, mut backend_hdl) = match control_rx.recv().await { + Some(ControlEvent::Start(id, hdl)) => (id, hdl), + Some(e) => { + panic!("serial task's first message should be Start but was {e:?}") + } + None => panic!("serial task's control channel closed unexpectedly"), + }; + + info!( + log, + "serial console task started"; + "client_id" => client_id.0, + ); + + let mut client_defunct = backend_hdl.get_defunct_rx(); + let mut read_from_guest = vec![0; SERIAL_READ_BUFFER_SIZE]; + let mut remaining_to_send = VecDeque::new(); + let (mut sink, mut stream) = ws.split(); + let mut close_reason: Option<&'static str> = None; + loop { + use futures::future::Either; + + // If this is a read-write client that has read some bytes from the + // websocket peer, create a future that will attempt to write those + // bytes to the console backend, and hold off on reading more data from + // the peer until those bytes are sent. + // + // Otherwise, create an always-pending "write to backend" future and + // accept another message from the websocket. + // + // Read from the websocket even if the client is read-only to avoid + // putting backpressure on the remote peer (and possibly causing it to + // hang). This has the added advantage that if the peer disconnects, the + // read-from-peer future will yield an error, allowing this task to + // clean itself up without having to wait for another occasion to send a + // message to the peer. + let (read_from_ws, backend_write_fut) = + if backend_hdl.can_write() && !remaining_to_send.is_empty() { + remaining_to_send.make_contiguous(); + ( + false, + Either::Left( + backend_hdl.write(remaining_to_send.as_slices().0), + ), ) + } else { + (true, Either::Right(futures::future::pending())) }; - let (ws_recv, uart_write) = match &cur_input { - None => ( - if !ws_streams.is_empty() { - futures::stream::iter(ws_streams.iter_mut()) - .for_each_concurrent(4, |(i, ws)| { - ws.next() - .then(|msg| send_ch.send((*i, msg))) - .map(|_| ()) - }) - .fuse() - } else { - Fuse::terminated() - }, - Fuse::terminated(), - ), - Some((data, consumed)) => ( - Fuse::terminated(), - serial.write_sink(&data[*consumed..]).fuse(), - ), + let ws_fut = if read_from_ws { + Either::Left(stream.next()) + } else { + Either::Right(futures::future::pending()) }; - let input_recv_ch_fut = recv_ch.recv().fuse(); - let new_ws_recv = websocks_recv.recv().fuse(); - let control_recv_fut = control_recv.recv().fuse(); - - tokio::select! { - // Poll in the order written + probes::serial_task_loop!(|| ( + read_from_ws, + !remaining_to_send.is_empty() + )); + + let event = select! { + // The priority of these branches is important: + // + // 1. If the console manager asks to stop this task, it should stop + // immediately without doing any more work (the VM is going + // away). + // 2. Similarly, if the backend's read task marked this client as + // defunct, the task should exit right away. + // 3. New bytes written by the guest need to be processed before any + // other requests: if a guest outputs a byte while a read-write + // client is attached, the relevant vCPU will be blocked until + // the client processes the byte. biased; - // It's important we always poll the close channel first - // so that a constant stream of incoming/outgoing messages - // don't cause us to ignore it - message = control_recv_fut => { - probes::serial_close_recv!(|| {}); - match message { - Some(SerialTaskControlMessage::Stopping) | None => { - // Gracefully close the connections to any clients - for (i, ws0) in ws_sinks.into_iter() { - let ws1 = ws_streams.remove(&i).ok_or(SerialTaskError::MismatchedStreams)?; - let mut ws = ws0.reunite(ws1).map_err(|_| SerialTaskError::MismatchedStreams)?; - let _ = ws.close(Some(CloseFrame { - code: CloseCode::Away, - reason: "VM stopped".into(), - })).await; - } - } - Some(SerialTaskControlMessage::Migration { destination, from_start }) => { - let mut failures = 0; - for sink in ws_sinks.values_mut() { - if sink.send(Message::Text(serde_json::to_string( - &InstanceSerialConsoleControlMessage::Migrating { - destination, - from_start, - } - )?)).await.is_err() { - failures += 1; - } - } - if failures > 0 { - warn!(log, "Failed to send migration info to {} connected clients.", failures); - } - } + _ = &mut done_rx => { + Event::Done + } + + _ = client_defunct.changed() => { + Event::ConsoleDisconnected + } + + res = console_rx.read(read_from_guest.as_mut_slice()) => { + match res { + Ok(bytes_read) => Event::ReadFromBackend(bytes_read), + Err(_) => Event::ConsoleDisconnected, } - info!(log, "Terminating serial task"); - break; } - new_ws = new_ws_recv => { - probes::serial_new_ws!(|| {}); - if let Some(ws) = new_ws { - let (ws_sink, ws_stream) = ws.split(); - ws_sinks.insert(next_stream_id, ws_sink); - ws_streams.insert(next_stream_id, ws_stream); - next_stream_id += 1; + control = control_rx.recv() => { + Event::Control(control.expect( + "serial control channel should outlive its task" + )) + } + + res = backend_write_fut => { + Event::WroteToBackend(res) + } + + ws = ws_fut => { + match ws { + None => Event::WebsocketDisconnected, + Some(Ok(msg)) => Event::WebsocketMessage(msg), + Some(Err(err)) => Event::WebsocketError(err), } } + }; - // Write bytes into the UART from the WS - written = uart_write => { - probes::serial_uart_write!(|| { written.unwrap_or(0) }); - match written { - Some(0) | None => { + match event { + Event::Done => { + probes::serial_task_done!(|| ()); + close_reason = Some("VM stopped"); + break; + } + Event::ReadFromBackend(len) => { + probes::serial_task_backend_read!(|| (len)); + + // In general it's OK to wait for this byte to be sent, since + // read-write clients are allowed to block the backend and + // read-only clients will be disconnected if they don't process + // bytes in a timely manner. That said, even read-write clients + // need to monitor `done_rx` here; otherwise a badly-behaved + // remote peer can prevent a VM from stopping. + let res = select! { + biased; + + _ = &mut done_rx => { + probes::serial_task_done!(|| ()); + close_reason = Some("VM stopped"); break; } - Some(n) => { - let (data, consumed) = cur_input.as_mut().unwrap(); - *consumed += n; - if *consumed == data.len() { - cur_input = None; - } - } + + res = sink.send( + Message::binary(read_from_guest[..len].to_vec()) + ) => { res } + }; + + if let Err(e) = res { + info!( + log, "failed to write to a console client"; + "client_id" => client_id.0, + "error" => ?e + ); + close_reason = Some("sending bytes to the client failed"); + break; } } - - // Transmit bytes from the UART through the WS - _ = ws_send => { - probes::serial_uart_out!(|| {}); - cur_output = None; + Event::ConsoleDisconnected => { + probes::serial_task_console_disconnect!(|| ()); + info!( + log, "console backend closed its client channel"; + "client_id" => client_id.0 + ); + break; } + Event::Control(control) => { + let control = match control { + ControlEvent::Start(..) => { + panic!("received a start message after starting"); + } + ControlEvent::Message(msg) => msg, + }; + + // As above, don't let a peer that's not processing control + // messages prevent the VM from stopping. + select! { + biased; - // Read bytes from the UART to be transmitted out the WS - nread = uart_read => { - // N.B. Putting this probe inside the match arms below causes - // the `break` arm to be taken unexpectedly. See - // propolis#292 for details. - probes::serial_uart_read!(|| { nread.unwrap_or(0) }); - match nread { - Some(0) | None => { + _ = &mut done_rx => { + probes::serial_task_done!(|| ()); + close_reason = Some("VM stopped"); break; } - Some(n) => { - cur_output = Some(0..n) - } + + _ = sink + .send(Message::Text( + serde_json::to_string(&control).expect( + "control messages can always serialize into JSON", + ), + )) => {} } } + Event::WroteToBackend(result) => { + let written = match result { + Ok(n) => n, + Err(e) => { + let reason = if e.kind() + == std::io::ErrorKind::ConnectionAborted + { + "read-write console connection closed by backend" + } else { + "error writing to console backend" + }; + + warn!( + log, + "dropping read-write console client"; + "client_id" => client_id.0, + "error" => ?e, + "reason" => reason + ); + + close_reason = Some(reason); + break; + } + }; - // Receive bytes from the intermediate channel to be injected into - // the UART. This needs to be checked before `ws_recv` so that - // "close" messages can be processed and their indicated - // sinks/streams removed before they are polled again. - pair = input_recv_ch_fut => { - probes::serial_inject_uart!(|| {}); - if let Some((i, msg)) = pair { - match msg { - Some(Ok(Message::Binary(input))) => { - cur_input = Some((input, 0)); - } - Some(Ok(Message::Close(..))) | None => { - info!(log, "Removing closed serial connection {}.", i); - let sink = ws_sinks.remove(&i).ok_or(SerialTaskError::MismatchedStreams)?; - let stream = ws_streams.remove(&i).ok_or(SerialTaskError::MismatchedStreams)?; - if let Err(e) = sink.reunite(stream).map_err(|_| SerialTaskError::MismatchedStreams)?.close(None).await { - warn!(log, "Failed while closing stream {}: {}", i, e); - } - }, - _ => continue, + probes::serial_task_backend_write!(|| written); + drop(remaining_to_send.drain(..written)); + } + Event::WebsocketMessage(msg) => { + assert!( + remaining_to_send.is_empty(), + "should only read from the socket when the buffer is empty" + ); + if let Message::Binary(bytes) = msg { + probes::serial_task_ws_recv!(|| (bytes.len())); + + // Throw away the incoming bytes if they can't be written to + // the backend. This allows the next loop iteration to read + // from the socket agian, which gives it a convenient way of + // noticing that a client has disconnected even if nothing + // is currently being echoed to it. + if backend_hdl.can_write() { + remaining_to_send.extend(bytes.as_slice()); } } } - - // Receive bytes from connected WS clients to feed to the - // intermediate recv_ch - _ = ws_recv => { - probes::serial_ws_recv!(|| {}); + Event::WebsocketError(e) => { + probes::serial_task_ws_error!(|| ()); + warn!( + log, "serial console websocket error"; + "client_id" => client_id.0, + "error" => ?e + ); + break; + } + Event::WebsocketDisconnected => { + probes::serial_task_ws_disconnect!(|| ()); + info!( + log, "serial console client disconnected"; + "client_id" => client_id.0 + ); + break; } } } - info!(log, "Returning from serial task"); - Ok(()) -} - -/// Represents a serial connection into the VM. -pub struct Serial { - uart: Arc, - - task_control_ch: Mutex>>, - - sink_poller: Arc, - source_poller: Arc, - history: AsyncRwLock, -} - -impl Serial { - /// Creates a new buffered serial connection on top of `uart.` - /// - /// Creation of this object disables "autodiscard", and destruction - /// of the object re-enables "autodiscard" mode. - /// - /// # Arguments - /// - /// * `uart` - The device which data will be read from / written to. - /// * `sink_size` - A lower bound on the size of the writeback buffer. - /// * `source_size` - A lower bound on the size of the read buffer. - pub fn new( - uart: Arc, - sink_size: NonZeroUsize, - source_size: NonZeroUsize, - ) -> Serial { - let sink_poller = pollers::SinkBuffer::new(sink_size); - let source_poller = pollers::SourceBuffer::new(pollers::Params { - buf_size: source_size, - poll_interval: Duration::from_millis(10), - poll_miss_thresh: 5, - }); - let history = Default::default(); - sink_poller.attach(uart.as_ref()); - source_poller.attach(uart.as_ref()); - uart.set_autodiscard(false); - - let task_control_ch = Default::default(); - - Serial { uart, task_control_ch, sink_poller, source_poller, history } - } - - pub async fn read_source(&self, buf: &mut [u8]) -> Option { - let uart = self.uart.clone(); - let bytes_read = self.source_poller.read(buf, uart.as_ref()).await?; - self.history.write().await.consume(&buf[..bytes_read]); - Some(bytes_read) - } - - pub async fn write_sink(&self, buf: &[u8]) -> Option { - let uart = self.uart.clone(); - self.sink_poller.write(buf, uart.as_ref()).await - } - - pub(crate) async fn history_vec( - &self, - byte_offset: SerialHistoryOffset, - max_bytes: Option, - ) -> Result<(Vec, usize), history_buffer::Error> { - self.history.read().await.contents_vec(byte_offset, max_bytes) - } - - // provide the channel through which we inform connected websocket clients - // that a migration has occurred, and where to reconnect. - // (the server's serial-to-websocket task -- and thus the receiving end of - // this channel -- are spawned in `instance_ensure_common`, after the - // construction of `Serial`) - pub(crate) async fn set_task_control_sender( - &self, - control_ch: mpsc::Sender, - ) { - self.task_control_ch.lock().await.replace(control_ch); - } - - pub(crate) async fn export_history( - &self, - destination: SocketAddr, - ) -> Result { - let read_hist = self.history.read().await; - let from_start = read_hist.bytes_from_start() as u64; - let encoded = ron::to_string(&*read_hist) - .map_err(|e| MigrateError::Codec(e.to_string()))?; - drop(read_hist); - if let Some(ch) = self.task_control_ch.lock().await.as_ref() { - ch.send(SerialTaskControlMessage::Migration { - destination, - from_start, - }) - .await - .map_err(|_| MigrateError::InvalidInstanceState)?; - } - Ok(encoded) - } - pub(crate) async fn import( - &self, - serialized_hist: &str, - ) -> Result<(), MigrateError> { - self.sink_poller.attach(self.uart.as_ref()); - self.source_poller.attach(self.uart.as_ref()); - self.uart.set_autodiscard(false); - let decoded = ron::from_str(serialized_hist) - .map_err(|e| MigrateError::Codec(e.to_string()))?; - let mut write_hist = self.history.write().await; - *write_hist = decoded; - Ok(()) + info!(log, "serial console task exiting"; "client_id" => client_id.0); + let mut ws = sink.reunite(stream).expect("sink and stream should match"); + if let Err(e) = ws + .close(Some(CloseFrame { + code: CloseCode::Away, + reason: close_reason + .unwrap_or("serial connection task exited") + .into(), + })) + .await + { + warn!( + log, "error sending close frame to client"; + "client_id" => client_id.0, + "error" => ?e, + ); } -} -impl Drop for Serial { - fn drop(&mut self) { - self.uart.set_autodiscard(true); - } + client_tasks.lock().unwrap().ensure_removed_by_id(client_id); } diff --git a/bin/propolis-server/src/lib/server.rs b/bin/propolis-server/src/lib/server.rs index ea1c41849..2aeb8c2d2 100644 --- a/bin/propolis-server/src/lib/server.rs +++ b/bin/propolis-server/src/lib/server.rs @@ -405,7 +405,6 @@ async fn instance_serial_history_get( let max_bytes = query_params.max_bytes.map(|x| x as usize); let (data, end) = serial .history_vec(byte_offset, max_bytes) - .await .map_err(|e| HttpError::for_bad_request(None, e.to_string()))?; Ok(HttpResponseOk(api::InstanceSerialConsoleHistoryResponse { @@ -426,6 +425,7 @@ async fn instance_serial( let ctx = rqctx.context(); let vm = ctx.vm.active_vm().await.ok_or_else(not_created_error)?; let serial = vm.objects().lock_shared().await.com1().clone(); + let query = query.into_inner(); // Use the default buffering paramters for the websocket configuration // @@ -441,10 +441,10 @@ async fn instance_serial( ) .await; - let byte_offset = SerialHistoryOffset::try_from(&query.into_inner()).ok(); + let byte_offset = SerialHistoryOffset::try_from(&query).ok(); if let Some(mut byte_offset) = byte_offset { loop { - let (data, offset) = serial.history_vec(byte_offset, None).await?; + let (data, offset) = serial.history_vec(byte_offset, None)?; if data.is_empty() { break; } @@ -456,14 +456,21 @@ async fn instance_serial( } // Get serial task's handle and send it the websocket stream - let serial_task = vm.services().serial_task.lock().await; - serial_task + let serial_mgr = vm.services().serial_mgr.lock().await; + serial_mgr .as_ref() - .ok_or("Instance has no serial task")? - .websocks_ch - .send(ws_stream) - .await - .map_err(|e| format!("Serial socket hand-off failed: {}", e).into()) + .ok_or("Instance has no serial console manager")? + .connect( + ws_stream, + if query.writable { + crate::serial::ClientKind::ReadWrite + } else { + crate::serial::ClientKind::ReadOnly + }, + ) + .await; + + Ok(()) } #[channel { diff --git a/bin/propolis-server/src/lib/vm/active.rs b/bin/propolis-server/src/lib/vm/active.rs index 8a812bf27..3114a47d6 100644 --- a/bin/propolis-server/src/lib/vm/active.rs +++ b/bin/propolis-server/src/lib/vm/active.rs @@ -40,7 +40,7 @@ pub(crate) struct ActiveVm { /// Services that interact with VM users or the control plane outside the /// Propolis API (e.g. the serial console, VNC, and metrics reporting). - pub(super) services: VmServices, + pub(super) services: Arc, /// The runtime on which this VM's state driver and any tasks spawned by /// the VM's components will run. diff --git a/bin/propolis-server/src/lib/vm/ensure.rs b/bin/propolis-server/src/lib/vm/ensure.rs index 1097344fa..c9d805aa0 100644 --- a/bin/propolis-server/src/lib/vm/ensure.rs +++ b/bin/propolis-server/src/lib/vm/ensure.rs @@ -275,13 +275,15 @@ impl<'a> VmEnsureObjectsCreated<'a> { /// installs an active VM into the parent VM state machine and notifies the /// ensure requester that its request is complete. pub(crate) async fn ensure_active(self) -> VmEnsureActive<'a> { - let vm_services = VmServices::new( - self.log, - &self.vm_objects, - &self.ensure_request.properties, - self.ensure_options, - ) - .await; + let vm_services = Arc::new( + VmServices::new( + self.log, + &self.vm_objects, + &self.ensure_request.properties, + self.ensure_options, + ) + .await, + ); let vmm_rt_hdl = self.vmm_rt.handle().clone(); self.vm @@ -289,7 +291,7 @@ impl<'a> VmEnsureObjectsCreated<'a> { self.log, self.input_queue.clone(), &self.vm_objects, - vm_services, + &vm_services, self.vmm_rt, ) .await; @@ -312,6 +314,7 @@ impl<'a> VmEnsureObjectsCreated<'a> { vmm_rt_hdl, state_publisher: self.state_publisher, vm_objects: self.vm_objects, + vm_services, input_queue: self.input_queue, kernel_vm_paused: self.kernel_vm_paused, } @@ -326,12 +329,14 @@ pub(crate) struct VmEnsureActive<'a> { vmm_rt_hdl: tokio::runtime::Handle, state_publisher: &'a mut StatePublisher, vm_objects: Arc, + vm_services: Arc, input_queue: Arc, kernel_vm_paused: bool, } pub(super) struct VmEnsureActiveOutput { pub vm_objects: Arc, + pub vm_services: Arc, pub input_queue: Arc, pub vmm_rt_hdl: tokio::runtime::Handle, } @@ -368,6 +373,7 @@ impl VmEnsureActive<'_> { pub(super) fn into_inner(self) -> VmEnsureActiveOutput { VmEnsureActiveOutput { vm_objects: self.vm_objects, + vm_services: self.vm_services, input_queue: self.input_queue, vmm_rt_hdl: self.vmm_rt_hdl, } @@ -425,7 +431,7 @@ async fn initialize_vm_objects( init.initialize_rtc(&chipset)?; init.initialize_hpet(); - let com1 = Arc::new(init.initialize_uart(&chipset)); + let com1 = init.initialize_uart(&chipset); let ps2ctrl = init.initialize_ps2(&chipset); init.initialize_qemu_debug_port()?; init.initialize_qemu_pvpanic(VirtualMachine::new( diff --git a/bin/propolis-server/src/lib/vm/mod.rs b/bin/propolis-server/src/lib/vm/mod.rs index debb9af78..2b1bda452 100644 --- a/bin/propolis-server/src/lib/vm/mod.rs +++ b/bin/propolis-server/src/lib/vm/mod.rs @@ -103,7 +103,7 @@ pub(crate) mod ensure; pub(crate) mod guest_event; pub(crate) mod objects; mod request_queue; -mod services; +pub(crate) mod services; mod state_driver; pub(crate) mod state_publisher; @@ -409,7 +409,7 @@ impl Vm { log: &slog::Logger, state_driver_queue: Arc, objects: &Arc, - services: services::VmServices, + services: &Arc, vmm_rt: tokio::runtime::Runtime, ) { info!(self.log, "installing active VM"); @@ -423,7 +423,7 @@ impl Vm { external_state_rx: vm.external_state_rx, properties: vm.properties, objects: objects.clone(), - services, + services: services.clone(), tokio_rt: vmm_rt, }); } diff --git a/bin/propolis-server/src/lib/vm/objects.rs b/bin/propolis-server/src/lib/vm/objects.rs index 2ade7dab1..22d203f54 100644 --- a/bin/propolis-server/src/lib/vm/objects.rs +++ b/bin/propolis-server/src/lib/vm/objects.rs @@ -13,7 +13,7 @@ use std::{ use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use propolis::{ - hw::{ps2::ctrl::PS2Ctrl, qemu::ramfb::RamFb, uart::LpcUart}, + hw::{ps2::ctrl::PS2Ctrl, qemu::ramfb::RamFb}, vmm::VmmHdl, Machine, }; @@ -21,7 +21,9 @@ use propolis_api_types::instance_spec::SpecKey; use slog::{error, info}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use crate::{serial::Serial, spec::Spec, vcpu_tasks::VcpuTaskController}; +use crate::{ + serial::backend::ConsoleBackend, spec::Spec, vcpu_tasks::VcpuTaskController, +}; use super::{ state_driver::VmStartReason, BlockBackendMap, CrucibleBackendMap, DeviceMap, @@ -50,7 +52,7 @@ pub(super) struct InputVmObjects { pub devices: DeviceMap, pub block_backends: BlockBackendMap, pub crucible_backends: CrucibleBackendMap, - pub com1: Arc>, + pub com1: Arc, pub framebuffer: Option>, pub ps2ctrl: Arc, } @@ -81,7 +83,7 @@ pub(crate) struct VmObjectsLocked { crucible_backends: CrucibleBackendMap, /// A handle to the serial console connection to the VM's first COM port. - com1: Arc>, + com1: Arc, /// A handle to the VM's framebuffer. framebuffer: Option>, @@ -175,7 +177,7 @@ impl VmObjectsLocked { /// Yields a clonable reference to the serial console for this VM's first /// COM port. - pub(crate) fn com1(&self) -> &Arc> { + pub(crate) fn com1(&self) -> &Arc { &self.com1 } diff --git a/bin/propolis-server/src/lib/vm/services.rs b/bin/propolis-server/src/lib/vm/services.rs index da73fa702..920248e1d 100644 --- a/bin/propolis-server/src/lib/vm/services.rs +++ b/bin/propolis-server/src/lib/vm/services.rs @@ -12,14 +12,14 @@ use propolis_api_types::InstanceProperties; use slog::{error, info, Logger}; use crate::{ - serial::SerialTaskControlMessage, + serial::SerialConsoleManager, server::MetricsEndpointConfig, spec::Spec, stats::{ServerStats, VirtualMachine}, vnc::VncServer, }; -use super::objects::{VmObjects, VmObjectsShared}; +use super::objects::VmObjects; /// Information used to serve Oximeter metrics. #[derive(Default)] @@ -35,8 +35,8 @@ pub(crate) struct OximeterState { /// A collection of services visible to consumers outside this Propolis that /// depend on the functionality supplied by an extant VM. pub(crate) struct VmServices { - /// A VM's serial console handler task. - pub serial_task: tokio::sync::Mutex>, + /// A VM's serial console manager. + pub serial_mgr: tokio::sync::Mutex>, /// A VM's Oximeter state. /// @@ -79,10 +79,11 @@ impl VmServices { vnc_server.attach(vm_objects.ps2ctrl().clone(), ramfb.clone()); } - let serial_task = start_serial_task(log, &vm_objects).await; + let serial_mgr = + SerialConsoleManager::new(log.clone(), vm_objects.com1().clone()); Self { - serial_task: tokio::sync::Mutex::new(Some(serial_task)), + serial_mgr: tokio::sync::Mutex::new(Some(serial_mgr)), oximeter: tokio::sync::Mutex::new(oximeter_state), vnc_server, } @@ -92,12 +93,8 @@ impl VmServices { pub(super) async fn stop(&self, log: &Logger) { self.vnc_server.stop().await; - if let Some(serial_task) = self.serial_task.lock().await.take() { - let _ = serial_task - .control_ch - .send(SerialTaskControlMessage::Stopping) - .await; - let _ = serial_task.task.await; + if let Some(serial_mgr) = self.serial_mgr.lock().await.take() { + serial_mgr.stop().await; } let mut oximeter_state = self.oximeter.lock().await; @@ -165,30 +162,3 @@ async fn register_oximeter_producer( oximeter_state } - -/// Launches a serial console handler task. -async fn start_serial_task( - log: &slog::Logger, - vm_objects: &VmObjectsShared<'_>, -) -> crate::serial::SerialTask { - let (websocks_ch, websocks_recv) = tokio::sync::mpsc::channel(1); - let (control_ch, control_recv) = tokio::sync::mpsc::channel(1); - - let serial = vm_objects.com1().clone(); - serial.set_task_control_sender(control_ch.clone()).await; - let err_log = log.new(slog::o!("component" => "serial task")); - let task = tokio::spawn(async move { - if let Err(e) = crate::serial::instance_serial_task( - websocks_recv, - control_recv, - serial, - err_log.clone(), - ) - .await - { - error!(err_log, "Failure in serial task: {}", e); - } - }); - - crate::serial::SerialTask { task, control_ch, websocks_ch } -} diff --git a/bin/propolis-server/src/lib/vm/state_driver.rs b/bin/propolis-server/src/lib/vm/state_driver.rs index 0987699e9..9f746c24e 100644 --- a/bin/propolis-server/src/lib/vm/state_driver.rs +++ b/bin/propolis-server/src/lib/vm/state_driver.rs @@ -35,6 +35,7 @@ use super::{ guest_event::{self, GuestEvent}, objects::VmObjects, request_queue::{self, ExternalRequest, InstanceAutoStart}, + services::VmServices, state_publisher::{MigrationStateUpdate, StatePublisher}, InstanceEnsureResponseTx, }; @@ -234,6 +235,9 @@ struct StateDriver { /// The VM objects this driver is managing. objects: Arc, + /// The services associated with this driver's VM. + services: Arc, + /// The input queue this driver gets events from. input_queue: Arc, @@ -289,12 +293,17 @@ pub(super) async fn run_state_driver( } }; - let VmEnsureActiveOutput { vm_objects, input_queue, vmm_rt_hdl } = - activated_vm.into_inner(); + let VmEnsureActiveOutput { + vm_objects, + vm_services, + input_queue, + vmm_rt_hdl, + } = activated_vm.into_inner(); let state_driver = StateDriver { log, objects: vm_objects, + services: vm_services, input_queue, external_state: state_publisher, paused: false, @@ -631,6 +640,7 @@ impl StateDriver { match migration .run( &self.objects, + &self.services, &mut self.external_state, &mut self.migration_src_state, ) diff --git a/crates/propolis-api-types/src/lib.rs b/crates/propolis-api-types/src/lib.rs index 4296be0ac..8bad49ddc 100644 --- a/crates/propolis-api-types/src/lib.rs +++ b/crates/propolis-api-types/src/lib.rs @@ -287,28 +287,30 @@ pub struct Instance { /// Request a specific range of an Instance's serial console output history. #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema, PartialEq)] pub struct InstanceSerialConsoleHistoryRequest { - /// Character index in the serial buffer from which to read, counting the bytes output since - /// instance start. If this is not provided, `most_recent` must be provided, and if this *is* - /// provided, `most_recent` must *not* be provided. + /// Character index in the serial buffer from which to read, counting the + /// bytes output since instance start. If this is not provided, + /// `most_recent` must be provided, and if this *is* provided, `most_recent` + /// must *not* be provided. pub from_start: Option, - /// Character index in the serial buffer from which to read, counting *backward* from the most - /// recently buffered data retrieved from the instance. (See note on `from_start` about mutual - /// exclusivity) + /// Character index in the serial buffer from which to read, counting + /// *backward* from the most recently buffered data retrieved from the + /// instance. (See note on `from_start` about mutual exclusivity) pub most_recent: Option, - /// Maximum number of bytes of buffered serial console contents to return. If the requested - /// range runs to the end of the available buffer, the data returned will be shorter than - /// `max_bytes`. + /// Maximum number of bytes of buffered serial console contents to return. + /// If the requested range runs to the end of the available buffer, the data + /// returned will be shorter than `max_bytes`. pub max_bytes: Option, } /// Contents of an Instance's serial console buffer. #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] pub struct InstanceSerialConsoleHistoryResponse { - /// The bytes starting from the requested offset up to either the end of the buffer or the - /// request's `max_bytes`. Provided as a u8 array rather than a string, as it may not be UTF-8. + /// The bytes starting from the requested offset up to either the end of the + /// buffer or the request's `max_bytes`. Provided as a u8 array rather than + /// a string, as it may not be UTF-8. pub data: Vec, - /// The absolute offset since boot (suitable for use as `byte_offset` in a subsequent request) - /// of the last byte returned in `data`. + /// The absolute offset since boot (suitable for use as `byte_offset` in a + /// subsequent request) of the last byte returned in `data`. pub last_byte_offset: u64, } @@ -316,15 +318,22 @@ pub struct InstanceSerialConsoleHistoryResponse { /// bytes from the buffered history first. #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema, PartialEq)] pub struct InstanceSerialConsoleStreamRequest { - /// Character index in the serial buffer from which to read, counting the bytes output since - /// instance start. If this is provided, `most_recent` must *not* be provided. - // TODO: if neither is specified, send enough serial buffer history to reconstruct - // the current contents and cursor state of an interactive terminal + /// Character index in the serial buffer from which to read, counting the + /// bytes output since instance start. If this is provided, `most_recent` + /// must *not* be provided. + // TODO: if neither is specified, send enough serial buffer history to + // reconstruct the current contents and cursor state of an interactive + // terminal pub from_start: Option, - /// Character index in the serial buffer from which to read, counting *backward* from the most - /// recently buffered data retrieved from the instance. (See note on `from_start` about mutual - /// exclusivity) + /// Character index in the serial buffer from which to read, counting + /// *backward* from the most recently buffered data retrieved from the + /// instance. (See note on `from_start` about mutual exclusivity) pub most_recent: Option, + /// True if the connection should allow writing. If this option is set, any + /// existing writer to the serial console will be disconnected when this + /// client connects. + #[serde(default)] + pub writable: bool, } /// Control message(s) sent through the websocket to serial console clients. diff --git a/lib/propolis-client/src/support.rs b/lib/propolis-client/src/support.rs index 9ab7c2d87..efb1956ff 100644 --- a/lib/propolis-client/src/support.rs +++ b/lib/propolis-client/src/support.rs @@ -74,6 +74,7 @@ pub(crate) trait SerialConsoleStreamBuilder: Send { &mut self, address: SocketAddr, offset: WSClientOffset, + readonly: bool, ) -> Result, WSError>; } @@ -95,6 +96,7 @@ impl SerialConsoleStreamBuilder for PropolisSerialBuilder { &mut self, address: SocketAddr, offset: WSClientOffset, + readonly: bool, ) -> Result, WSError> { let client = PropolisClient::new(&format!("http://{}", address)); let mut req = client.instance_serial(); @@ -108,6 +110,7 @@ impl SerialConsoleStreamBuilder for PropolisSerialBuilder { } } + req = req.writable(!readonly); let upgraded = req .send() .await @@ -157,6 +160,7 @@ impl SerialConsoleStreamBuilder // offset is currently unused by this builder. Worth testing in // the future. _offset: WSClientOffset, + _readonly: bool, ) -> Result, WSError> { if let Some((delay, stream)) = self.client_conns_and_delays.remove(&address) @@ -191,6 +195,7 @@ pub enum WSClientOffset { pub struct InstanceSerialConsoleHelper { stream_builder: Box, ws_stream: WebSocketStream>, + readonly: bool, log: Option, } @@ -202,10 +207,12 @@ impl InstanceSerialConsoleHelper { pub async fn new( address: SocketAddr, offset: WSClientOffset, + readonly: bool, log: Option, ) -> Result { let stream_builder = PropolisSerialBuilder::new(); - Self::new_with_builder(stream_builder, address, offset, log).await + Self::new_with_builder(stream_builder, address, offset, readonly, log) + .await } /// Creates a new serial console helper for testing. @@ -217,6 +224,7 @@ impl InstanceSerialConsoleHelper { connections: impl IntoIterator, address: SocketAddr, offset: WSClientOffset, + readonly: bool, log: Option, ) -> Result { let stream_builder = TestSerialBuilder::new( @@ -224,7 +232,8 @@ impl InstanceSerialConsoleHelper { .into_iter() .map(|(addr, stream)| (addr, Duration::ZERO, stream)), ); - Self::new_with_builder(stream_builder, address, offset, log).await + Self::new_with_builder(stream_builder, address, offset, readonly, log) + .await } /// Creates a new serial console helper for testing, with delays before @@ -238,10 +247,12 @@ impl InstanceSerialConsoleHelper { connections: impl IntoIterator, address: SocketAddr, offset: WSClientOffset, + readonly: bool, log: Option, ) -> Result { let stream_builder = TestSerialBuilder::new(connections); - Self::new_with_builder(stream_builder, address, offset, log).await + Self::new_with_builder(stream_builder, address, offset, readonly, log) + .await } // Currently used for testing, and not exposed to clients. @@ -249,12 +260,18 @@ impl InstanceSerialConsoleHelper { mut stream_builder: impl SerialConsoleStreamBuilder + 'static, address: SocketAddr, offset: WSClientOffset, + readonly: bool, log: Option, ) -> Result { - let stream = stream_builder.build(address, offset).await?; + let stream = stream_builder.build(address, offset, readonly).await?; let ws_stream = WebSocketStream::from_raw_socket(stream, Role::Client, None).await; - Ok(Self { stream_builder: Box::new(stream_builder), ws_stream, log }) + Ok(Self { + stream_builder: Box::new(stream_builder), + ws_stream, + readonly, + log, + }) } /// Receives the next [WSMessage] from the server, holding it in @@ -401,6 +418,7 @@ impl InstanceSerialConsoleMessage<'_> { .build( destination, WSClientOffset::FromStart(from_start), + self.helper.readonly, ) .await?; self.helper.ws_stream = WebSocketStream::from_raw_socket( @@ -463,6 +481,7 @@ mod test { [(address, client_conn)], address, WSClientOffset::FromStart(0), + false, None, ) .await @@ -514,6 +533,7 @@ mod test { ], address_1, WSClientOffset::FromStart(0), + false, None, ) .await diff --git a/openapi/propolis-server.json b/openapi/propolis-server.json index 4e23f600c..c933e20d5 100644 --- a/openapi/propolis-server.json +++ b/openapi/propolis-server.json @@ -265,6 +265,14 @@ "format": "uint64", "minimum": 0 } + }, + { + "in": "query", + "name": "writable", + "description": "True if the connection should allow writing. If this option is set, any existing writer to the serial console will be disconnected when this client connects.", + "schema": { + "type": "boolean" + } } ], "responses": { diff --git a/phd-tests/framework/src/test_vm/mod.rs b/phd-tests/framework/src/test_vm/mod.rs index caa17360e..54ef2d1ce 100644 --- a/phd-tests/framework/src/test_vm/mod.rs +++ b/phd-tests/framework/src/test_vm/mod.rs @@ -349,6 +349,7 @@ impl TestVm { .server_addr(), ), WSClientOffset::MostRecent(0), + false, None, ) .await?;