Skip to content

Commit

Permalink
Shutdown utilization task when stopping topology
Browse files Browse the repository at this point in the history
  • Loading branch information
esensar committed Jan 17, 2025
1 parent d7d8694 commit db6f273
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion src/topology/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use crate::{
spawn_named,
};
use futures::{future, Future, FutureExt};
use stream_cancel::Trigger;
use tokio::{
sync::{mpsc, watch},
time::{interval, sleep_until, Duration, Instant},
Expand Down Expand Up @@ -49,6 +50,7 @@ pub struct RunningTopology {
pub(crate) running: Arc<AtomicBool>,
graceful_shutdown_duration: Option<Duration>,
utilization_task: Option<TaskHandle>,
utilization_task_shutdown_trigger: Option<Trigger>,
}

impl RunningTopology {
Expand All @@ -68,6 +70,7 @@ impl RunningTopology {
graceful_shutdown_duration: config.graceful_shutdown_duration,
config,
utilization_task: None,
utilization_task_shutdown_trigger: None,
}
}

Expand Down Expand Up @@ -208,6 +211,9 @@ impl RunningTopology {

// Now kick off the shutdown process by shutting down the sources.
let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
if let Some(trigger) = self.utilization_task_shutdown_trigger {
trigger.cancel();
}

futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
}
Expand Down Expand Up @@ -1061,12 +1067,16 @@ impl RunningTopology {
running_topology.connect_diff(&diff, &mut pieces).await;
running_topology.spawn_diff(&diff, pieces);

let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
ShutdownSignal::new_wired();
running_topology.utilization_task_shutdown_trigger =
Some(utilization_task_shutdown_trigger);
running_topology.utilization_task = Some(tokio::spawn(Task::new(
"utilization_heartbeat".into(),
"",
async move {
utilization_emitter
.run_utilization(ShutdownSignal::noop())
.run_utilization(utilization_shutdown_signal)
.await;
// TODO: new task output type for this? Or handle this task in a completely
// different way
Expand Down

0 comments on commit db6f273

Please sign in to comment.