@@ -409,14 +409,6 @@ struct InstanceRunner {
     // Request channel on which most instance requests are made.
     rx: mpsc::Receiver<InstanceRequest>,
 
-    // Request channel for terminating the instance.
-    //
-    // This is a separate channel from the main request channel (`self.rx`)
-    // because we would like to be able to prioritize requests to terminate, and
-    // handle them even when the instance's main request channel may have filled
-    // up.
-    terminate_rx: mpsc::Receiver<TerminateRequest>,
-
     // Request channel on which monitor requests are made.
     tx_monitor: mpsc::Sender<InstanceMonitorRequest>,
     rx_monitor: mpsc::Receiver<InstanceMonitorRequest>,
@@ -475,7 +467,7 @@ struct InstanceRunner {
 }
 
 impl InstanceRunner {
-    async fn run(mut self) {
+    async fn run(mut self, mut terminate_rx: mpsc::Receiver<TerminateRequest>) {
         use InstanceRequest::*;
         while !self.should_terminate {
             tokio::select! {
@@ -544,7 +536,7 @@ impl InstanceRunner {
                 },
                 // Requests to terminate the instance take priority over any
                 // other request to the instance.
-                request = self.terminate_rx.recv() => {
+                request = terminate_rx.recv() => {
                     let Some(TerminateRequest { mark_failed, tx }) = request else {
                         warn!(
                             self.log,
@@ -569,64 +561,119 @@ impl InstanceRunner {
 
                 // Handle external requests to act upon the instance.
                 request = self.rx.recv() => {
-                    let request_variant = request.as_ref().map(|r| r.to_string());
-                    let result = match request {
-                        Some(RequestZoneBundle { tx }) => {
-                            tx.send(self.request_zone_bundle().await)
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(GetFilesystemPool { tx }) => {
-                            tx.send(Ok(self.get_filesystem_zpool()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(CurrentState { tx }) => {
-                            tx.send(Ok(self.current_state()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(PutState { state, tx }) => {
-                            tx.send(self.put_state(state).await
-                                .map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
-                                .map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(IssueSnapshotRequest { disk_id, snapshot_id, tx }) => {
-                            tx.send(
-                                self.issue_snapshot_request(
-                                    disk_id,
-                                    snapshot_id
-                                ).await.map_err(|e| e.into())
-                            )
-                            .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(AddExternalIp { ip, tx }) => {
-                            tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(DeleteExternalIp { ip, tx }) => {
-                            tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        },
-                        Some(RefreshExternalIps { tx }) => {
-                            tx.send(self.refresh_external_ips().map_err(|e| e.into()))
-                                .map_err(|_| Error::FailedSendClientClosed)
-                        }
+                    let request = match request {
+                        Some(r) => r,
                         None => {
                             warn!(self.log, "Instance request channel closed; shutting down");
                             let mark_failed = false;
                             self.terminate(mark_failed).await;
                             break;
-                        },
+                        }
                     };
+                    let request_variant = request.to_string();
+                    // Okay, this is a little bit wacky: if we are waiting for
+                    // one of the instance operations we run here to come back,
+                    // and a termination request comes in, we want to give up on
+                    // the outstanding operation and honor the termination
+                    // request immediately. This is in case the instance
+                    // operation has gotten stuck: we don't want it to prevent
+                    // the instance from terminating because something else is
+                    // wedged.
+                    //
+                    // Therefore, we're going to select between the future that
+                    // actually performs the instance op and receiving another
+                    // request from the termination channel.
+                    let op = async {
+                        match request {
+                            RequestZoneBundle { tx } => {
+                                tx.send(self.request_zone_bundle().await)
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            GetFilesystemPool { tx } => {
+                                tx.send(Ok(self.get_filesystem_zpool()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            CurrentState { tx } => {
+                                tx.send(Ok(self.current_state()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            PutState { state, tx } => {
+                                tx.send(self.put_state(state).await
+                                    .map(|r| VmmPutStateResponse { updated_runtime: Some(r) })
+                                    .map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            IssueSnapshotRequest { disk_id, snapshot_id, tx } => {
+                                tx.send(
+                                    self.issue_snapshot_request(
+                                        disk_id,
+                                        snapshot_id
+                                    ).await.map_err(|e| e.into())
+                                )
+                                .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            AddExternalIp { ip, tx } => {
+                                tx.send(self.add_external_ip(&ip).await.map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            DeleteExternalIp { ip, tx } => {
+                                tx.send(self.delete_external_ip(&ip).await.map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            },
+                            RefreshExternalIps { tx } => {
+                                tx.send(self.refresh_external_ips().map_err(|e| e.into()))
+                                    .map_err(|_| Error::FailedSendClientClosed)
+                            }
+                        }
+                    };
+                    tokio::select! {
+                        biased;
+
+                        request = terminate_rx.recv() => {
+                            match request {
+                                Some(TerminateRequest { tx, mark_failed }) => {
+                                    info!(
+                                        self.log,
+                                        "Received request to terminate instance \
+                                         while waiting on an ongoing request";
+                                        "request" => request_variant,
+                                    );
+                                    let result = tx.send(Ok(VmmUnregisterResponse {
+                                        updated_runtime: Some(self.terminate(mark_failed).await)
+                                    }))
+                                    .map_err(|_| Error::FailedSendClientClosed);
+                                    if let Err(err) = result {
+                                        warn!(
+                                            self.log,
+                                            "Error handling request to terminate instance";
+                                            "err" => ?err,
+                                        );
+                                    }
+                                    break;
+                                },
+                                None => {
+                                    warn!(
+                                        self.log,
+                                        "Instance termination request channel closed; \
+                                         shutting down";
+                                    );
+                                    self.terminate(false).await;
+                                    break;
+                                },
+                            };
+                        }
 
-                    if let Err(err) = result {
-                        warn!(
-                            self.log,
-                            "Error handling request";
-                            "request" => request_variant.unwrap(),
-                            "err" => ?err,
-
-                        );
-                    }
+                        result = op => {
+                            if let Err(err) = result {
+                                warn!(
+                                    self.log,
+                                    "Error handling request";
+                                    "request" => request_variant,
+                                    "err" => ?err,
+                                );
+                            }
+                        }
+                    };
                 }
 
             }
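
A note on the `biased;` select introduced above: `tokio::select!` normally polls its branches in a pseudo-random order, but `biased` makes it poll them top to bottom, which is what lets a queued terminate request win over an in-flight (and possibly wedged) operation. A minimal, self-contained sketch of that prioritization pattern, using toy channel payloads rather than the sled-agent types and assuming `tokio` with its `full` feature set:

```rust
use tokio::sync::mpsc;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (terminate_tx, mut terminate_rx) = mpsc::channel::<()>(1);

    // Stand-in for an instance operation that has wedged.
    let op = async {
        sleep(Duration::from_secs(3600)).await;
        "operation finished"
    };

    // A terminate request is already queued when we start waiting.
    terminate_tx.send(()).await.unwrap();

    tokio::select! {
        // `biased` polls branches in declaration order, so the terminate
        // channel is checked before the (stuck) operation future.
        biased;

        _ = terminate_rx.recv() => {
            println!("terminating without waiting for the stuck operation");
        }
        msg = op => {
            println!("{msg}");
        }
    }
}
```

Because the terminate branch is listed first, it is checked on every poll before the operation future, so a stuck operation cannot starve termination.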
@@ -668,8 +715,7 @@ impl InstanceRunner {
 
         // Anyone else who was trying to ask us to go die will be happy to learn
         // that we have now done so!
-        while let Some(TerminateRequest { tx, .. }) =
-            self.terminate_rx.recv().await
+        while let Some(TerminateRequest { tx, .. }) = terminate_rx.recv().await
         {
             let _ = tx.send(Ok(VmmUnregisterResponse {
                 updated_runtime: Some(self.current_state()),
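
The drain loop above keeps answering terminate requests that arrive after the instance has already been torn down, so their senders are not left hanging. A rough standalone sketch of the same drain-and-ack shape, with hypothetical `TerminateRequest` and `drain` names standing in for the real types:

```rust
use tokio::sync::{mpsc, oneshot};

// Toy request type: a terminate request that wants an acknowledgement back.
struct TerminateRequest {
    tx: oneshot::Sender<&'static str>,
}

// Once the instance is gone, answer every queued request instead of letting
// the senders hang; the loop ends when all senders have been dropped.
async fn drain(mut terminate_rx: mpsc::Receiver<TerminateRequest>) {
    while let Some(TerminateRequest { tx }) = terminate_rx.recv().await {
        let _ = tx.send("already terminated");
    }
}

#[tokio::main]
async fn main() {
    let (terminate_tx, terminate_rx) = mpsc::channel(8);
    let (ack_tx, ack_rx) = oneshot::channel();

    terminate_tx.send(TerminateRequest { tx: ack_tx }).await.unwrap();
    drop(terminate_tx); // the channel closes once every sender is gone

    drain(terminate_rx).await;
    assert_eq!(ack_rx.await.unwrap(), "already terminated");
}
```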
@@ -1323,6 +1369,18 @@ impl Instance {
 
         let (tx, rx) = mpsc::channel(QUEUE_SIZE);
         let (tx_monitor, rx_monitor) = mpsc::channel(1);
+
+        // Request channel for terminating the instance.
+        //
+        // This is a separate channel from the main request channel (`self.rx`)
+        // because we would like to be able to prioritize requests to terminate, and
+        // handle them even when the instance's main request channel may have filled
+        // up.
+        //
+        // Note also that this is *not* part of the `InstanceRunner` struct,
+        // because it's necessary to split mutable borrows in order to allow
+        // selecting between the actual instance operation (which must mutate
+        // the `InstanceRunner`) and awaiting a termination request.
         let (terminate_tx, terminate_rx) = mpsc::channel(QUEUE_SIZE);
 
         let metadata = propolis_client::types::InstanceMetadata {
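
The last paragraph of that new comment is the heart of the refactor: the operation future built from a request holds a mutable borrow of the runner, so if the terminate receiver were still a field, selecting on it would require a second, conflicting `&mut self`. Passing the receiver into `run()` as its own argument gives the two select arms disjoint borrows. A compile-checkable sketch of the pattern with toy types (not the real `InstanceRunner`):

```rust
use tokio::sync::mpsc;

struct Runner {
    count: u64,
}

impl Runner {
    // The operation future mutably borrows the runner...
    async fn do_op(&mut self) {
        self.count += 1;
    }

    // ...so the terminate receiver is a separate argument. If it were a field,
    // the first branch below would read `self.terminate_rx.recv()`, a second
    // mutable borrow of `self` that the borrow checker rejects.
    async fn run(mut self, mut terminate_rx: mpsc::Receiver<()>) {
        tokio::select! {
            biased;
            _ = terminate_rx.recv() => println!("terminate request handled first"),
            _ = self.do_op() => println!("operation completed"),
        }
    }
}

#[tokio::main]
async fn main() {
    // Keep a sender alive so the terminate channel stays open but empty; the
    // operation branch then wins the select.
    let (_terminate_tx, terminate_rx) = mpsc::channel::<()>(1);
    Runner { count: 0 }.run(terminate_rx).await;
}
```

If `terminate_rx` lived on `Runner`, the two `select!` arms would both need to borrow `self` mutably at the same time and the method would not compile; splitting the receiver out is what makes the prioritized select possible.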
@@ -1340,7 +1398,6 @@ impl Instance {
             rx,
             tx_monitor,
             rx_monitor,
-            terminate_rx,
             monitor_handle: None,
             // NOTE: Mostly lies.
             properties: propolis_client::types::InstanceProperties {
@@ -1380,7 +1437,7 @@ impl Instance {
         };
 
         let runner_handle =
-            tokio::task::spawn(async move { runner.run().await });
+            tokio::task::spawn(async move { runner.run(terminate_rx).await });
 
         Ok(Instance {
             id,