
Commit e313b65

[sled-agent] Give InstanceManager a gun (#6915)
Stacked on top of #6913.

Presently, sled-agent sends requests to terminate an instance to the `InstanceRunner` task over the same `tokio::sync::mpsc` request channel as all other requests sent to that instance. This means that the `InstanceRunner` will attempt to terminate the instance only once other requests received before the termination request have been processed, and an instance cannot be terminated if its request channel has filled up. Similarly, if an instance's `InstanceRunner` task is waiting for an in-flight request to the VMM to complete, the request to terminate the instance will not be seen until the current request to Propolis has returned. This means that if the instance has gotten stuck for some reason --- e.g., because it is attempting a Crucible snapshot that cannot complete because a physical disk has gone missing, as seen in #6911 --- the instance cannot be terminated. Sadly, in this case, the only way to resolve the stuck request is to terminate the instance, but we cannot do so *because* the instance is stuck.

This seems unfortunate: if we try to kill an instance because it's doing something that it will never be able to finish, it shouldn't be able to say "no, you can't kill me, I'm too *busy* to die!". Instead, requests to terminate the instance should be prioritized over other requests.

This commit does that. Rather than sending termination requests to the `InstanceRunner` over the same channel as all other requests, we instead introduce a separate channel that's *just* for termination requests, which is preferred over the request channel in the biased `tokio::select!` in the `InstanceRunner` run loop. This means that a full request channel cannot stop a termination request from being sent.

When a request to the VMM is in flight, the future that awaits that request's completion is now one branch of a similar `tokio::select!` with the termination channel. This way, if a termination request comes in while the `InstanceRunner` is awaiting an in-flight instance operation, it will still be notified immediately of the termination request, cancel whatever operation it's waiting for, and terminate the VMM immediately. This is the correct behavior here, since the terminate operation is intended to forcefully terminate the VMM *now*, and is used internally for purposes such as `use_only_these_disks` killing instances that are using a no-longer-extant disk, or the control plane requesting that the sled-agent forcibly unregister the instance. "Normal" requests to stop the instance gracefully will go through the `instance_put_state` API instead, sending requests through the normal request channel and allowing in-flight operations to complete.
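For illustration, here is a minimal, self-contained sketch of the pattern described above: a runner task that prefers a dedicated termination channel via a `biased` `tokio::select!`, both in its main loop and while an operation is in flight. This is *not* the sled-agent code; the `Runner`, `Request`, `slow_operation`, and `reply` names are illustrative stand-ins (and it assumes the `tokio` crate with full features), while the real `InstanceRunner`, `TerminateRequest`, and `handle_termination_request` in the diff below carry more state and error handling.

```rust
use std::time::Duration;
use tokio::sync::{mpsc, oneshot};

// Illustrative stand-ins, not the types from this commit.
struct Request {
    reply: oneshot::Sender<&'static str>,
}

struct TerminateRequest {
    reply: oneshot::Sender<&'static str>,
}

struct Runner {
    rx: mpsc::Receiver<Request>,
}

impl Runner {
    // The termination receiver is a separate argument (not a field of
    // `Runner`) so the nested select below can borrow `self` mutably for the
    // in-flight operation while still polling `terminate_rx`.
    async fn run(mut self, mut terminate_rx: mpsc::Receiver<TerminateRequest>) {
        loop {
            tokio::select! {
                // `biased` polls branches in order, so a pending termination
                // request is always seen before the ordinary request queue,
                // even if that queue has filled up with backed-up work.
                biased;

                term = terminate_rx.recv() => {
                    if let Some(t) = term {
                        let _ = t.reply.send("terminated");
                    }
                    return;
                }

                req = self.rx.recv() => {
                    let Some(req) = req else { return };
                    // While the operation is in flight, keep watching the
                    // termination channel; a stuck operation is simply
                    // dropped rather than blocking termination forever.
                    tokio::select! {
                        biased;

                        term = terminate_rx.recv() => {
                            if let Some(t) = term {
                                let _ = t.reply.send("terminated mid-operation");
                            }
                            return;
                        }

                        result = self.slow_operation() => {
                            let _ = req.reply.send(result);
                        }
                    }
                }
            }
        }
    }

    async fn slow_operation(&mut self) -> &'static str {
        // Stand-in for an operation that may never complete, e.g. a snapshot
        // blocked on a missing physical disk.
        tokio::time::sleep(Duration::from_secs(3600)).await;
        "done"
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    let (terminate_tx, terminate_rx) = mpsc::channel(16);
    let handle = tokio::spawn(Runner { rx }.run(terminate_rx));

    // Hand the runner an operation that will never finish...
    let (op_tx, _op_rx) = oneshot::channel();
    tx.send(Request { reply: op_tx }).await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;

    // ...then ask it to terminate anyway; the dedicated channel is honored
    // even though the operation is still in flight.
    let (term_tx, term_rx) = oneshot::channel();
    terminate_tx.send(TerminateRequest { reply: term_tx }).await.unwrap();
    println!("{}", term_rx.await.unwrap());

    handle.await.unwrap();
}
```

Dropping the in-flight future on the termination branch is what makes a wedged operation harmless in this sketch; the actual change applies the same idea to the `InstanceRunner`'s request-handling future, as shown in the diff below.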
1 parent bc41f54 commit e313b65

File tree

1 file changed: +199 -74 lines changed


sled-agent/src/instance.rs

+199-74
@@ -232,10 +232,6 @@ enum InstanceRequest {
         state: VmmStateRequested,
         tx: oneshot::Sender<Result<VmmPutStateResponse, ManagerError>>,
     },
-    Terminate {
-        mark_failed: bool,
-        tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
-    },
     IssueSnapshotRequest {
         disk_id: Uuid,
         snapshot_id: Uuid,
@@ -293,9 +289,6 @@ impl InstanceRequest {
             Self::PutState { tx, .. } => tx
                 .send(Err(error.into()))
                 .map_err(|_| Error::FailedSendClientClosed),
-            Self::Terminate { tx, .. } => tx
-                .send(Err(error.into()))
-                .map_err(|_| Error::FailedSendClientClosed),
             Self::IssueSnapshotRequest { tx, .. }
             | Self::AddExternalIp { tx, .. }
             | Self::DeleteExternalIp { tx, .. }
@@ -306,6 +299,11 @@
     }
 }

+struct TerminateRequest {
+    mark_failed: bool,
+    tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
+}
+
 // A small task which tracks the state of the instance, by constantly querying
 // the state of Propolis for updates.
 //
@@ -469,7 +467,7 @@ struct InstanceRunner {
 }

 impl InstanceRunner {
-    async fn run(mut self) {
+    async fn run(mut self, mut terminate_rx: mpsc::Receiver<TerminateRequest>) {
         use InstanceRequest::*;
         while !self.should_terminate {
             tokio::select! {
@@ -535,74 +533,103 @@ impl InstanceRunner {
                             self.terminate(mark_failed).await;
                         },
                     }
-
                 },
+                // Requests to terminate the instance take priority over any
+                // other request to the instance.
+                request = terminate_rx.recv() => {
+                    self.handle_termination_request(request, None).await;
+                    break;
+                }
+
                 // Handle external requests to act upon the instance.
                 request = self.rx.recv() => {
-                    let request_variant = request.as_ref().map(|r| r.to_string());
-                    let result = match request {
-                        Some(RequestZoneBundle { tx }) => {
-                            tx.send(self.request_zone_bundle().await)
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(GetFilesystemPool { tx } ) => {
-                            tx.send(Ok(self.get_filesystem_zpool()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(CurrentState{ tx }) => {
-                            tx.send(Ok(self.current_state()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(PutState{ state, tx }) => {
-                            tx.send(self.put_state(state).await
-                                .map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
-                                .map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(Terminate { mark_failed, tx }) => {
-                            tx.send(Ok(VmmUnregisterResponse {
-                                updated_runtime: Some(self.terminate(mark_failed).await)
-                            }))
-                            .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(IssueSnapshotRequest { disk_id, snapshot_id, tx }) => {
-                            tx.send(
-                                self.issue_snapshot_request(
-                                    disk_id,
-                                    snapshot_id
-                                ).await.map_err(|e| e.into())
-                            )
-                            .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(AddExternalIp { ip, tx }) => {
-                            tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(DeleteExternalIp { ip, tx }) => {
-                            tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(RefreshExternalIps { tx }) => {
-                            tx.send(self.refresh_external_ips().map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        }
+                    let request = match request {
+                        Some(r) => r,
                         None => {
                             warn!(self.log, "Instance request channel closed; shutting down");
                             let mark_failed = false;
                             self.terminate(mark_failed).await;
                             break;
-                        },
+                        }
                     };
+                    let request_variant = request.to_string();
+                    // Okay, this is a little bit wacky: if we are waiting for
+                    // one of the instance operations we run here to come back,
+                    // and a termination request comes in, we want to give up on
+                    // the outstanding operation and honor the termination
+                    // request immediately. This is in case the instance
+                    // operation has gotten stuck: we don't want it to prevent
+                    // the instance from terminating because something else is
+                    // wedged.
+                    //
+                    // Therefore, we're going to select between the future that
+                    // actually performs the instance op and receiving another
+                    // request from the termination channel.
+                    let op = async {
+                        match request {
+                            RequestZoneBundle { tx } => {
+                                tx.send(self.request_zone_bundle().await)
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            GetFilesystemPool { tx } => {
+                                tx.send(Ok(self.get_filesystem_zpool()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            CurrentState{ tx } => {
+                                tx.send(Ok(self.current_state()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            PutState{ state, tx } => {
+                                tx.send(self.put_state(state).await
+                                    .map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
+                                    .map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            IssueSnapshotRequest { disk_id, snapshot_id, tx } => {
+                                tx.send(
+                                    self.issue_snapshot_request(
+                                        disk_id,
+                                        snapshot_id
+                                    ).await.map_err(|e| e.into())
+                                )
+                                .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            AddExternalIp { ip, tx } => {
+                                tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            DeleteExternalIp { ip, tx } => {
+                                tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            RefreshExternalIps { tx } => {
+                                tx.send(self.refresh_external_ips().map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            }
+                        }
+                    };
+                    tokio::select! {
+                        biased;
+
+                        request = terminate_rx.recv() => {
+                            self.handle_termination_request(
+                                request,
+                                Some(&request_variant),
+                            ).await;
+                            break;
+                        }

-                    if let Err(err) = result {
-                        warn!(
-                            self.log,
-                            "Error handling request";
-                            "request" => request_variant.unwrap(),
-                            "err" => ?err,
-
-                        );
-                    }
+                        result = op => {
+                            if let Err(err) = result {
+                                warn!(
+                                    self.log,
+                                    "Error handling request";
+                                    "request" => request_variant,
+                                    "err" => ?err,
+                                );
+                            }
+                        }
+                    };
                 }

             }
@@ -627,9 +654,6 @@ impl InstanceRunner {
                 PutState { tx, .. } => {
                     tx.send(Err(Error::Terminating.into())).map_err(|_| ())
                 }
-                Terminate { tx, .. } => {
-                    tx.send(Err(Error::Terminating.into())).map_err(|_| ())
-                }
                 IssueSnapshotRequest { tx, .. } => {
                     tx.send(Err(Error::Terminating.into())).map_err(|_| ())
                 }
@@ -644,6 +668,15 @@
                 }
             };
         }
+
+        // Anyone else who was trying to ask us to go die will be happy to learn
+        // that we have now done so!
+        while let Some(TerminateRequest { tx, .. }) = terminate_rx.recv().await
+        {
+            let _ = tx.send(Ok(VmmUnregisterResponse {
+                updated_runtime: Some(self.current_state()),
+            }));
+        }
     }

     /// Yields this instance's ID.
@@ -1193,6 +1226,12 @@ pub struct Instance {
     /// loop.
     tx: mpsc::Sender<InstanceRequest>,

+    /// Sender for requests to terminate the instance.
+    ///
+    /// These are sent over a separate channel so that they can be prioritized
+    /// over all other requests to the instance.
+    terminate_tx: mpsc::Sender<TerminateRequest>,
+
     /// This is reference-counted so that the `Instance` struct may be cloned.
     #[allow(dead_code)]
     runner_handle: Arc<tokio::task::JoinHandle<()>>,
@@ -1291,6 +1330,19 @@ impl Instance {
         let (tx, rx) = mpsc::channel(QUEUE_SIZE);
         let (tx_monitor, rx_monitor) = mpsc::channel(1);

+        // Request channel for terminating the instance.
+        //
+        // This is a separate channel from the main request channel (`self.rx`)
+        // because we would like to be able to prioritize requests to terminate, and
+        // handle them even when the instance's main request channel may have filled
+        // up.
+        //
+        // Note also that this is *not* part of the `InstanceRunner` struct,
+        // because it's necessary to split mutable borrows in order to allow
+        // selecting between the actual instance operation (which must mutate
+        // the `InstanceRunner`) and awaiting a termination request.
+        let (terminate_tx, terminate_rx) = mpsc::channel(QUEUE_SIZE);
+
         let metadata = propolis_client::types::InstanceMetadata {
             project_id: metadata.project_id,
             silo_id: metadata.silo_id,
@@ -1345,9 +1397,14 @@ impl Instance {
         };

         let runner_handle =
-            tokio::task::spawn(async move { runner.run().await });
+            tokio::task::spawn(async move { runner.run(terminate_rx).await });

-        Ok(Instance { id, tx, runner_handle: Arc::new(runner_handle) })
+        Ok(Instance {
+            id,
+            tx,
+            runner_handle: Arc::new(runner_handle),
+            terminate_tx,
+        })
     }

     pub fn id(&self) -> InstanceUuid {
@@ -1410,9 +1467,19 @@
         tx: oneshot::Sender<Result<VmmUnregisterResponse, ManagerError>>,
         mark_failed: bool,
     ) -> Result<(), Error> {
-        self.tx
-            .try_send(InstanceRequest::Terminate { mark_failed, tx })
-            .or_else(InstanceRequest::fail_try_send)
+        self.terminate_tx
+            .try_send(TerminateRequest { mark_failed, tx })
+            .or_else(|err| match err {
+                mpsc::error::TrySendError::Closed(TerminateRequest {
+                    tx,
+                    ..
+                }) => tx.send(Err(Error::FailedSendChannelClosed.into())),
+                mpsc::error::TrySendError::Full(TerminateRequest {
+                    tx,
+                    ..
+                }) => tx.send(Err(Error::FailedSendChannelFull.into())),
+            })
+            .map_err(|_| Error::FailedSendClientClosed)
     }

     pub fn issue_snapshot_request(
@@ -1749,6 +1816,64 @@ impl InstanceRunner {
         Ok(PropolisSetup { client, running_zone })
     }

+    async fn handle_termination_request(
+        &mut self,
+        req: Option<TerminateRequest>,
+        current_req: Option<&str>,
+    ) {
+        match req {
+            Some(TerminateRequest { tx, mark_failed }) => {
+                if let Some(request) = current_req {
+                    info!(
+                        self.log,
+                        "Received request to terminate instance while waiting \
+                         on an ongoing request";
+                        "request" => %request,
+                        "mark_failed" => mark_failed,
+                    );
+                } else {
+                    info!(
+                        self.log,
+                        "Received request to terminate instance";
+                        "mark_failed" => mark_failed,
+                    );
+                }
+
+                let result = tx
+                    .send(Ok(VmmUnregisterResponse {
+                        updated_runtime: Some(
+                            self.terminate(mark_failed).await,
+                        ),
+                    }))
+                    .map_err(|_| Error::FailedSendClientClosed);
+                if let Err(err) = result {
+                    warn!(
+                        self.log,
+                        "Error handling request to terminate instance";
+                        "err" => ?err,
+                    );
+                }
+            }
+            None => {
+                if let Some(request) = current_req {
+                    warn!(
+                        self.log,
+                        "Instance termination request channel closed while \
+                         waiting on an ongoing request; shutting down";
+                        "request" => %request,
+                    );
+                } else {
+                    warn!(
+                        self.log,
+                        "Instance termination request channel closed; \
+                         shutting down";
+                    );
+                }
+                self.terminate(false).await;
+            }
+        };
+    }
+
     async fn terminate(&mut self, mark_failed: bool) -> SledVmmState {
         self.terminate_inner().await;
         self.state.terminate_rudely(mark_failed);
