Commit 2a331ab

allow termination to interrupt stuck instance ops

1 parent 068b17a
1 file changed (+122 -65 lines changed)

sled-agent/src/instance.rs
@@ -409,14 +409,6 @@ struct InstanceRunner {
     // Request channel on which most instance requests are made.
     rx: mpsc::Receiver<InstanceRequest>,
 
-    // Request channel for terminating the instance.
-    //
-    // This is a separate channel from the main request channel (`self.rx`)
-    // because we would like to be able to prioritize requests to terminate, and
-    // handle them even when the instance's main request channel may have filled
-    // up.
-    terminate_rx: mpsc::Receiver<TerminateRequest>,
-
     // Request channel on which monitor requests are made.
     tx_monitor: mpsc::Sender<InstanceMonitorRequest>,
     rx_monitor: mpsc::Receiver<InstanceMonitorRequest>,
@@ -475,7 +467,7 @@ struct InstanceRunner {
 }
 
 impl InstanceRunner {
-    async fn run(mut self) {
+    async fn run(mut self, mut terminate_rx: mpsc::Receiver<TerminateRequest>) {
         use InstanceRequest::*;
         while !self.should_terminate {
             tokio::select! {
@@ -544,7 +536,7 @@ impl InstanceRunner {
                 },
                 // Requests to terminate the instance take priority over any
                 // other request to the instance.
-                request = self.terminate_rx.recv() => {
+                request = terminate_rx.recv() => {
                     let Some(TerminateRequest { mark_failed, tx}) = request else {
                         warn!(
                             self.log,
@@ -569,64 +561,119 @@ impl InstanceRunner {
 
                 // Handle external requests to act upon the instance.
                 request = self.rx.recv() => {
-                    let request_variant = request.as_ref().map(|r| r.to_string());
-                    let result = match request {
-                        Some(RequestZoneBundle { tx }) => {
-                            tx.send(self.request_zone_bundle().await)
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(GetFilesystemPool { tx } ) => {
-                            tx.send(Ok(self.get_filesystem_zpool()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(CurrentState{ tx }) => {
-                            tx.send(Ok(self.current_state()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(PutState{ state, tx }) => {
-                            tx.send(self.put_state(state).await
-                                .map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
-                                .map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(IssueSnapshotRequest { disk_id, snapshot_id, tx }) => {
-                            tx.send(
-                                self.issue_snapshot_request(
-                                    disk_id,
-                                    snapshot_id
-                                ).await.map_err(|e| e.into())
-                            )
-                            .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(AddExternalIp { ip, tx }) => {
-                            tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(DeleteExternalIp { ip, tx }) => {
-                            tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(RefreshExternalIps { tx }) => {
-                            tx.send(self.refresh_external_ips().map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        }
+                    let request = match request {
+                        Some(r) => r,
                         None => {
                             warn!(self.log, "Instance request channel closed; shutting down");
                             let mark_failed = false;
                             self.terminate(mark_failed).await;
                             break;
-                        },
+                        }
                     };
+                    let request_variant = request.to_string();
+                    // Okay, this is a little bit wacky: if we are waiting for
+                    // one of the instance operations we run here to come back,
+                    // and a termination request comes in, we want to give up on
+                    // the outstanding operation and honor the termination
+                    // request immediately. This is in case the instance
+                    // operation has gotten stuck: we don't want it to prevent
+                    // the instance from terminating because something else is
+                    // wedged.
+                    //
+                    // Therefore, we're going to select between the future that
+                    // actually performs the instance op and receiving another
+                    // request from the termination channel.
+                    let op = async {
+                        match request {
+                            RequestZoneBundle { tx } => {
+                                tx.send(self.request_zone_bundle().await)
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            GetFilesystemPool { tx } => {
+                                tx.send(Ok(self.get_filesystem_zpool()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            CurrentState{ tx } => {
+                                tx.send(Ok(self.current_state()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            PutState{ state, tx } => {
+                                tx.send(self.put_state(state).await
+                                    .map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
+                                    .map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            IssueSnapshotRequest { disk_id, snapshot_id, tx } => {
+                                tx.send(
+                                    self.issue_snapshot_request(
+                                        disk_id,
+                                        snapshot_id
+                                    ).await.map_err(|e| e.into())
+                                )
+                                .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            AddExternalIp { ip, tx } => {
+                                tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            DeleteExternalIp { ip, tx } => {
+                                tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            RefreshExternalIps { tx } => {
+                                tx.send(self.refresh_external_ips().map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            }
+                        }
+                    };
+                    tokio::select! {
+                        biased;
+
+                        request = terminate_rx.recv() => {
+                            match request {
+                                Some(TerminateRequest { tx, mark_failed }) => {
+                                    info!(
+                                        self.log,
+                                        "Received request to terminate instance \
+                                         while waiting on an ongoing request";
+                                        "request" => request_variant,
+                                    );
+                                    let result = tx.send(Ok(VmmUnregisterResponse {
+                                        updated_runtime: Some(self.terminate(mark_failed).await)
+                                    }))
+                                    .map_err(|_| Error::FailedSendClientClosed);
+                                    if let Err(err) = result {
+                                        warn!(
+                                            self.log,
+                                            "Error handling request to terminate instance";
+                                            "err" => ?err,
+                                        );
+                                    }
+                                    break;
+                                },
+                                None => {
+                                    warn!(
+                                        self.log,
+                                        "Instance termination request channel closed; \
+                                         shutting down";
+                                    );
+                                    self.terminate(false).await;
+                                    break;
+                                },
+                            };
+                        }
 
-                    if let Err(err) = result {
-                        warn!(
-                            self.log,
-                            "Error handling request";
-                            "request" => request_variant.unwrap(),
-                            "err" => ?err,
-
-                        );
-                    }
+                        result = op => {
+                            if let Err(err) = result {
+                                warn!(
+                                    self.log,
+                                    "Error handling request";
+                                    "request" => request_variant,
+                                    "err" => ?err,
+                                );
+                            }
+                        }
+                    };
                 }
 
             }
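
The heart of this hunk is the biased `tokio::select!` between the in-flight operation future and the termination channel. Below is a minimal, self-contained sketch of that pattern; it is illustrative only, and `Op`, `Terminate`, `handle_op`, and `run` are hypothetical stand-ins, not this commit's types:

use tokio::sync::mpsc;

enum Op {
    DoWork,
}

struct Terminate;

// Stands in for an instance operation that may hang indefinitely
// (e.g. a wedged call to an unresponsive service).
async fn handle_op(_op: Op) {
    std::future::pending::<()>().await;
}

async fn run(
    mut rx: mpsc::Receiver<Op>,
    mut terminate_rx: mpsc::Receiver<Terminate>,
) {
    while let Some(op) = rx.recv().await {
        tokio::select! {
            // Poll the termination channel before the in-flight operation.
            biased;

            _ = terminate_rx.recv() => {
                // The unfinished `handle_op` future is dropped here, which
                // is what abandons the stuck operation.
                println!("terminated while an op was in flight");
                break;
            }
            _ = handle_op(op) => {}
        }
    }
}

#[tokio::main]
async fn main() {
    let (op_tx, op_rx) = mpsc::channel(8);
    let (term_tx, term_rx) = mpsc::channel(1);
    let runner = tokio::spawn(run(op_rx, term_rx));
    op_tx.send(Op::DoWork).await.unwrap(); // this op will hang...
    term_tx.send(Terminate).await.unwrap(); // ...but termination still wins
    runner.await.unwrap();
}

With `biased;`, the arms are polled in the order written instead of at random, so a pending termination request always takes priority over completing the operation.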
@@ -668,8 +715,7 @@ impl InstanceRunner {
 
         // Anyone else who was trying to ask us to go die will be happy to learn
         // that we have now done so!
-        while let Some(TerminateRequest { tx, .. }) =
-            self.terminate_rx.recv().await
+        while let Some(TerminateRequest { tx, .. }) = terminate_rx.recv().await
         {
             let _ = tx.send(Ok(VmmUnregisterResponse {
                 updated_runtime: Some(self.current_state()),
@@ -1323,6 +1369,18 @@ impl Instance {
 
         let (tx, rx) = mpsc::channel(QUEUE_SIZE);
         let (tx_monitor, rx_monitor) = mpsc::channel(1);
+
+        // Request channel for terminating the instance.
+        //
+        // This is a separate channel from the main request channel (`self.rx`)
+        // because we would like to be able to prioritize requests to terminate, and
+        // handle them even when the instance's main request channel may have filled
+        // up.
+        //
+        // Note also that this is *not* part of the `InstanceRunner` struct,
+        // because it's necessary to split mutable borrows in order to allow
+        // selecting between the actual instance operation (which must mutate
+        // the `InstanceRunner`) and awaiting a termination request.
        let (terminate_tx, terminate_rx) = mpsc::channel(QUEUE_SIZE);
 
        let metadata = propolis_client::types::InstanceMetadata {
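
The borrow-splitting note in that new comment is the subtle part: if `terminate_rx` stayed a field, `self.terminate_rx.recv()` and an operation taking `&mut self` would be two simultaneous mutable borrows of `self`, which the borrow checker rejects (E0499). A tiny sketch of the fix, using hypothetical `Runner`/`do_op` names rather than this commit's types:

use tokio::sync::mpsc;

struct Runner {
    // If `terminate_rx` lived here, the select below could not also
    // borrow `self` mutably for `do_op` at the same time.
    count: usize,
}

impl Runner {
    // An operation that must mutate the runner.
    async fn do_op(&mut self) {
        self.count += 1;
    }

    // Taking the receiver as a separate argument splits the borrows:
    // `terminate_rx.recv()` borrows only the receiver, while `do_op`
    // borrows only `self`, so both futures can coexist in one select.
    async fn run(&mut self, terminate_rx: &mut mpsc::Receiver<()>) {
        tokio::select! {
            _ = terminate_rx.recv() => { /* honor termination */ }
            _ = self.do_op() => { /* operation completed */ }
        }
    }
}

#[tokio::main]
async fn main() {
    let (_tx, mut rx) = mpsc::channel::<()>(1);
    let mut runner = Runner { count: 0 };
    runner.run(&mut rx).await; // nothing terminates, so do_op wins
    assert_eq!(runner.count, 1);
}

Passing the receiver into `run` as a parameter is the minimal way to give the two select arms disjoint borrows, which is exactly why this commit moves `terminate_rx` out of the struct.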
@@ -1340,7 +1398,6 @@ impl Instance {
            rx,
            tx_monitor,
            rx_monitor,
-            terminate_rx,
            monitor_handle: None,
            // NOTE: Mostly lies.
            properties: propolis_client::types::InstanceProperties {
@@ -1380,7 +1437,7 @@ impl Instance {
        };
 
        let runner_handle =
-            tokio::task::spawn(async move { runner.run().await });
+            tokio::task::spawn(async move { runner.run(terminate_rx).await });
 
        Ok(Instance {
            id,
