@@ -232,10 +232,6 @@ enum InstanceRequest {
232
232
state : VmmStateRequested ,
233
233
tx : oneshot:: Sender < Result < VmmPutStateResponse , ManagerError > > ,
234
234
} ,
235
- Terminate {
236
- mark_failed : bool ,
237
- tx : oneshot:: Sender < Result < VmmUnregisterResponse , ManagerError > > ,
238
- } ,
239
235
IssueSnapshotRequest {
240
236
disk_id : Uuid ,
241
237
snapshot_id : Uuid ,
@@ -293,9 +289,6 @@ impl InstanceRequest {
293
289
Self :: PutState { tx, .. } => tx
294
290
. send ( Err ( error. into ( ) ) )
295
291
. map_err ( |_| Error :: FailedSendClientClosed ) ,
296
- Self :: Terminate { tx, .. } => tx
297
- . send ( Err ( error. into ( ) ) )
298
- . map_err ( |_| Error :: FailedSendClientClosed ) ,
299
292
Self :: IssueSnapshotRequest { tx, .. }
300
293
| Self :: AddExternalIp { tx, .. }
301
294
| Self :: DeleteExternalIp { tx, .. }
@@ -306,6 +299,11 @@ impl InstanceRequest {
306
299
}
307
300
}
308
301
302
+ struct TerminateRequest {
303
+ mark_failed : bool ,
304
+ tx : oneshot:: Sender < Result < VmmUnregisterResponse , ManagerError > > ,
305
+ }
306
+
309
307
// A small task which tracks the state of the instance, by constantly querying
310
308
// the state of Propolis for updates.
311
309
//
@@ -411,6 +409,14 @@ struct InstanceRunner {
411
409
// Request channel on which most instance requests are made.
412
410
rx : mpsc:: Receiver < InstanceRequest > ,
413
411
412
+ // Request channel for terminating the instance.
413
+ //
414
+ // This is a separate channel from the main request channel (`self.rx`)
415
+ // because we would like to be able to prioritize requests to terminate, and
416
+ // handle them even when the instance's main request channel may have filled
417
+ // up.
418
+ terminate_rx : mpsc:: Receiver < TerminateRequest > ,
419
+
414
420
// Request channel on which monitor requests are made.
415
421
tx_monitor : mpsc:: Sender < InstanceMonitorRequest > ,
416
422
rx_monitor : mpsc:: Receiver < InstanceMonitorRequest > ,
@@ -535,8 +541,32 @@ impl InstanceRunner {
535
541
self . terminate( mark_failed) . await ;
536
542
} ,
537
543
}
538
-
539
544
} ,
545
+ // Requests to terminate the instance take priority over any
546
+ // other request to the instance.
547
+ request = self . terminate_rx. recv( ) => {
548
+ let Some ( TerminateRequest { mark_failed, tx} ) = request else {
549
+ warn!(
550
+ self . log,
551
+ "Instance termination request channel closed; \
552
+ shutting down",
553
+ ) ;
554
+ self . terminate( false ) . await ;
555
+ break ;
556
+ } ;
557
+ let result = tx. send( Ok ( VmmUnregisterResponse {
558
+ updated_runtime: Some ( self . terminate( mark_failed) . await )
559
+ } ) )
560
+ . map_err( |_| Error :: FailedSendClientClosed ) ;
561
+ if let Err ( err) = result {
562
+ warn!(
563
+ self . log,
564
+ "Error handling request to terminate instance" ;
565
+ "err" => ?err,
566
+ ) ;
567
+ }
568
+ }
569
+
540
570
// Handle external requests to act upon the instance.
541
571
request = self . rx. recv( ) => {
542
572
let request_variant = request. as_ref( ) . map( |r| r. to_string( ) ) ;
@@ -559,12 +589,6 @@ impl InstanceRunner {
559
589
. map_err( |e| e. into( ) ) )
560
590
. map_err( |_| Error :: FailedSendClientClosed )
561
591
} ,
562
- Some ( Terminate { mark_failed, tx } ) => {
563
- tx. send( Ok ( VmmUnregisterResponse {
564
- updated_runtime: Some ( self . terminate( mark_failed) . await )
565
- } ) )
566
- . map_err( |_| Error :: FailedSendClientClosed )
567
- } ,
568
592
Some ( IssueSnapshotRequest { disk_id, snapshot_id, tx } ) => {
569
593
tx. send(
570
594
self . issue_snapshot_request(
@@ -627,9 +651,6 @@ impl InstanceRunner {
627
651
PutState { tx, .. } => {
628
652
tx. send ( Err ( Error :: Terminating . into ( ) ) ) . map_err ( |_| ( ) )
629
653
}
630
- Terminate { tx, .. } => {
631
- tx. send ( Err ( Error :: Terminating . into ( ) ) ) . map_err ( |_| ( ) )
632
- }
633
654
IssueSnapshotRequest { tx, .. } => {
634
655
tx. send ( Err ( Error :: Terminating . into ( ) ) ) . map_err ( |_| ( ) )
635
656
}
@@ -644,6 +665,16 @@ impl InstanceRunner {
644
665
}
645
666
} ;
646
667
}
668
+
669
+ // Anyone else who was trying to ask us to go die will be happy to learn
670
+ // that we have now done so!
671
+ while let Some ( TerminateRequest { tx, .. } ) =
672
+ self . terminate_rx . recv ( ) . await
673
+ {
674
+ let _ = tx. send ( Ok ( VmmUnregisterResponse {
675
+ updated_runtime : Some ( self . current_state ( ) ) ,
676
+ } ) ) ;
677
+ }
647
678
}
648
679
649
680
/// Yields this instance's ID.
@@ -1189,6 +1220,12 @@ pub struct Instance {
1189
1220
/// loop.
1190
1221
tx : mpsc:: Sender < InstanceRequest > ,
1191
1222
1223
+ /// Sender for requests to terminate the instance.
1224
+ ///
1225
+ /// These are sent over a separate channel so that they can be prioritized
1226
+ /// over all other requests to the instance.
1227
+ terminate_tx : mpsc:: Sender < TerminateRequest > ,
1228
+
1192
1229
/// This is reference
1193
1230
#[ allow( dead_code) ]
1194
1231
runner_handle : Arc < tokio:: task:: JoinHandle < ( ) > > ,
@@ -1286,6 +1323,7 @@ impl Instance {
1286
1323
1287
1324
let ( tx, rx) = mpsc:: channel ( QUEUE_SIZE ) ;
1288
1325
let ( tx_monitor, rx_monitor) = mpsc:: channel ( 1 ) ;
1326
+ let ( terminate_tx, terminate_rx) = mpsc:: channel ( QUEUE_SIZE ) ;
1289
1327
1290
1328
let metadata = propolis_client:: types:: InstanceMetadata {
1291
1329
project_id : metadata. project_id ,
@@ -1302,6 +1340,7 @@ impl Instance {
1302
1340
rx,
1303
1341
tx_monitor,
1304
1342
rx_monitor,
1343
+ terminate_rx,
1305
1344
monitor_handle : None ,
1306
1345
// NOTE: Mostly lies.
1307
1346
properties : propolis_client:: types:: InstanceProperties {
@@ -1343,7 +1382,12 @@ impl Instance {
1343
1382
let runner_handle =
1344
1383
tokio:: task:: spawn ( async move { runner. run ( ) . await } ) ;
1345
1384
1346
- Ok ( Instance { id, tx, runner_handle : Arc :: new ( runner_handle) } )
1385
+ Ok ( Instance {
1386
+ id,
1387
+ tx,
1388
+ runner_handle : Arc :: new ( runner_handle) ,
1389
+ terminate_tx,
1390
+ } )
1347
1391
}
1348
1392
1349
1393
pub fn id ( & self ) -> InstanceUuid {
@@ -1406,9 +1450,19 @@ impl Instance {
1406
1450
tx : oneshot:: Sender < Result < VmmUnregisterResponse , ManagerError > > ,
1407
1451
mark_failed : bool ,
1408
1452
) -> Result < ( ) , Error > {
1409
- self . tx
1410
- . try_send ( InstanceRequest :: Terminate { mark_failed, tx } )
1411
- . or_else ( InstanceRequest :: fail_try_send)
1453
+ self . terminate_tx
1454
+ . try_send ( TerminateRequest { mark_failed, tx } )
1455
+ . or_else ( |err| match err {
1456
+ mpsc:: error:: TrySendError :: Closed ( TerminateRequest {
1457
+ tx,
1458
+ ..
1459
+ } ) => tx. send ( Err ( Error :: FailedSendChannelClosed . into ( ) ) ) ,
1460
+ mpsc:: error:: TrySendError :: Full ( TerminateRequest {
1461
+ tx,
1462
+ ..
1463
+ } ) => tx. send ( Err ( Error :: FailedSendChannelFull . into ( ) ) ) ,
1464
+ } )
1465
+ . map_err ( |_| Error :: FailedSendClientClosed )
1412
1466
}
1413
1467
1414
1468
pub fn issue_snapshot_request (
0 commit comments