Commit cf9d56d

[sled-agent] Don't block InstanceManager on full MPSCs
Sled-agent's `InstanceManager` task is responsible for managing the table of all instances presently running on the sled. When the sled-agent receives a request relating to an individual instance on the sled, it's sent to the `InstanceManager` over a `tokio::sync::mpsc` channel, and is then dispatched by the `InstanceManager` to the `InstanceRunner` task responsible for that individual instance by sending it over a *second* `tokio::sync::mpsc` channel. This is where things start to get interesting.[^1]

`tokio::sync::mpsc` is a *bounded* channel: there is a maximum number of messages which may be queued by a given MPSC channel at any given time. The `mpsc::Sender::send` method is an `async fn`, and if the channel is at capacity, that method will _wait_ until there is once again space in the channel to send the message being sent. Presently, `mpsc::Sender::send` is called by the `InstanceManager`'s main run loop when dispatching a request to an individual instance. As you may have already started to piece together, this means that if a given `InstanceRunner` task is not able to process requests fast enough to drain its channel, the entire `InstanceManager` loop will wait when dispatching a request to that instance until the queue has been drained.

This means that if one instance's runner task has gotten stuck on something, like waiting for a Crucible flush that will never complete (as seen in #6911), that instance will prevent requests from being dispatched to *any other instance* managed by the sled-agent. This is quite unfortunate!

This commit fixes this behavior by changing the functions that send requests to an individual instance's task to instead *shed load* when that instance's request queue is full. We now use the `mpsc::Sender::try_send` method, rather than `mpsc::Sender::send`, which does not wait and instead immediately returns an error when the channel is full. This allows the `InstanceManager` to return an error to the client indicating the channel is full, and move on to processing requests to other instances which may not be stuck. Thus, a single stuck instance can no longer block requests from being dispatched to other, perfectly fine instances.

The error returned when the channel is at capacity is converted to an HTTP 503 Service Unavailable error by the API. This indicates to the client that their request to that instance was not able to be processed at this time, but that it may be processed successfully in the future.[^2] Now, we can shed load while allowing clients to retry later, which seems much better than the present situation.

[^1]: In the sense of "may you live in interesting times", naturally.

[^2]: I also considered returning 429 Too Many Requests here, but my understanding is that that status code is supposed to indicate that too many requests have been received from *that specific client*. In this case, we haven't hit a per-client rate limit; we're just overloaded by requests more broadly, so it's not that particular client's fault.
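As a rough, self-contained illustration of the pattern described above (not code from this repository; the worker and queue names are made up), the sketch below shows how a dispatcher that uses `try_send` keeps serving a healthy worker even when another worker's bounded queue is full, whereas awaiting `send` would park the dispatcher on the full queue:

```rust
// Minimal sketch of bounded-queue load shedding with tokio mpsc channels.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Two per-worker queues with a small bound. The "stuck" worker never
    // drains its queue (standing in for an instance waiting on an operation
    // that never completes); the other worker behaves normally.
    let (stuck_tx, _stuck_rx) = mpsc::channel::<u32>(2);
    let (ok_tx, mut ok_rx) = mpsc::channel::<u32>(2);

    for i in 0..4 {
        // `try_send` returns immediately with an error once the stuck
        // worker's queue is full. Using `stuck_tx.send(i).await` here would
        // instead block this dispatch loop forever and starve the other
        // worker.
        if let Err(e) = stuck_tx.try_send(i) {
            println!("shedding load for stuck worker: {e}");
        }
        // Dispatch to the healthy worker is unaffected.
        ok_tx.try_send(i).ok();
    }

    // The healthy worker drains whatever fit in its queue.
    while let Ok(req) = ok_rx.try_recv() {
        println!("healthy worker got request {req}");
    }
}
```

Shedding the request and reporting the failure back to the caller is what lets the real `InstanceManager` loop keep draining its own queue instead of waiting on a single slow instance.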
1 parent ca63e9f commit cf9d56d

File tree

3 files changed: +181 -89 lines changed

sled-agent/src/instance.rs: +134 -57

@@ -119,6 +119,12 @@ pub enum Error {
     #[error("Failed to send request to Instance: Channel closed")]
     FailedSendChannelClosed,
 
+    #[error(
+        "Failed to send request to Instance: channel at capacity \
+         ({QUEUE_SIZE})"
+    )]
+    FailedSendChannelFull,
+
     #[error(
         "Failed to send request from Instance Runner: Client Channel closed"
     )]
@@ -217,10 +223,10 @@ enum InstanceRequest {
         tx: oneshot::Sender<Result<ZoneBundleMetadata, BundleError>>,
     },
     GetFilesystemPool {
-        tx: oneshot::Sender<Option<ZpoolName>>,
+        tx: oneshot::Sender<Result<Option<ZpoolName>, ManagerError>>,
     },
     CurrentState {
-        tx: oneshot::Sender<SledVmmState>,
+        tx: oneshot::Sender<Result<SledVmmState, ManagerError>>,
     },
     PutState {
         state: VmmStateRequested,
@@ -248,6 +254,58 @@ enum InstanceRequest {
     },
 }
 
+impl InstanceRequest {
+    /// Handle an error returned by [`mpsc::Sender::try_send`] when attempting
+    /// to send a request to the instance.
+    ///
+    /// This is a bit complex: the returned [`mpsc::error::TrySendError`] will
+    /// contain the [`InstanceRequest`] we were trying to send, and thus the
+    /// [`oneshot::Sender`] for that request's response. This function handles
+    /// the `TrySendError` by inspecting the error to determine whether the
+    /// channel has closed or is full, constructing the relevant [`Error`], and
+    /// extracting the response oneshot channel from the request, and then
+    /// sending back the error over that channel.
+    ///
+    /// If sending the error back to the client fails, this function returns an
+    /// error, so that the client having given up can be logged; otherwise, it returns `Ok(())`.
+    fn fail_try_send(
+        err: mpsc::error::TrySendError<Self>,
+    ) -> Result<(), Error> {
+        let (error, this) = match err {
+            mpsc::error::TrySendError::Closed(this) => {
+                (Error::FailedSendChannelClosed, this)
+            }
+            mpsc::error::TrySendError::Full(this) => {
+                (Error::FailedSendChannelFull, this)
+            }
+        };
+
+        match this {
+            Self::RequestZoneBundle { tx } => tx
+                .send(Err(BundleError::FailedSend(anyhow!(error))))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::GetFilesystemPool { tx } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::CurrentState { tx } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::PutState { tx, .. } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::Terminate { tx, .. } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::IssueSnapshotRequest { tx, .. }
+            | Self::AddExternalIp { tx, .. }
+            | Self::DeleteExternalIp { tx, .. }
+            | Self::RefreshExternalIps { tx } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+        }
+    }
+}
+
 // A small task which tracks the state of the instance, by constantly querying
 // the state of Propolis for updates.
 //
@@ -488,11 +546,11 @@ impl InstanceRunner {
                            .map_err(|_| Error::FailedSendClientClosed)
                    },
                    Some(GetFilesystemPool { tx } ) => {
-                        tx.send(self.get_filesystem_zpool())
+                        tx.send(Ok(self.get_filesystem_zpool()))
                            .map_err(|_| Error::FailedSendClientClosed)
                    },
                    Some(CurrentState{ tx }) => {
-                        tx.send(self.current_state())
+                        tx.send(Ok(self.current_state()))
                            .map_err(|_| Error::FailedSendClientClosed)
                    },
                    Some(PutState{ state, tx }) => {
@@ -562,9 +620,9 @@ impl InstanceRunner {
            RequestZoneBundle { tx } => tx
                .send(Err(BundleError::InstanceTerminating))
                .map_err(|_| ()),
-            GetFilesystemPool { tx } => tx.send(None).map_err(|_| ()),
+            GetFilesystemPool { tx } => tx.send(Ok(None)).map_err(|_| ()),
            CurrentState { tx } => {
-                tx.send(self.current_state()).map_err(|_| ())
+                tx.send(Ok(self.current_state())).map_err(|_| ())
            }
            PutState { tx, .. } => {
                tx.send(Err(Error::Terminating.into())).map_err(|_| ())
@@ -1092,13 +1150,48 @@ fn propolis_error_code(
 }
 
 /// Describes a single Propolis server that incarnates a specific instance.
+#[derive(Clone)]
 pub struct Instance {
     id: InstanceUuid,
 
+    /// Request channel for communicating with the instance task.
+    ///
+    /// # Extremely Serious Warning
+    ///
+    /// This channel is used by the `InstanceManager` task to communicate to the
+    /// instance task corresponding to each instance on this sled. Note that all
+    /// of the methods on this type which send [`InstanceRequest`]s over this
+    /// channel use [`mpsc::Sender::try_send`], which fails if the channel is at
+    /// capacity, and *not* [`mpsc::Sender::send`], which is an async method
+    /// that *waits* until capacity is available. THIS IS VERY IMPORTANT.
+    ///
+    /// This is because the `InstanceManager` task will call these methods in
+    /// its request-processing loop as it receives requests from clients, in
+    /// order to forward the request to the relevant instance. If the instance's
+    /// channel has filled up because the instance is currently processing a
+    /// slow request, `await`ing a call to [`mpsc::Sender::send`] will block the
+    /// `InstanceManager`'s main loop from proceeding until the instance task
+    /// has finished what it's doing and drained the next request from the channel.
+    /// Critically, this means that requests to *other, unrelated instances* on
+    /// this sled would have to wait until this instance has finished what it's
+    /// doing. That means a single deadlocked instance task, which is waiting
+    /// for something that never completes, can render *all* instances on this
+    /// sled inaccessible.
+    ///
+    /// Therefore, any time we send requests to the `Instance` over this channel
+    /// from code that's called in the `InstanceManager`'s run loop MUST use
+    /// [`mpsc::Sender::try_send`] rather than [`mpsc::Sender::send`]. Should
+    /// the channel be at capacity, we return an
+    /// [`Error::FailedSendChannelFull`], which eventually becomes a 503 Service
+    /// Unavailable error when returned to the client. It is acceptable to call
+    /// [`mpsc::Sender::send`] on this channel ONLY from code which runs
+    /// exclusively in tasks that are not blocking the `InstanceManager`'s run
+    /// loop.
     tx: mpsc::Sender<InstanceRequest>,
 
+    /// This is reference-counted so that this `Instance` handle can be cloned.
     #[allow(dead_code)]
-    runner_handle: tokio::task::JoinHandle<()>,
+    runner_handle: Arc<tokio::task::JoinHandle<()>>,
 }
 
 #[derive(Debug)]
@@ -1250,43 +1343,39 @@ impl Instance {
         let runner_handle =
             tokio::task::spawn(async move { runner.run().await });
 
-        Ok(Instance { id, tx, runner_handle })
+        Ok(Instance { id, tx, runner_handle: Arc::new(runner_handle) })
     }
 
     pub fn id(&self) -> InstanceUuid {
         self.id
     }
 
     /// Create bundle from an instance zone.
-    pub async fn request_zone_bundle(
+    pub fn request_zone_bundle(
         &self,
         tx: oneshot::Sender<Result<ZoneBundleMetadata, BundleError>>,
-    ) -> Result<(), BundleError> {
+    ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::RequestZoneBundle { tx })
-            .await
-            .map_err(|err| BundleError::FailedSend(anyhow!(err)))?;
-        Ok(())
+            .try_send(InstanceRequest::RequestZoneBundle { tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
-    pub async fn get_filesystem_zpool(
+    pub fn get_filesystem_zpool(
         &self,
-    ) -> Result<Option<ZpoolName>, Error> {
-        let (tx, rx) = oneshot::channel();
+        tx: oneshot::Sender<Result<Option<ZpoolName>, ManagerError>>,
+    ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::GetFilesystemPool { tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(rx.await?)
+            .try_send(InstanceRequest::GetFilesystemPool { tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
-    pub async fn current_state(&self) -> Result<SledVmmState, Error> {
-        let (tx, rx) = oneshot::channel();
+    pub fn current_state(
+        &self,
+        tx: oneshot::Sender<Result<SledVmmState, ManagerError>>,
+    ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::CurrentState { tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(rx.await?)
+            .try_send(InstanceRequest::CurrentState { tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
     /// Attempts to update the current state of the instance by launching a
@@ -1300,84 +1389,72 @@ impl Instance {
     /// instance begins to stop when Propolis has just begun to handle a prior
     /// request to reboot, the instance's state may proceed from Stopping to
     /// Rebooting to Running to Stopping to Stopped.
-    pub async fn put_state(
+    pub fn put_state(
         &self,
         tx: oneshot::Sender<Result<VmmPutStateResponse, ManagerError>>,
         state: VmmStateRequested,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::PutState { state, tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::PutState { state, tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
     /// Rudely terminates this instance's Propolis (if it has one) and
     /// immediately transitions the instance to the Destroyed state.
-    pub async fn terminate(
+    pub fn terminate(
         &self,
         tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
         mark_failed: bool,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::Terminate { mark_failed, tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::Terminate { mark_failed, tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
-    pub async fn issue_snapshot_request(
+    pub fn issue_snapshot_request(
         &self,
         tx: oneshot::Sender<Result<(), ManagerError>>,
         disk_id: Uuid,
         snapshot_id: Uuid,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::IssueSnapshotRequest {
+            .try_send(InstanceRequest::IssueSnapshotRequest {
                 disk_id,
                 snapshot_id,
                 tx,
             })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .or_else(InstanceRequest::fail_try_send)
     }
 
-    pub async fn add_external_ip(
+    pub fn add_external_ip(
         &self,
         tx: oneshot::Sender<Result<(), ManagerError>>,
         ip: &InstanceExternalIpBody,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::AddExternalIp { ip: *ip, tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::AddExternalIp { ip: *ip, tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
-    pub async fn delete_external_ip(
+    pub fn delete_external_ip(
         &self,
         tx: oneshot::Sender<Result<(), ManagerError>>,
         ip: &InstanceExternalIpBody,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::DeleteExternalIp { ip: *ip, tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::DeleteExternalIp { ip: *ip, tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 
     /// Reinstalls an instance's set of external IPs within OPTE, using
     /// up-to-date IP<->IGW mappings. This will not disrupt existing flows.
-    pub async fn refresh_external_ips(
+    pub fn refresh_external_ips(
         &self,
         tx: oneshot::Sender<Result<(), ManagerError>>,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::RefreshExternalIps { tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::RefreshExternalIps { tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 }
 

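For readers unfamiliar with the error-reply pattern that `fail_try_send` relies on, here is a small standalone sketch (illustrative types only, not omicron code): `try_send` hands the rejected message back inside the `TrySendError`, so the oneshot response channel embedded in the request can still be used to tell the waiting client why the request was never enqueued.

```rust
// Sketch of recovering a rejected request from TrySendError and replying to
// the client over the request's embedded oneshot channel.
use tokio::sync::{mpsc, oneshot};

struct Request {
    reply: oneshot::Sender<Result<String, String>>,
}

fn dispatch(queue: &mpsc::Sender<Request>, req: Request) {
    if let Err(err) = queue.try_send(req) {
        // try_send gives the rejected request (and its reply channel) back.
        let (reason, req) = match err {
            mpsc::error::TrySendError::Full(req) => ("queue full", req),
            mpsc::error::TrySendError::Closed(req) => ("task gone", req),
        };
        // Notify the waiting client; if it has hung up, there is no one left
        // to tell, so the send result is ignored.
        let _ = req.reply.send(Err(reason.to_string()));
    }
}

#[tokio::main]
async fn main() {
    // Capacity 1 so the second dispatch is rejected immediately.
    let (tx, mut rx) = mpsc::channel(1);
    let (reply1, _rx1) = oneshot::channel();
    let (reply2, rx2) = oneshot::channel();
    dispatch(&tx, Request { reply: reply1 }); // queued
    dispatch(&tx, Request { reply: reply2 }); // rejected: queue full
    assert_eq!(rx2.await.unwrap(), Err("queue full".to_string()));
    let _ = rx.recv().await; // drain the queued request
}
```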