From db6f273fbe29c38f6af71f307636970282ad4ad1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= <dev@ensarsarajcic.com>
Date: Fri, 17 Jan 2025 20:39:36 +0100
Subject: [PATCH] Shutdown utilization task when stopping topology

---
 src/topology/running.rs | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/src/topology/running.rs b/src/topology/running.rs
index ecba6a97ea73d..46bc97e4a9875 100644
--- a/src/topology/running.rs
+++ b/src/topology/running.rs
@@ -22,6 +22,7 @@ use crate::{
     spawn_named,
 };
 use futures::{future, Future, FutureExt};
+use stream_cancel::Trigger;
 use tokio::{
     sync::{mpsc, watch},
     time::{interval, sleep_until, Duration, Instant},
@@ -49,6 +50,7 @@ pub struct RunningTopology {
     pub(crate) running: Arc<AtomicBool>,
     graceful_shutdown_duration: Option<Duration>,
     utilization_task: Option<TaskHandle>,
+    utilization_task_shutdown_trigger: Option<Trigger>,
 }
 
 impl RunningTopology {
@@ -68,6 +70,7 @@ impl RunningTopology {
             graceful_shutdown_duration: config.graceful_shutdown_duration,
             config,
             utilization_task: None,
+            utilization_task_shutdown_trigger: None,
         }
     }
 
@@ -208,6 +211,9 @@ impl RunningTopology {
 
         // Now kick off the shutdown process by shutting down the sources.
         let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline);
+        if let Some(trigger) = self.utilization_task_shutdown_trigger {
+            trigger.cancel();
+        }
 
         futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ())
     }
@@ -1061,12 +1067,16 @@ impl RunningTopology {
         running_topology.connect_diff(&diff, &mut pieces).await;
         running_topology.spawn_diff(&diff, pieces);
 
+        let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) =
+            ShutdownSignal::new_wired();
+        running_topology.utilization_task_shutdown_trigger =
+            Some(utilization_task_shutdown_trigger);
         running_topology.utilization_task = Some(tokio::spawn(Task::new(
             "utilization_heartbeat".into(),
             "",
             async move {
                 utilization_emitter
-                    .run_utilization(ShutdownSignal::noop())
+                    .run_utilization(utilization_shutdown_signal)
                     .await;
                 // TODO: new task output type for this? Or handle this task in a completely
                 // different way