From 95886f87229f7e8d82f9057c243b4d48aeaa9acf Mon Sep 17 00:00:00 2001
From: Ensar Sarajčić
Date: Thu, 19 Dec 2024 22:48:50 +0100
Subject: [PATCH 1/6] fix(utilization_metric): run a separate task for utilization to ensure it is regularly published

This adds a separate task that runs periodically to emit utilization
metrics, collecting timing messages from the components whose
utilization it calculates. This ensures that the `utilization` metric
is published even when no events are flowing through a component.

Fixes: #20216
---
 src/topology/builder.rs |  99 ++++++++++++++++++-----
 src/topology/running.rs |  21 ++++-
 src/utilization.rs      | 172 +++++++++++++++++++++++++++++-------
 3 files changed, 222 insertions(+), 70 deletions(-)

diff --git a/src/topology/builder.rs b/src/topology/builder.rs
index 8602e5cce6c13..fe9ffb19542d6 100644
--- a/src/topology/builder.rs
+++ b/src/topology/builder.rs
@@ -8,6 +8,7 @@ use std::{
 
 use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryStreamExt};
 use futures_util::stream::FuturesUnordered;
+use metrics::gauge;
 use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire};
 use tokio::{
     select,
@@ -51,7 +52,7 @@ use crate::{
     spawn_named,
     topology::task::TaskError,
     transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf},
-    utilization::wrap,
+    utilization::{wrap, UtilizationEmitter, UtilizationTimerMessage},
     SourceSender,
 };
@@ -84,6 +85,7 @@ struct Builder<'a> {
     healthchecks: HashMap<ComponentKey, Task>,
     detach_triggers: HashMap<ComponentKey, Trigger>,
     extra_context: ExtraContext,
+    utilization_emitter: UtilizationEmitter,
 }
 
 impl<'a> Builder<'a> {
@@ -105,6 +107,7 @@ impl<'a> Builder<'a> {
             healthchecks: HashMap::new(),
             detach_triggers: HashMap::new(),
             extra_context,
+            utilization_emitter: UtilizationEmitter::new(),
         }
     }
@@ -128,6 +131,7 @@ impl<'a> Builder<'a> {
                 healthchecks: self.healthchecks,
                 shutdown_coordinator: self.shutdown_coordinator,
                 detach_triggers: self.detach_triggers,
+                utilization_emitter: Some(self.utilization_emitter),
             })
         } else {
             Err(self.errors)
@@ -497,7 +501,7 @@ impl<'a> Builder<'a> {
 
             let (transform_task, transform_outputs) = {
                 let _span = span.enter();
-                build_transform(transform, node, input_rx)
+                build_transform(transform, node, input_rx, &mut self.utilization_emitter)
             };
 
             self.outputs.extend(transform_outputs);
@@ -506,6 +510,7 @@ impl<'a> Builder<'a> {
     }
 
     async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
+        let utilization_sender = self.utilization_emitter.get_sender();
         for (key, sink) in self
             .config
             .sinks()
@@ -585,6 +590,10 @@ impl<'a> Builder<'a> {
 
             let (trigger, tripwire) = Tripwire::new();
 
+            self.utilization_emitter
+                .add_component(key.clone(), gauge!("utilization"));
+            let utilization_sender = utilization_sender.clone();
+            let component_key = key.clone();
             let sink = async move {
                 debug!("Sink starting.");
 
@@ -600,7 +609,7 @@ impl<'a> Builder<'a> {
                     .take()
                     .expect("Task started but input has been taken.");
 
-                let mut rx = wrap(rx);
+                let mut rx = wrap(utilization_sender, component_key.clone(), rx);
 
                 let events_received = register!(EventsReceived);
                 sink.run(
@@ -682,6 +691,7 @@ pub struct TopologyPieces {
     pub(super) healthchecks: HashMap<ComponentKey, Task>,
     pub(crate) shutdown_coordinator: SourceShutdownCoordinator,
     pub(crate) detach_triggers: HashMap<ComponentKey, Trigger>,
+    pub(crate) utilization_emitter: Option<UtilizationEmitter>,
 }
 
 impl TopologyPieces {
@@ -760,11 +770,14 @@ fn build_transform(
     transform: Transform,
     node: TransformNode,
     input_rx: BufferReceiver<EventArray>,
+    utilization_emitter: &mut UtilizationEmitter,
 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
     match transform {
         // TODO: avoid the double boxing for function transforms here
-        Transform::Function(t) => build_sync_transform(Box::new(t), node, input_rx),
-        Transform::Synchronous(t) => build_sync_transform(t, node, input_rx),
+        Transform::Function(t) => {
+            build_sync_transform(Box::new(t), node, input_rx, utilization_emitter)
+        }
+        Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter),
         Transform::Task(t) => build_task_transform(
             t,
             input_rx,
@@ -772,6 +785,7 @@ fn build_transform(
             node.typetag,
             &node.key,
             &node.outputs,
+            utilization_emitter,
         ),
     }
 }
@@ -780,10 +794,19 @@ fn build_sync_transform(
     t: Box<dyn SyncTransform>,
     node: TransformNode,
     input_rx: BufferReceiver<EventArray>,
+    utilization_emitter: &mut UtilizationEmitter,
 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
     let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key);
 
-    let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs);
+    utilization_emitter.add_component(node.key.clone(), gauge!("utilization"));
+    let runner = Runner::new(
+        t,
+        input_rx,
+        utilization_emitter.get_sender(),
+        node.key.clone(),
+        node.input_details.data_type(),
+        outputs,
+    );
     let transform = if node.enable_concurrency {
         runner.run_concurrently().boxed()
     } else {
@@ -823,8 +846,8 @@ struct Runner {
     input_rx: Option<BufferReceiver<EventArray>>,
     input_type: DataType,
     outputs: TransformOutputs,
-    timer: crate::utilization::Timer,
-    last_report: Instant,
+    key: ComponentKey,
+    timer_tx: UnboundedSender<UtilizationTimerMessage>,
     events_received: Registered<EventsReceived>,
 }
@@ -832,6 +855,8 @@ impl Runner {
     fn new(
         transform: Box<dyn SyncTransform>,
         input_rx: BufferReceiver<EventArray>,
+        timer_tx: UnboundedSender<UtilizationTimerMessage>,
+        key: ComponentKey,
         input_type: DataType,
         outputs: TransformOutputs,
     ) -> Self {
@@ -840,17 +865,22 @@ impl Runner {
             input_rx: Some(input_rx),
             input_type,
             outputs,
-            timer: crate::utilization::Timer::new(),
-            last_report: Instant::now(),
+            key,
+            timer_tx,
             events_received: register!(EventsReceived),
         }
     }
 
     fn on_events_received(&mut self, events: &EventArray) {
-        let stopped = self.timer.stop_wait();
-        if stopped.duration_since(self.last_report).as_secs() >= 5 {
-            self.timer.report();
-            self.last_report = stopped;
+        if self
+            .timer_tx
+            .send(UtilizationTimerMessage::StopWait(
+                self.key.clone(),
+                Instant::now(),
+            ))
+            .is_err()
+        {
+            debug!(component_id = ?self.key, "Couldn't send utilization stop wait message from sync transform.");
         }
 
         self.events_received.emit(CountByteSize(
@@ -860,7 +890,16 @@ impl Runner {
     }
 
     async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> {
-        self.timer.start_wait();
+        if self
+            .timer_tx
+            .send(UtilizationTimerMessage::StartWait(
+                self.key.clone(),
+                Instant::now(),
+            ))
+            .is_err()
+        {
+            debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
+        }
         self.outputs.send(outputs_buf).await
     }
@@ -877,7 +916,16 @@ impl Runner {
             .into_stream()
             .filter(move |events| ready(filter_events_type(events, self.input_type)));
 
-        self.timer.start_wait();
+        if self
+            .timer_tx
+            .send(UtilizationTimerMessage::StartWait(
+                self.key.clone(),
+                Instant::now(),
+            ))
+            .is_err()
+        {
+            debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
+        }
         while let Some(events) = input_rx.next().await {
             self.on_events_received(&events);
             self.transform.transform_all(events, &mut outputs_buf);
@@ -903,7 +951,16 @@ impl Runner {
         let mut in_flight = FuturesOrdered::new();
         let mut shutting_down = false;
 
-        self.timer.start_wait();
+        if self
+            .timer_tx
+            .send(UtilizationTimerMessage::StartWait(
+                self.key.clone(),
+                Instant::now(),
+            ))
+            .is_err()
+        {
+            debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform.");
+        }
         loop {
             tokio::select! {
                 biased;
@@ -964,10 +1021,16 @@ fn build_task_transform(
     typetag: &str,
     key: &ComponentKey,
     outputs: &[TransformOutput],
+    utilization_emitter: &mut UtilizationEmitter,
 ) -> (Task, HashMap<OutputId, fanout::ControlChannel>) {
     let (mut fanout, control) = Fanout::new();
 
-    let input_rx = crate::utilization::wrap(input_rx.into_stream());
+    utilization_emitter.add_component(key.clone(), gauge!("utilization"));
+    let input_rx = wrap(
+        utilization_emitter.get_sender(),
+        key.clone(),
+        input_rx.into_stream(),
+    );
 
     let events_received = register!(EventsReceived);
     let filtered = input_rx
diff --git a/src/topology/running.rs b/src/topology/running.rs
index 796661b7b4e0c..b23bf1d7803d4 100644
--- a/src/topology/running.rs
+++ b/src/topology/running.rs
@@ -7,11 +7,10 @@ use std::{
 };
 
 use super::{
-    builder,
-    builder::TopologyPieces,
+    builder::{self, TopologyPieces},
     fanout::{ControlChannel, ControlMessage},
     handle_errors, retain, take_healthchecks,
-    task::TaskOutput,
+    task::{Task, TaskOutput},
     BuiltBuffer, TaskHandle,
 };
 use crate::{
@@ -28,9 +27,9 @@ use tokio::{
     time::{interval, sleep_until, Duration, Instant},
 };
 use tracing::Instrument;
-use vector_lib::buffers::topology::channel::BufferSender;
 use vector_lib::tap::topology::{TapOutput, TapResource, WatchRx, WatchTx};
 use vector_lib::trigger::DisabledTrigger;
+use vector_lib::{buffers::topology::channel::BufferSender, shutdown::ShutdownSignal};
 
 pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver<ShutdownError>;
@@ -49,6 +48,7 @@ pub struct RunningTopology {
     watch: (WatchTx, WatchRx),
     pub(crate) running: Arc<AtomicBool>,
     graceful_shutdown_duration: Option<Duration>,
+    utilization_task: Option<TaskHandle>,
 }
 
 impl RunningTopology {
@@ -67,6 +67,7 @@ impl RunningTopology {
             running: Arc::new(AtomicBool::new(true)),
             graceful_shutdown_duration: config.graceful_shutdown_duration,
             config,
+            utilization_task: None,
         }
     }
@@ -1042,6 +1043,7 @@ impl RunningTopology {
             return None;
         }
 
+        let mut utilization_emitter = pieces.utilization_emitter.take().unwrap();
         let mut running_topology = Self::new(config, abort_tx);
 
         if !running_topology
@@ -1053,6 +1055,17 @@ impl RunningTopology {
         running_topology.connect_diff(&diff, &mut pieces).await;
         running_topology.spawn_diff(&diff, pieces);
 
+        running_topology.utilization_task =
+            // TODO: how to name this custom task?
+            Some(tokio::spawn(Task::new("".into(), "", async move {
+                utilization_emitter
+                    .run_utilization(ShutdownSignal::noop())
+                    .await;
+                // TODO: new task output type for this? Or handle this task in a completely
+                // different way
+                Ok(TaskOutput::Healthcheck)
+            })));
+
         Some((running_topology, abort_rx))
     }
 }
diff --git a/src/utilization.rs b/src/utilization.rs
index 2d19cf9c2d150..97df33cd9987d 100644
--- a/src/utilization.rs
+++ b/src/utilization.rs
@@ -1,21 +1,24 @@
 use std::{
+    collections::HashMap,
     pin::Pin,
     task::{ready, Context, Poll},
     time::{Duration, Instant},
 };
 
+use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
 use futures::{Stream, StreamExt};
-use metrics::gauge;
+use metrics::Gauge;
 use pin_project::pin_project;
 use tokio::time::interval;
 use tokio_stream::wrappers::IntervalStream;
+use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal};
 
 use crate::stats;
 
 #[pin_project]
 pub(crate) struct Utilization<S> {
-    timer: Timer,
-    intervals: IntervalStream,
+    timer_tx: UnboundedSender<UtilizationTimerMessage>,
+    component_key: ComponentKey,
     inner: S,
 }
@@ -42,50 +45,41 @@ where
         // ready, with the side-effect of reporting every so often about how
         // long the wait gap is.
         //
-        // To achieve this we poll the `intervals` stream and if a new interval
-        // is ready we hit `Timer::report` and loop back around again to poll
-        // for a new `Event`. Calls to `Timer::start_wait` will only have an
-        // effect if `stop_wait` has been called, so the structure of this loop
-        // avoids double-measures.
+        // This will just measure the time, while UtilizationEmitter collects
+        // all the timers and emits utilization value periodically
         let this = self.project();
-        loop {
-            this.timer.start_wait();
-            match this.intervals.poll_next_unpin(cx) {
-                Poll::Ready(_) => {
-                    this.timer.report();
-                    continue;
-                }
-                Poll::Pending => {
-                    let result = ready!(this.inner.poll_next_unpin(cx));
-                    this.timer.stop_wait();
-                    return Poll::Ready(result);
-                }
-            }
-        }
+        if this
+            .timer_tx
+            .send(UtilizationTimerMessage::StartWait(
+                this.component_key.clone(),
+                Instant::now(),
+            ))
+            .is_err()
+        {
+            debug!(component_id = ?this.component_key, "Couldn't send utilization start wait message from wrapped stream.");
+        }
+        let result = ready!(this.inner.poll_next_unpin(cx));
+        if this
+            .timer_tx
+            .send(UtilizationTimerMessage::StopWait(
+                this.component_key.clone(),
+                Instant::now(),
+            ))
+            .is_err()
+        {
+            debug!(component_id = ?this.component_key, "Couldn't send utilization stop wait message from wrapped stream.");
+        }
+        Poll::Ready(result)
     }
 }
 
-/// Wrap a stream to emit stats about utilization. This is designed for use with
-/// the input channels of transform and sinks components, and measures the
-/// amount of time that the stream is waiting for input from upstream. We make
-/// the simplifying assumption that this wait time is when the component is idle
-/// and the rest of the time it is doing useful work. This is more true for
-/// sinks than transforms, which can be blocked by downstream components, but
-/// with knowledge of the config the data is still useful.
-pub(crate) fn wrap<S>(inner: S) -> Utilization<S> {
-    Utilization {
-        timer: Timer::new(),
-        intervals: IntervalStream::new(interval(Duration::from_secs(5))),
-        inner,
-    }
-}
-
-pub(super) struct Timer {
+pub(crate) struct Timer {
     overall_start: Instant,
     span_start: Instant,
     waiting: bool,
     total_wait: Duration,
     ewma: stats::Ewma,
+    gauge: Gauge,
 }
 
 /// A simple, specialized timer for tracking spans of waiting vs not-waiting
 /// time. Reporting periods are expected
 /// to be of uniform length and used to aggregate span data into time-weighted
 /// averages.
 impl Timer {
-    pub(crate) fn new() -> Self {
+    pub(crate) fn new(gauge: Gauge) -> Self {
         Self {
             overall_start: Instant::now(),
             span_start: Instant::now(),
             waiting: false,
             total_wait: Duration::new(0, 0),
             ewma: stats::Ewma::new(0.9),
+            gauge,
         }
     }
 
     /// Begin a new span representing time spent waiting
-    pub(crate) fn start_wait(&mut self) {
+    pub(crate) fn start_wait(&mut self, at: Instant) {
         if !self.waiting {
-            self.end_span();
+            self.end_span(at);
             self.waiting = true;
         }
     }
 
     /// Complete the current waiting span and begin a non-waiting span
-    pub(crate) fn stop_wait(&mut self) -> Instant {
+    pub(crate) fn stop_wait(&mut self, at: Instant) -> Instant {
         if self.waiting {
-            let now = self.end_span();
+            let now = self.end_span(at);
             self.waiting = false;
             now
         } else {
-            Instant::now()
+            at
         }
     }
 
@@ -133,7 +128,7 @@ impl Timer {
         // End the current span so it can be accounted for, but do not change
         // whether or not we're in the waiting state. This way the next span
        // inherits the correct status.
-        let now = self.end_span();
+        let now = self.end_span(Instant::now());
         let total_duration = now.duration_since(self.overall_start);
 
         let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64();
@@ -142,18 +137,99 @@ impl Timer {
         self.ewma.update(utilization);
         let avg = self.ewma.average().unwrap_or(f64::NAN);
         debug!(utilization = %avg);
-        gauge!("utilization").set(avg);
+        self.gauge.set(avg);
 
         // Reset overall statistics for the next reporting period.
         self.overall_start = self.span_start;
         self.total_wait = Duration::new(0, 0);
     }
 
-    fn end_span(&mut self) -> Instant {
+    fn end_span(&mut self, at: Instant) -> Instant {
         if self.waiting {
-            self.total_wait += self.span_start.elapsed();
+            self.total_wait += at - self.span_start;
         }
-        self.span_start = Instant::now();
+        self.span_start = at;
         self.span_start
     }
 }
+
+#[derive(Debug)]
+pub(crate) enum UtilizationTimerMessage {
+    StartWait(ComponentKey, Instant),
+    StopWait(ComponentKey, Instant),
+}
+
+pub(crate) struct UtilizationEmitter {
+    timers: HashMap<ComponentKey, Timer>,
+    timer_rx: UnboundedReceiver<UtilizationTimerMessage>,
+    timer_tx: UnboundedSender<UtilizationTimerMessage>,
+    intervals: IntervalStream,
+}
+
+impl UtilizationEmitter {
+    pub(crate) fn new() -> Self {
+        let (timer_tx, timer_rx) = unbounded_channel();
+        Self {
+            timers: HashMap::default(),
+            intervals: IntervalStream::new(interval(Duration::from_secs(5))),
+            timer_tx,
+            timer_rx,
+        }
+    }
+
+    pub(crate) fn add_component(&mut self, key: ComponentKey, gauge: Gauge) {
+        self.timers.insert(key, Timer::new(gauge));
+    }
+
+    pub(crate) fn get_sender(&self) -> UnboundedSender<UtilizationTimerMessage> {
+        self.timer_tx.clone()
+    }
+
+    pub(crate) async fn run_utilization(&mut self, mut shutdown: ShutdownSignal) {
+        loop {
+            tokio::select! {
+                message = self.timer_rx.recv() => {
+                    match message {
+                        Some(UtilizationTimerMessage::StartWait(key, start_time)) => {
+                            self.timers.get_mut(&key).unwrap().start_wait(start_time);
+                        }
+                        Some(UtilizationTimerMessage::StopWait(key, stop_time)) => {
+                            self.timers.get_mut(&key).unwrap().stop_wait(stop_time);
+                        }
+                        None => break,
+                    }
+                },
+
+                Some(_) = self.intervals.next() => {
+                    for timer in self.timers.values_mut() {
+                        timer.report();
+                    }
+                },
+
+                _ = &mut shutdown => {
+                    break
+                }
+            }
+        }
+    }
+}
+
+/// Wrap a stream to emit stats about utilization. This is designed for use with
+/// the input channels of transform and sinks components, and measures the
+/// amount of time that the stream is waiting for input from upstream. We make
+/// the simplifying assumption that this wait time is when the component is idle
+/// and the rest of the time it is doing useful work. This is more true for
+/// sinks than transforms, which can be blocked by downstream components, but
+/// with knowledge of the config the data is still useful.
+#[allow(clippy::missing_const_for_fn)]
+pub(crate) fn wrap<S>(
+    timer_tx: UnboundedSender<UtilizationTimerMessage>,
+    component_key: ComponentKey,
+    inner: S,
+) -> Utilization<S> {
+    Utilization {
+        timer_tx,
+        component_key,
+        inner,
+    }
+}
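Aside (not part of the patch series, and not Vector's actual code): a minimal, runnable sketch of the pattern PATCH 1/6 introduces. Components send StartWait/StopWait timestamps over an unbounded channel, and a single task aggregates them and reports on a fixed tick, so a value is emitted even when no messages arrive. The names (TimerMessage, simulate_component) are invented for illustration, and only the tokio crate is assumed.

use std::time::{Duration, Instant};

use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
use tokio::time::interval;

#[derive(Debug)]
enum TimerMessage {
    StartWait(Instant),
    StopWait(Instant),
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = unbounded_channel::<TimerMessage>();

    // The emitter task: collects wait spans and reports on every tick,
    // independent of whether any messages arrived in the interval.
    let emitter = tokio::spawn(async move {
        let mut ticks = interval(Duration::from_secs(1));
        ticks.tick().await; // consume the immediate first tick
        let mut window_start = Instant::now();
        let mut wait_start: Option<Instant> = None;
        let mut total_wait = Duration::ZERO;
        loop {
            tokio::select! {
                message = rx.recv() => match message {
                    Some(TimerMessage::StartWait(at)) => wait_start = Some(at),
                    Some(TimerMessage::StopWait(at)) => {
                        if let Some(started) = wait_start.take() {
                            total_wait += at.saturating_duration_since(started);
                        }
                    }
                    None => break, // every sender is gone; shut down
                },
                _ = ticks.tick() => {
                    let now = Instant::now();
                    let mut waited = total_wait;
                    if let Some(started) = wait_start {
                        // A wait span is still open; count it and restart it.
                        waited += now.saturating_duration_since(started);
                        wait_start = Some(now);
                    }
                    let elapsed = now.duration_since(window_start).as_secs_f64();
                    let utilization = 1.0 - waited.as_secs_f64() / elapsed;
                    println!("utilization = {utilization:.3}");
                    window_start = now;
                    total_wait = Duration::ZERO;
                }
            }
        }
    });

    simulate_component(tx).await;
    emitter.await.expect("emitter task panicked");
}

// A stand-in component: idle (waiting on input) half the time, busy the rest,
// so the reported utilization settles around 0.5.
async fn simulate_component(tx: UnboundedSender<TimerMessage>) {
    for _ in 0..6 {
        let _ = tx.send(TimerMessage::StartWait(Instant::now()));
        tokio::time::sleep(Duration::from_millis(250)).await; // waiting
        let _ = tx.send(TimerMessage::StopWait(Instant::now()));
        tokio::time::sleep(Duration::from_millis(250)).await; // working
    }
}

The unbounded channel mirrors the patch's design choice: timing messages are tiny, and sending them must never block the hot path of a transform or sink.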
From 59b7d192b0b513d705cf09cc4580886cce829ecb Mon Sep 17 00:00:00 2001
From: Ensar Sarajčić
Date: Mon, 23 Dec 2024 17:35:01 +0100
Subject: [PATCH 2/6] Add changelog entry

---
 changelog.d/22070_utilization_metric_periodic_emit.fix.md | 3 +++
 1 file changed, 3 insertions(+)
 create mode 100644 changelog.d/22070_utilization_metric_periodic_emit.fix.md

diff --git a/changelog.d/22070_utilization_metric_periodic_emit.fix.md b/changelog.d/22070_utilization_metric_periodic_emit.fix.md
new file mode 100644
index 0000000000000..2135cad5e0fec
--- /dev/null
+++ b/changelog.d/22070_utilization_metric_periodic_emit.fix.md
@@ -0,0 +1,3 @@
+The `utilization` metric is now properly published periodically, even when no events are flowing through the components.
+
+authors: esensar

From 416e1cfeb92d6db99dc1fb418bff07a6a08615af Mon Sep 17 00:00:00 2001
From: Ensar Sarajčić
Date: Fri, 10 Jan 2025 13:49:59 +0100
Subject: [PATCH 3/6] Remove unnecessary clone when building utilization task

Co-authored-by: Pavlos Rontidis
---
 src/topology/builder.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/topology/builder.rs b/src/topology/builder.rs
index fe9ffb19542d6..2a698e99af1b9 100644
--- a/src/topology/builder.rs
+++ b/src/topology/builder.rs
@@ -510,7 +510,6 @@ impl<'a> Builder<'a> {
     }
 
     async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) {
-        let utilization_sender = self.utilization_emitter.get_sender();
         for (key, sink) in self
             .config
             .sinks()
@@ -592,7 +591,7 @@ impl<'a> Builder<'a> {
 
             self.utilization_emitter
                 .add_component(key.clone(), gauge!("utilization"));
-            let utilization_sender = utilization_sender.clone();
+            let utilization_sender = self.utilization_emitter.get_sender();
             let component_key = key.clone();
             let sink = async move {
                 debug!("Sink starting.");

From bb5d7ca1a19baf2d1beef217e3263841ad3b148b Mon Sep 17 00:00:00 2001
From: Ensar Sarajčić
Date: Fri, 10 Jan 2025 13:52:03 +0100
Subject: [PATCH 4/6] Name utilization task `utilization_heartbeat`

---
 src/topology/running.rs | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/src/topology/running.rs b/src/topology/running.rs
index b23bf1d7803d4..21cf716e5ca61 100644
--- a/src/topology/running.rs
+++ b/src/topology/running.rs
@@ -1055,16 +1055,18 @@ impl RunningTopology {
         running_topology.connect_diff(&diff, &mut pieces).await;
         running_topology.spawn_diff(&diff, pieces);
 
-        running_topology.utilization_task =
-            // TODO: how to name this custom task?
-            Some(tokio::spawn(Task::new("".into(), "", async move {
+        running_topology.utilization_task = Some(tokio::spawn(Task::new(
+            "utilization_heartbeat".into(),
+            "",
+            async move {
                 utilization_emitter
                     .run_utilization(ShutdownSignal::noop())
                     .await;
                 // TODO: new task output type for this? Or handle this task in a completely
                 // different way
                 Ok(TaskOutput::Healthcheck)
-            })));
+            },
+        )));
 
         Some((running_topology, abort_rx))
     }
From d7d8694c0c1ea1c2f7658f3bedb2f346e8095930 Mon Sep 17 00:00:00 2001
From: Ensar Sarajčić
Date: Fri, 10 Jan 2025 15:23:25 +0100
Subject: [PATCH 5/6] Join `utilization_task` when stopping topology

---
 src/topology/running.rs | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/src/topology/running.rs b/src/topology/running.rs
index 21cf716e5ca61..ecba6a97ea73d 100644
--- a/src/topology/running.rs
+++ b/src/topology/running.rs
@@ -117,15 +117,21 @@ impl RunningTopology {
         // pump in self.tasks, and the other for source in self.source_tasks.
         let mut check_handles = HashMap::<ComponentKey, Vec<_>>::new();
 
+        let map_closure = |_result| ();
+
         // We need to give some time to the sources to gracefully shutdown, so
         // we will merge them with other tasks.
         for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) {
-            let task = task.map(|_result| ()).shared();
+            let task = task.map(map_closure).shared();
 
             wait_handles.push(task.clone());
             check_handles.entry(key).or_default().push(task);
         }
 
+        if let Some(utilization_task) = self.utilization_task {
+            wait_handles.push(utilization_task.map(map_closure).shared());
+        }
+
         // If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown.
         let deadline = self
             .graceful_shutdown_duration
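Aside (illustrative only, not part of the series): PATCH 5/6 above joins the heartbeat task's handle during shutdown, and PATCH 6/6 below wires a cancellation trigger so the task's select loop actually exits first. The sketch uses the stream-cancel crate's Tripwire directly, where the real code goes through vector_lib's ShutdownSignal::new_wired(); the task body is a stand-in heartbeat loop.

use std::time::Duration;

use stream_cancel::Tripwire;
use tokio::time::interval;

#[tokio::main]
async fn main() {
    // The trigger stays with the "topology"; the tripwire goes to the task.
    let (trigger, tripwire) = Tripwire::new();

    let heartbeat = tokio::spawn(async move {
        let mut ticks = interval(Duration::from_secs(1));
        loop {
            tokio::select! {
                _ = ticks.tick() => println!("report utilization"),
                // Resolves once the trigger is cancelled (or dropped).
                _ = tripwire.clone() => break,
            }
        }
    });

    tokio::time::sleep(Duration::from_millis(2500)).await;

    // What stop() does in PATCH 6/6 with its stored trigger, before joining
    // the handle as PATCH 5/6 arranged.
    trigger.cancel();
    heartbeat.await.expect("heartbeat task panicked");
}

Cancelling before joining matters: without the trigger, the join added in PATCH 5/6 would wait forever on a loop that has no reason to stop.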
From db6f273fbe29c38f6af71f307636970282ad4ad1 Mon Sep 17 00:00:00 2001
From: Ensar Sarajčić
Date: Fri, 17 Jan 2025 20:39:36 +0100
Subject: [PATCH 6/6] Shutdown utilization task when stopping topology

---
 src/topology/running.rs | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/src/topology/running.rs b/src/topology/running.rs
index ecba6a97ea73d..46bc97e4a9875 100644
--- a/src/topology/running.rs
+++ b/src/topology/running.rs
@@ -22,6 +22,7 @@ use crate::{
     spawn_named,
 };
 use futures::{future, Future, FutureExt};
+use stream_cancel::Trigger;
 use tokio::{
     sync::{mpsc, watch},
     time::{interval, sleep_until, Duration, Instant},
 };
 use tracing::Instrument;
@@ -49,6 +50,7 @@ pub struct RunningTopology {
     pub(crate) running: Arc<AtomicBool>,
     graceful_shutdown_duration: Option<Duration>,
     utilization_task: Option<TaskHandle>,
+    utilization_task_shutdown_trigger: Option<Trigger>,
 }
 
 impl RunningTopology {
@@ -68,6 +70,7 @@ impl RunningTopology {
             graceful_shutdown_duration: config.graceful_shutdown_duration,
             config,
             utilization_task: None,
+            utilization_task_shutdown_trigger: None,
         }
     }
@@ -208,6 +211,9 @@ impl RunningTopology {
 
         // Now kick off the shutdown process by shutting down the sources.
         let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
+        if let Some(trigger) = self.utilization_task_shutdown_trigger {
+            trigger.cancel();
+        }
 
         futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
     }
@@ -1061,12 +1067,16 @@ impl RunningTopology {
         running_topology.connect_diff(&diff, &mut pieces).await;
         running_topology.spawn_diff(&diff, pieces);
 
+        let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
+            ShutdownSignal::new_wired();
+        running_topology.utilization_task_shutdown_trigger =
+            Some(utilization_task_shutdown_trigger);
         running_topology.utilization_task = Some(tokio::spawn(Task::new(
             "utilization_heartbeat".into(),
             "",
             async move {
                 utilization_emitter
-                    .run_utilization(ShutdownSignal::noop())
+                    .run_utilization(utilization_shutdown_signal)
                     .await;
                 // TODO: new task output type for this? Or handle this task in a completely
                 // different way