Skip to content

Commit 57edbfc

Browse files
committed
server: don't allow clients to block VM stop; docs & cleanup
1 parent 912a002 commit 57edbfc

File tree

2 files changed

+149
-59
lines changed

2 files changed

+149
-59
lines changed

bin/propolis-server/src/lib/serial/backend.rs

+39-7
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ impl Permissions {
4444
/// Determines what happens when the backend is unable to send a guest byte back
4545
/// to a client because the client's channel is full.
4646
#[derive(Clone, Copy)]
47-
pub(super) enum ReadWaitDiscipline {
47+
pub(super) enum FullReadChannelDiscipline {
4848
/// The backend should block until it can send to this client.
4949
Block,
5050

@@ -60,7 +60,7 @@ struct Client {
6060

6161
/// Determines what happens when the backend wants to send a byte and
6262
/// [`Self::tx`] is full.
63-
read_discipline: ReadWaitDiscipline,
63+
read_discipline: FullReadChannelDiscipline,
6464
}
6565

6666
/// A handle held by a client that represents its connection to the backend.
@@ -185,8 +185,20 @@ impl Inner {
185185
/// access a single serial device.
186186
pub struct ConsoleBackend {
187187
inner: Arc<Mutex<Inner>>,
188+
189+
/// The character [`Sink`] that should receive writes to this backend.
190+
/// Writes should not access the sink directly; instead, they should be
191+
/// directed to [`Self::sink_buffer`]. This reference is needed because
192+
/// [`SinkBuffer`]'s current API requires a sink to be passed into each
193+
/// attempt to write (buffers don't own references to their sinks).
188194
sink: Arc<dyn Sink>,
195+
196+
/// The buffer that sits in front of this backend's sink. Writes to the
197+
/// backend should be directed at the buffer, not at [`Self::sink`].
189198
sink_buffer: Arc<SinkBuffer>,
199+
200+
/// A channel used to tell the backend's reader task that the backend has
201+
/// been closed.
190202
done_tx: oneshot::Sender<()>,
191203
}
192204

@@ -227,20 +239,26 @@ impl ConsoleBackend {
227239
///
228240
/// - `read_tx`: A channel to which the backend should send bytes read from
229241
/// its device.
242+
/// - `permissions`: The read/write permissions this client should have.
243+
/// - `full_read_tx_discipline`: Describes what should happen if the reader
244+
/// task ever finds that `read_tx` is full when dispatching a byte to it.
230245
pub(super) fn attach_client(
231246
self: &Arc<Self>,
232247
read_tx: mpsc::Sender<u8>,
233248
permissions: Permissions,
234-
wait_discipline: ReadWaitDiscipline,
249+
full_read_tx_discipline: FullReadChannelDiscipline,
235250
) -> ClientHandle {
236251
let mut inner = self.inner.lock().unwrap();
237252
let id = inner.next_client_id();
238-
let client = Client { tx: read_tx, read_discipline: wait_discipline };
253+
let client =
254+
Client { tx: read_tx, read_discipline: full_read_tx_discipline };
239255

240256
inner.clients.insert(id, client);
241257
ClientHandle { id, backend: self.clone(), permissions }
242258
}
243259

260+
/// Returns the contents of this backend's history buffer. See
261+
/// [`HistoryBuffer::contents_vec`].
244262
pub fn history_vec(
245263
&self,
246264
byte_offset: SerialHistoryOffset,
@@ -250,6 +268,8 @@ impl ConsoleBackend {
250268
inner.buffer.contents_vec(byte_offset, max_bytes)
251269
}
252270

271+
/// Returns the number of bytes that have ever been sent to this backend's
272+
/// history buffer.
253273
pub fn bytes_since_start(&self) -> usize {
254274
self.inner.lock().unwrap().buffer.bytes_from_start()
255275
}
@@ -293,6 +313,9 @@ mod migrate {
293313
}
294314
}
295315

316+
/// Reads bytes from the supplied `source` and dispatches them to the clients in
317+
/// `inner`. Each backend is expected to spin up one task that runs this
318+
/// function.
296319
async fn read_task(
297320
inner: Arc<Mutex<Inner>>,
298321
source: Arc<dyn Source>,
@@ -322,10 +345,15 @@ async fn read_task(
322345

323346
let to_send = &bytes[0..bytes_read];
324347

348+
// Capture a list of all the clients who should receive this byte with
349+
// the lock held, then drop the lock before sending to any of them. Note
350+
// that sends to clients may block for an arbitrary length of time: the
351+
// receiver may be relaying received bytes to a websocket, and the
352+
// remote peer may be slow to accept them.
325353
struct CapturedClient {
326354
id: ClientId,
327355
tx: mpsc::Sender<u8>,
328-
discipline: ReadWaitDiscipline,
356+
discipline: FullReadChannelDiscipline,
329357
disconnect: bool,
330358
}
331359

@@ -343,19 +371,23 @@ async fn read_task(
343371
.collect::<Vec<CapturedClient>>()
344372
};
345373

374+
// Prepare to delete any clients that are no longer active (i.e., who
375+
// have dropped the receiver sides of their channels) or who are using
376+
// the close-on-full-channel discipline and who have a full channel.
346377
for byte in to_send {
347378
for client in clients.iter_mut() {
348379
client.disconnect = match client.discipline {
349-
ReadWaitDiscipline::Block => {
380+
FullReadChannelDiscipline::Block => {
350381
client.tx.send(*byte).await.is_err()
351382
}
352-
ReadWaitDiscipline::Close => {
383+
FullReadChannelDiscipline::Close => {
353384
client.tx.try_send(*byte).is_err()
354385
}
355386
}
356387
}
357388
}
358389

390+
// Clean up any clients who met the disconnection criteria.
359391
let mut guard = inner.lock().unwrap();
360392
guard.buffer.consume(to_send);
361393
for client in clients.iter().filter(|c| c.disconnect) {

0 commit comments

Comments
 (0)