From 95886f87229f7e8d82f9057c243b4d48aeaa9acf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 19 Dec 2024 22:48:50 +0100 Subject: [PATCH 01/15] fix(utilization_metric): run a separate task for utilization to ensure it is regularly published This adds a separate task that runs periodically to emit utilization metrics and collect messages from components that need their utilization metrics calculated. This ensures that utilization metric is published even when no events are running through a component. Fixes: #20216 --- src/topology/builder.rs | 99 ++++++++++++++++++----- src/topology/running.rs | 21 ++++- src/utilization.rs | 172 +++++++++++++++++++++++++++++----------- 3 files changed, 222 insertions(+), 70 deletions(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 8602e5cce6c13..fe9ffb19542d6 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -8,6 +8,7 @@ use std::{ use futures::{stream::FuturesOrdered, FutureExt, StreamExt, TryStreamExt}; use futures_util::stream::FuturesUnordered; +use metrics::gauge; use stream_cancel::{StreamExt as StreamCancelExt, Trigger, Tripwire}; use tokio::{ select, @@ -51,7 +52,7 @@ use crate::{ spawn_named, topology::task::TaskError, transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf}, - utilization::wrap, + utilization::{wrap, UtilizationEmitter, UtilizationTimerMessage}, SourceSender, }; @@ -84,6 +85,7 @@ struct Builder<'a> { healthchecks: HashMap, detach_triggers: HashMap, extra_context: ExtraContext, + utilization_emitter: UtilizationEmitter, } impl<'a> Builder<'a> { @@ -105,6 +107,7 @@ impl<'a> Builder<'a> { healthchecks: HashMap::new(), detach_triggers: HashMap::new(), extra_context, + utilization_emitter: UtilizationEmitter::new(), } } @@ -128,6 +131,7 @@ impl<'a> Builder<'a> { healthchecks: self.healthchecks, shutdown_coordinator: self.shutdown_coordinator, detach_triggers: self.detach_triggers, + utilization_emitter: Some(self.utilization_emitter), }) } else { Err(self.errors) @@ -497,7 +501,7 @@ impl<'a> Builder<'a> { let (transform_task, transform_outputs) = { let _span = span.enter(); - build_transform(transform, node, input_rx) + build_transform(transform, node, input_rx, &mut self.utilization_emitter) }; self.outputs.extend(transform_outputs); @@ -506,6 +510,7 @@ impl<'a> Builder<'a> { } async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) { + let utilization_sender = self.utilization_emitter.get_sender(); for (key, sink) in self .config .sinks() @@ -585,6 +590,10 @@ impl<'a> Builder<'a> { let (trigger, tripwire) = Tripwire::new(); + self.utilization_emitter + .add_component(key.clone(), gauge!("utilization")); + let utilization_sender = utilization_sender.clone(); + let component_key = key.clone(); let sink = async move { debug!("Sink starting."); @@ -600,7 +609,7 @@ impl<'a> Builder<'a> { .take() .expect("Task started but input has been taken."); - let mut rx = wrap(rx); + let mut rx = wrap(utilization_sender, component_key.clone(), rx); let events_received = register!(EventsReceived); sink.run( @@ -682,6 +691,7 @@ pub struct TopologyPieces { pub(super) healthchecks: HashMap, pub(crate) shutdown_coordinator: SourceShutdownCoordinator, pub(crate) detach_triggers: HashMap, + pub(crate) utilization_emitter: Option, } impl TopologyPieces { @@ -760,11 +770,14 @@ fn build_transform( transform: Transform, node: TransformNode, input_rx: BufferReceiver, + utilization_emitter: &mut UtilizationEmitter, ) 
-> (Task, HashMap) { match transform { // TODO: avoid the double boxing for function transforms here - Transform::Function(t) => build_sync_transform(Box::new(t), node, input_rx), - Transform::Synchronous(t) => build_sync_transform(t, node, input_rx), + Transform::Function(t) => { + build_sync_transform(Box::new(t), node, input_rx, utilization_emitter) + } + Transform::Synchronous(t) => build_sync_transform(t, node, input_rx, utilization_emitter), Transform::Task(t) => build_task_transform( t, input_rx, @@ -772,6 +785,7 @@ fn build_transform( node.typetag, &node.key, &node.outputs, + utilization_emitter, ), } } @@ -780,10 +794,19 @@ fn build_sync_transform( t: Box, node: TransformNode, input_rx: BufferReceiver, + utilization_emitter: &mut UtilizationEmitter, ) -> (Task, HashMap) { let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key); - let runner = Runner::new(t, input_rx, node.input_details.data_type(), outputs); + utilization_emitter.add_component(node.key.clone(), gauge!("utilization")); + let runner = Runner::new( + t, + input_rx, + utilization_emitter.get_sender(), + node.key.clone(), + node.input_details.data_type(), + outputs, + ); let transform = if node.enable_concurrency { runner.run_concurrently().boxed() } else { @@ -823,8 +846,8 @@ struct Runner { input_rx: Option>, input_type: DataType, outputs: TransformOutputs, - timer: crate::utilization::Timer, - last_report: Instant, + key: ComponentKey, + timer_tx: UnboundedSender, events_received: Registered, } @@ -832,6 +855,8 @@ impl Runner { fn new( transform: Box, input_rx: BufferReceiver, + timer_tx: UnboundedSender, + key: ComponentKey, input_type: DataType, outputs: TransformOutputs, ) -> Self { @@ -840,17 +865,22 @@ impl Runner { input_rx: Some(input_rx), input_type, outputs, - timer: crate::utilization::Timer::new(), - last_report: Instant::now(), + key, + timer_tx, events_received: register!(EventsReceived), } } fn on_events_received(&mut self, events: &EventArray) { - let stopped = self.timer.stop_wait(); - if stopped.duration_since(self.last_report).as_secs() >= 5 { - self.timer.report(); - self.last_report = stopped; + if self + .timer_tx + .send(UtilizationTimerMessage::StopWait( + self.key.clone(), + Instant::now(), + )) + .is_err() + { + debug!(component_id = ?self.key, "Couldn't send utilization stop wait message from sync transform."); } self.events_received.emit(CountByteSize( @@ -860,7 +890,16 @@ impl Runner { } async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> { - self.timer.start_wait(); + if self + .timer_tx + .send(UtilizationTimerMessage::StartWait( + self.key.clone(), + Instant::now(), + )) + .is_err() + { + debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform."); + } self.outputs.send(outputs_buf).await } @@ -877,7 +916,16 @@ impl Runner { .into_stream() .filter(move |events| ready(filter_events_type(events, self.input_type))); - self.timer.start_wait(); + if self + .timer_tx + .send(UtilizationTimerMessage::StartWait( + self.key.clone(), + Instant::now(), + )) + .is_err() + { + debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform."); + } while let Some(events) = input_rx.next().await { self.on_events_received(&events); self.transform.transform_all(events, &mut outputs_buf); @@ -903,7 +951,16 @@ impl Runner { let mut in_flight = FuturesOrdered::new(); let mut shutting_down = false; - self.timer.start_wait(); + if self + .timer_tx + 
.send(UtilizationTimerMessage::StartWait( + self.key.clone(), + Instant::now(), + )) + .is_err() + { + debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform."); + } loop { tokio::select! { biased; @@ -964,10 +1021,16 @@ fn build_task_transform( typetag: &str, key: &ComponentKey, outputs: &[TransformOutput], + utilization_emitter: &mut UtilizationEmitter, ) -> (Task, HashMap) { let (mut fanout, control) = Fanout::new(); - let input_rx = crate::utilization::wrap(input_rx.into_stream()); + utilization_emitter.add_component(key.clone(), gauge!("utilization")); + let input_rx = wrap( + utilization_emitter.get_sender(), + key.clone(), + input_rx.into_stream(), + ); let events_received = register!(EventsReceived); let filtered = input_rx diff --git a/src/topology/running.rs b/src/topology/running.rs index 796661b7b4e0c..b23bf1d7803d4 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -7,11 +7,10 @@ use std::{ }; use super::{ - builder, - builder::TopologyPieces, + builder::{self, TopologyPieces}, fanout::{ControlChannel, ControlMessage}, handle_errors, retain, take_healthchecks, - task::TaskOutput, + task::{Task, TaskOutput}, BuiltBuffer, TaskHandle, }; use crate::{ @@ -28,9 +27,9 @@ use tokio::{ time::{interval, sleep_until, Duration, Instant}, }; use tracing::Instrument; -use vector_lib::buffers::topology::channel::BufferSender; use vector_lib::tap::topology::{TapOutput, TapResource, WatchRx, WatchTx}; use vector_lib::trigger::DisabledTrigger; +use vector_lib::{buffers::topology::channel::BufferSender, shutdown::ShutdownSignal}; pub type ShutdownErrorReceiver = mpsc::UnboundedReceiver; @@ -49,6 +48,7 @@ pub struct RunningTopology { watch: (WatchTx, WatchRx), pub(crate) running: Arc, graceful_shutdown_duration: Option, + utilization_task: Option, } impl RunningTopology { @@ -67,6 +67,7 @@ impl RunningTopology { running: Arc::new(AtomicBool::new(true)), graceful_shutdown_duration: config.graceful_shutdown_duration, config, + utilization_task: None, } } @@ -1042,6 +1043,7 @@ impl RunningTopology { return None; } + let mut utilization_emitter = pieces.utilization_emitter.take().unwrap(); let mut running_topology = Self::new(config, abort_tx); if !running_topology @@ -1053,6 +1055,17 @@ impl RunningTopology { running_topology.connect_diff(&diff, &mut pieces).await; running_topology.spawn_diff(&diff, pieces); + running_topology.utilization_task = + // TODO: how to name this custom task? + Some(tokio::spawn(Task::new("".into(), "", async move { + utilization_emitter + .run_utilization(ShutdownSignal::noop()) + .await; + // TODO: new task output type for this? 
Or handle this task in a completely + // different way + Ok(TaskOutput::Healthcheck) + }))); + Some((running_topology, abort_rx)) } } diff --git a/src/utilization.rs b/src/utilization.rs index 2d19cf9c2d150..97df33cd9987d 100644 --- a/src/utilization.rs +++ b/src/utilization.rs @@ -1,21 +1,24 @@ use std::{ + collections::HashMap, pin::Pin, task::{ready, Context, Poll}, time::{Duration, Instant}, }; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use futures::{Stream, StreamExt}; -use metrics::gauge; +use metrics::Gauge; use pin_project::pin_project; use tokio::time::interval; use tokio_stream::wrappers::IntervalStream; +use vector_lib::{id::ComponentKey, shutdown::ShutdownSignal}; use crate::stats; #[pin_project] pub(crate) struct Utilization { - timer: Timer, - intervals: IntervalStream, + timer_tx: UnboundedSender, + component_key: ComponentKey, inner: S, } @@ -42,50 +45,41 @@ where // ready, with the side-effect of reporting every so often about how // long the wait gap is. // - // To achieve this we poll the `intervals` stream and if a new interval - // is ready we hit `Timer::report` and loop back around again to poll - // for a new `Event`. Calls to `Timer::start_wait` will only have an - // effect if `stop_wait` has been called, so the structure of this loop - // avoids double-measures. + // This will just measure the time, while UtilizationEmitter collects + // all the timers and emits utilization value periodically let this = self.project(); - loop { - this.timer.start_wait(); - match this.intervals.poll_next_unpin(cx) { - Poll::Ready(_) => { - this.timer.report(); - continue; - } - Poll::Pending => { - let result = ready!(this.inner.poll_next_unpin(cx)); - this.timer.stop_wait(); - return Poll::Ready(result); - } - } + if this + .timer_tx + .send(UtilizationTimerMessage::StartWait( + this.component_key.clone(), + Instant::now(), + )) + .is_err() + { + debug!(component_id = ?this.component_key, "Couldn't send utilization start wait message from wrapped stream."); } + let result = ready!(this.inner.poll_next_unpin(cx)); + if this + .timer_tx + .send(UtilizationTimerMessage::StopWait( + this.component_key.clone(), + Instant::now(), + )) + .is_err() + { + debug!(component_id = ?this.component_key, "Couldn't send utilization stop wait message from wrapped stream."); + } + Poll::Ready(result) } } -/// Wrap a stream to emit stats about utilization. This is designed for use with -/// the input channels of transform and sinks components, and measures the -/// amount of time that the stream is waiting for input from upstream. We make -/// the simplifying assumption that this wait time is when the component is idle -/// and the rest of the time it is doing useful work. This is more true for -/// sinks than transforms, which can be blocked by downstream components, but -/// with knowledge of the config the data is still useful. -pub(crate) fn wrap(inner: S) -> Utilization { - Utilization { - timer: Timer::new(), - intervals: IntervalStream::new(interval(Duration::from_secs(5))), - inner, - } -} - -pub(super) struct Timer { +pub(crate) struct Timer { overall_start: Instant, span_start: Instant, waiting: bool, total_wait: Duration, ewma: stats::Ewma, + gauge: Gauge, } /// A simple, specialized timer for tracking spans of waiting vs not-waiting @@ -97,32 +91,33 @@ pub(super) struct Timer { /// to be of uniform length and used to aggregate span data into time-weighted /// averages. 
impl Timer { - pub(crate) fn new() -> Self { + pub(crate) fn new(gauge: Gauge) -> Self { Self { overall_start: Instant::now(), span_start: Instant::now(), waiting: false, total_wait: Duration::new(0, 0), ewma: stats::Ewma::new(0.9), + gauge, } } /// Begin a new span representing time spent waiting - pub(crate) fn start_wait(&mut self) { + pub(crate) fn start_wait(&mut self, at: Instant) { if !self.waiting { - self.end_span(); + self.end_span(at); self.waiting = true; } } /// Complete the current waiting span and begin a non-waiting span - pub(crate) fn stop_wait(&mut self) -> Instant { + pub(crate) fn stop_wait(&mut self, at: Instant) -> Instant { if self.waiting { - let now = self.end_span(); + let now = self.end_span(at); self.waiting = false; now } else { - Instant::now() + at } } @@ -133,7 +128,7 @@ impl Timer { // End the current span so it can be accounted for, but do not change // whether or not we're in the waiting state. This way the next span // inherits the correct status. - let now = self.end_span(); + let now = self.end_span(Instant::now()); let total_duration = now.duration_since(self.overall_start); let wait_ratio = self.total_wait.as_secs_f64() / total_duration.as_secs_f64(); @@ -142,18 +137,99 @@ impl Timer { self.ewma.update(utilization); let avg = self.ewma.average().unwrap_or(f64::NAN); debug!(utilization = %avg); - gauge!("utilization").set(avg); + self.gauge.set(avg); // Reset overall statistics for the next reporting period. self.overall_start = self.span_start; self.total_wait = Duration::new(0, 0); } - fn end_span(&mut self) -> Instant { + fn end_span(&mut self, at: Instant) -> Instant { if self.waiting { - self.total_wait += self.span_start.elapsed(); + self.total_wait += at - self.span_start; } - self.span_start = Instant::now(); + self.span_start = at; self.span_start } } + +#[derive(Debug)] +pub(crate) enum UtilizationTimerMessage { + StartWait(ComponentKey, Instant), + StopWait(ComponentKey, Instant), +} + +pub(crate) struct UtilizationEmitter { + timers: HashMap, + timer_rx: UnboundedReceiver, + timer_tx: UnboundedSender, + intervals: IntervalStream, +} + +impl UtilizationEmitter { + pub(crate) fn new() -> Self { + let (timer_tx, timer_rx) = unbounded_channel(); + Self { + timers: HashMap::default(), + intervals: IntervalStream::new(interval(Duration::from_secs(5))), + timer_tx, + timer_rx, + } + } + + pub(crate) fn add_component(&mut self, key: ComponentKey, gauge: Gauge) { + self.timers.insert(key, Timer::new(gauge)); + } + + pub(crate) fn get_sender(&self) -> UnboundedSender { + self.timer_tx.clone() + } + + pub(crate) async fn run_utilization(&mut self, mut shutdown: ShutdownSignal) { + loop { + tokio::select! { + message = self.timer_rx.recv() => { + match message { + Some(UtilizationTimerMessage::StartWait(key, start_time)) => { + self.timers.get_mut(&key).unwrap().start_wait(start_time); + } + Some(UtilizationTimerMessage::StopWait(key, stop_time)) => { + self.timers.get_mut(&key).unwrap().stop_wait(stop_time); + } + None => break, + } + }, + + Some(_) = self.intervals.next() => { + for timer in self.timers.values_mut() { + timer.report(); + } + }, + + _ = &mut shutdown => { + break + } + } + } + } +} + +/// Wrap a stream to emit stats about utilization. This is designed for use with +/// the input channels of transform and sinks components, and measures the +/// amount of time that the stream is waiting for input from upstream. 
We make +/// the simplifying assumption that this wait time is when the component is idle +/// and the rest of the time it is doing useful work. This is more true for +/// sinks than transforms, which can be blocked by downstream components, but +/// with knowledge of the config the data is still useful. +#[allow(clippy::missing_const_for_fn)] +pub(crate) fn wrap( + timer_tx: UnboundedSender, + component_key: ComponentKey, + inner: S, +) -> Utilization { + Utilization { + timer_tx, + component_key, + inner, + } +} From 59b7d192b0b513d705cf09cc4580886cce829ecb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 23 Dec 2024 17:35:01 +0100 Subject: [PATCH 02/15] Add changelog entry --- changelog.d/22070_utilization_metric_periodic_emit.fix.md | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 changelog.d/22070_utilization_metric_periodic_emit.fix.md diff --git a/changelog.d/22070_utilization_metric_periodic_emit.fix.md b/changelog.d/22070_utilization_metric_periodic_emit.fix.md new file mode 100644 index 0000000000000..2135cad5e0fec --- /dev/null +++ b/changelog.d/22070_utilization_metric_periodic_emit.fix.md @@ -0,0 +1,3 @@ +The `utilization` metric is now properly published periodically, even when no events are flowing through the components. + +authors: esensar From 416e1cfeb92d6db99dc1fb418bff07a6a08615af Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 10 Jan 2025 13:49:59 +0100 Subject: [PATCH 03/15] Remove unnecessary clone when building utilization task Co-authored-by: Pavlos Rontidis --- src/topology/builder.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index fe9ffb19542d6..2a698e99af1b9 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -510,7 +510,6 @@ impl<'a> Builder<'a> { } async fn build_sinks(&mut self, enrichment_tables: &vector_lib::enrichment::TableRegistry) { - let utilization_sender = self.utilization_emitter.get_sender(); for (key, sink) in self .config .sinks() @@ -592,7 +591,7 @@ impl<'a> Builder<'a> { self.utilization_emitter .add_component(key.clone(), gauge!("utilization")); - let utilization_sender = utilization_sender.clone(); + let utilization_sender = self.utilization_emitter.get_sender(); let component_key = key.clone(); let sink = async move { debug!("Sink starting."); From bb5d7ca1a19baf2d1beef217e3263841ad3b148b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 10 Jan 2025 13:52:03 +0100 Subject: [PATCH 04/15] Name utilization task `utilization_heartbeat` --- src/topology/running.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index b23bf1d7803d4..21cf716e5ca61 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -1055,16 +1055,18 @@ impl RunningTopology { running_topology.connect_diff(&diff, &mut pieces).await; running_topology.spawn_diff(&diff, pieces); - running_topology.utilization_task = - // TODO: how to name this custom task? - Some(tokio::spawn(Task::new("".into(), "", async move { + running_topology.utilization_task = Some(tokio::spawn(Task::new( + "utilization_heartbeat".into(), + "", + async move { utilization_emitter .run_utilization(ShutdownSignal::noop()) .await; // TODO: new task output type for this? 
Or handle this task in a completely // different way Ok(TaskOutput::Healthcheck) - }))); + }, + ))); Some((running_topology, abort_rx)) } From d7d8694c0c1ea1c2f7658f3bedb2f346e8095930 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 10 Jan 2025 15:23:25 +0100 Subject: [PATCH 05/15] Join `utilization_task` when stopping topology --- src/topology/running.rs | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index 21cf716e5ca61..ecba6a97ea73d 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -117,15 +117,21 @@ impl RunningTopology { // pump in self.tasks, and the other for source in self.source_tasks. let mut check_handles = HashMap::>::new(); + let map_closure = |_result| (); + // We need to give some time to the sources to gracefully shutdown, so // we will merge them with other tasks. for (key, task) in self.tasks.into_iter().chain(self.source_tasks.into_iter()) { - let task = task.map(|_result| ()).shared(); + let task = task.map(map_closure).shared(); wait_handles.push(task.clone()); check_handles.entry(key).or_default().push(task); } + if let Some(utilization_task) = self.utilization_task { + wait_handles.push(utilization_task.map(map_closure).shared()); + } + // If we reach this, we will forcefully shutdown the sources. If None, we will never force shutdown. let deadline = self .graceful_shutdown_duration From db6f273fbe29c38f6af71f307636970282ad4ad1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 17 Jan 2025 20:39:36 +0100 Subject: [PATCH 06/15] Shutdown utilization task when stopping topology --- src/topology/running.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index ecba6a97ea73d..46bc97e4a9875 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -22,6 +22,7 @@ use crate::{ spawn_named, }; use futures::{future, Future, FutureExt}; +use stream_cancel::Trigger; use tokio::{ sync::{mpsc, watch}, time::{interval, sleep_until, Duration, Instant}, @@ -49,6 +50,7 @@ pub struct RunningTopology { pub(crate) running: Arc, graceful_shutdown_duration: Option, utilization_task: Option, + utilization_task_shutdown_trigger: Option, } impl RunningTopology { @@ -68,6 +70,7 @@ impl RunningTopology { graceful_shutdown_duration: config.graceful_shutdown_duration, config, utilization_task: None, + utilization_task_shutdown_trigger: None, } } @@ -208,6 +211,9 @@ impl RunningTopology { // Now kick off the shutdown process by shutting down the sources. let source_shutdown_complete = self.shutdown_coordinator.shutdown_all(deadline); + if let Some(trigger) = self.utilization_task_shutdown_trigger { + trigger.cancel(); + } futures::future::join(source_shutdown_complete, shutdown_complete_future).map(|_| ()) } @@ -1061,12 +1067,16 @@ impl RunningTopology { running_topology.connect_diff(&diff, &mut pieces).await; running_topology.spawn_diff(&diff, pieces); + let (utilization_task_shutdown_trigger, utilization_shutdown_signal, _) = + ShutdownSignal::new_wired(); + running_topology.utilization_task_shutdown_trigger = + Some(utilization_task_shutdown_trigger); running_topology.utilization_task = Some(tokio::spawn(Task::new( "utilization_heartbeat".into(), "", async move { utilization_emitter - .run_utilization(ShutdownSignal::noop()) + .run_utilization(utilization_shutdown_signal) .await; // TODO: new task output type for this? 
Or handle this task in a completely // different way From e81af457d478f39a8a30365d243ef5afdcc95211 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 27 Jan 2025 14:53:00 +0100 Subject: [PATCH 07/15] Hack: fix utilization never ending, by polling another stream? --- src/utilization.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/utilization.rs b/src/utilization.rs index 97df33cd9987d..0b328351986c3 100644 --- a/src/utilization.rs +++ b/src/utilization.rs @@ -17,6 +17,7 @@ use crate::stats; #[pin_project] pub(crate) struct Utilization { + intervals: IntervalStream, timer_tx: UnboundedSender, component_key: ComponentKey, inner: S, @@ -58,6 +59,7 @@ where { debug!(component_id = ?this.component_key, "Couldn't send utilization start wait message from wrapped stream."); } + let _ = this.intervals.poll_next_unpin(cx); let result = ready!(this.inner.poll_next_unpin(cx)); if this .timer_tx @@ -228,6 +230,7 @@ pub(crate) fn wrap( inner: S, ) -> Utilization { Utilization { + intervals: IntervalStream::new(interval(Duration::from_secs(5))), timer_tx, component_key, inner, From c37f1063d0d6b34ea8ab37e3a5af41d71f864ce8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 3 Mar 2025 10:38:16 +0100 Subject: [PATCH 08/15] Credit Quad9DNS in changelog --- changelog.d/22070_utilization_metric_periodic_emit.fix.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog.d/22070_utilization_metric_periodic_emit.fix.md b/changelog.d/22070_utilization_metric_periodic_emit.fix.md index 2135cad5e0fec..27dd9de778d79 100644 --- a/changelog.d/22070_utilization_metric_periodic_emit.fix.md +++ b/changelog.d/22070_utilization_metric_periodic_emit.fix.md @@ -1,3 +1,3 @@ The `utilization` metric is now properly published periodically, even when no events are flowing through the components. 
-authors: esensar +authors: esensar Quad9DNS From f4e5e447fb8d3bb7fb15f0f55ac6f3f1b730e9e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 13 Jun 2025 18:36:02 +0200 Subject: [PATCH 09/15] Replace unbounded_channel with channel --- src/utilization.rs | 42 +++++++++++++++++------------------------- 1 file changed, 17 insertions(+), 25 deletions(-) diff --git a/src/utilization.rs b/src/utilization.rs index 0b328351986c3..9b51c8fa0ddef 100644 --- a/src/utilization.rs +++ b/src/utilization.rs @@ -4,7 +4,7 @@ use std::{ task::{ready, Context, Poll}, time::{Duration, Instant}, }; -use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::mpsc::{channel, Receiver, Sender}; use futures::{Stream, StreamExt}; use metrics::Gauge; @@ -18,7 +18,7 @@ use crate::stats; #[pin_project] pub(crate) struct Utilization { intervals: IntervalStream, - timer_tx: UnboundedSender, + timer_tx: Sender, component_key: ComponentKey, inner: S, } @@ -49,27 +49,19 @@ where // This will just measure the time, while UtilizationEmitter collects // all the timers and emits utilization value periodically let this = self.project(); - if this - .timer_tx - .send(UtilizationTimerMessage::StartWait( - this.component_key.clone(), - Instant::now(), - )) - .is_err() - { - debug!(component_id = ?this.component_key, "Couldn't send utilization start wait message from wrapped stream."); + if let Err(err) = this.timer_tx.try_send(UtilizationTimerMessage::StartWait( + this.component_key.clone(), + Instant::now(), + )) { + debug!(component_id = ?this.component_key, error = ?err, "Couldn't send utilization start wait message from wrapped stream."); } let _ = this.intervals.poll_next_unpin(cx); let result = ready!(this.inner.poll_next_unpin(cx)); - if this - .timer_tx - .send(UtilizationTimerMessage::StopWait( - this.component_key.clone(), - Instant::now(), - )) - .is_err() - { - debug!(component_id = ?this.component_key, "Couldn't send utilization stop wait message from wrapped stream."); + if let Err(err) = this.timer_tx.try_send(UtilizationTimerMessage::StopWait( + this.component_key.clone(), + Instant::now(), + )) { + debug!(component_id = ?this.component_key, error = ?err, "Couldn't send utilization stop wait message from wrapped stream."); } Poll::Ready(result) } @@ -163,14 +155,14 @@ pub(crate) enum UtilizationTimerMessage { pub(crate) struct UtilizationEmitter { timers: HashMap, - timer_rx: UnboundedReceiver, - timer_tx: UnboundedSender, + timer_rx: Receiver, + timer_tx: Sender, intervals: IntervalStream, } impl UtilizationEmitter { pub(crate) fn new() -> Self { - let (timer_tx, timer_rx) = unbounded_channel(); + let (timer_tx, timer_rx) = channel(1024); Self { timers: HashMap::default(), intervals: IntervalStream::new(interval(Duration::from_secs(5))), @@ -183,7 +175,7 @@ impl UtilizationEmitter { self.timers.insert(key, Timer::new(gauge)); } - pub(crate) fn get_sender(&self) -> UnboundedSender { + pub(crate) fn get_sender(&self) -> Sender { self.timer_tx.clone() } @@ -225,7 +217,7 @@ impl UtilizationEmitter { /// with knowledge of the config the data is still useful. 
#[allow(clippy::missing_const_for_fn)] pub(crate) fn wrap( - timer_tx: UnboundedSender, + timer_tx: Sender, component_key: ComponentKey, inner: S, ) -> Utilization { From accdc6e4a5dd06bf60d0d3de46da6ab73e0eac34 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 13 Jun 2025 18:56:19 +0200 Subject: [PATCH 10/15] Replace unwrap with expect in utilization --- src/utilization.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/utilization.rs b/src/utilization.rs index 9b51c8fa0ddef..b3580506e350c 100644 --- a/src/utilization.rs +++ b/src/utilization.rs @@ -185,10 +185,10 @@ impl UtilizationEmitter { message = self.timer_rx.recv() => { match message { Some(UtilizationTimerMessage::StartWait(key, start_time)) => { - self.timers.get_mut(&key).unwrap().start_wait(start_time); + self.timers.get_mut(&key).expect("Utilization timer missing for component").start_wait(start_time); } Some(UtilizationTimerMessage::StopWait(key, stop_time)) => { - self.timers.get_mut(&key).unwrap().stop_wait(stop_time); + self.timers.get_mut(&key).expect("Utilization timer missing for component").stop_wait(stop_time); } None => break, } From 762fbdde41cc2e757141d5c68beee034ff5fc412 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Sat, 14 Jun 2025 12:15:54 +0200 Subject: [PATCH 11/15] Increase timer channel buffer size --- src/utilization.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utilization.rs b/src/utilization.rs index b3580506e350c..e889c1be3b934 100644 --- a/src/utilization.rs +++ b/src/utilization.rs @@ -162,7 +162,7 @@ pub(crate) struct UtilizationEmitter { impl UtilizationEmitter { pub(crate) fn new() -> Self { - let (timer_tx, timer_rx) = channel(1024); + let (timer_tx, timer_rx) = channel(4096); Self { timers: HashMap::default(), intervals: IntervalStream::new(interval(Duration::from_secs(5))), From a6bc1b5819aa9480799aef1ec72d2001748d45b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 19 Jun 2025 17:20:56 +0200 Subject: [PATCH 12/15] Wrap utilization timer logic in a separate sender --- src/topology/builder.rs | 76 +++++++---------------------------------- src/utilization.rs | 64 ++++++++++++++++++++++------------ 2 files changed, 56 insertions(+), 84 deletions(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index b7a0ae412fada..510071bb195b1 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -52,7 +52,7 @@ use crate::{ spawn_named, topology::task::TaskError, transforms::{SyncTransform, TaskTransform, Transform, TransformOutputs, TransformOutputsBuf}, - utilization::{wrap, UtilizationEmitter, UtilizationTimerMessage}, + utilization::{wrap, UtilizationComponentSender, UtilizationEmitter}, SourceSender, }; @@ -619,9 +619,9 @@ impl<'a> Builder<'a> { let (trigger, tripwire) = Tripwire::new(); - self.utilization_emitter + let utilization_sender = self + .utilization_emitter .add_component(key.clone(), gauge!("utilization")); - let utilization_sender = self.utilization_emitter.get_sender(); let component_key = key.clone(); let sink = async move { debug!("Sink starting."); @@ -826,15 +826,8 @@ fn build_sync_transform( ) -> (Task, HashMap) { let (outputs, controls) = TransformOutputs::new(node.outputs, &node.key); - utilization_emitter.add_component(node.key.clone(), gauge!("utilization")); - let runner = Runner::new( - t, - input_rx, - utilization_emitter.get_sender(), - node.key.clone(), - node.input_details.data_type(), - outputs, - ); + 
let sender = utilization_emitter.add_component(node.key.clone(), gauge!("utilization")); + let runner = Runner::new(t, input_rx, sender, node.input_details.data_type(), outputs); let transform = if node.enable_concurrency { runner.run_concurrently().boxed() } else { @@ -874,8 +867,7 @@ struct Runner { input_rx: Option>, input_type: DataType, outputs: TransformOutputs, - key: ComponentKey, - timer_tx: UnboundedSender, + timer_tx: UtilizationComponentSender, events_received: Registered, } @@ -883,8 +875,7 @@ impl Runner { fn new( transform: Box, input_rx: BufferReceiver, - timer_tx: UnboundedSender, - key: ComponentKey, + timer_tx: UtilizationComponentSender, input_type: DataType, outputs: TransformOutputs, ) -> Self { @@ -893,23 +884,13 @@ impl Runner { input_rx: Some(input_rx), input_type, outputs, - key, timer_tx, events_received: register!(EventsReceived), } } fn on_events_received(&mut self, events: &EventArray) { - if self - .timer_tx - .send(UtilizationTimerMessage::StopWait( - self.key.clone(), - Instant::now(), - )) - .is_err() - { - debug!(component_id = ?self.key, "Couldn't send utilization stop wait message from sync transform."); - } + self.timer_tx.stop(); self.events_received.emit(CountByteSize( events.len(), @@ -918,16 +899,7 @@ impl Runner { } async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> { - if self - .timer_tx - .send(UtilizationTimerMessage::StartWait( - self.key.clone(), - Instant::now(), - )) - .is_err() - { - debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform."); - } + self.timer_tx.start(); self.outputs.send(outputs_buf).await } @@ -944,16 +916,7 @@ impl Runner { .into_stream() .filter(move |events| ready(filter_events_type(events, self.input_type))); - if self - .timer_tx - .send(UtilizationTimerMessage::StartWait( - self.key.clone(), - Instant::now(), - )) - .is_err() - { - debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform."); - } + self.timer_tx.start(); while let Some(events) = input_rx.next().await { self.on_events_received(&events); self.transform.transform_all(events, &mut outputs_buf); @@ -979,16 +942,7 @@ impl Runner { let mut in_flight = FuturesOrdered::new(); let mut shutting_down = false; - if self - .timer_tx - .send(UtilizationTimerMessage::StartWait( - self.key.clone(), - Instant::now(), - )) - .is_err() - { - debug!(component_id = ?self.key, "Couldn't send utilization start wait message from sync transform."); - } + self.timer_tx.start(); loop { tokio::select! 
{ biased; @@ -1053,12 +1007,8 @@ fn build_task_transform( ) -> (Task, HashMap) { let (mut fanout, control) = Fanout::new(); - utilization_emitter.add_component(key.clone(), gauge!("utilization")); - let input_rx = wrap( - utilization_emitter.get_sender(), - key.clone(), - input_rx.into_stream(), - ); + let sender = utilization_emitter.add_component(key.clone(), gauge!("utilization")); + let input_rx = wrap(sender, key.clone(), input_rx.into_stream()); let events_received = register!(EventsReceived); let filtered = input_rx diff --git a/src/utilization.rs b/src/utilization.rs index e889c1be3b934..91c665bd454b9 100644 --- a/src/utilization.rs +++ b/src/utilization.rs @@ -18,7 +18,7 @@ use crate::stats; #[pin_project] pub(crate) struct Utilization { intervals: IntervalStream, - timer_tx: Sender, + timer_tx: UtilizationComponentSender, component_key: ComponentKey, inner: S, } @@ -49,20 +49,10 @@ where // This will just measure the time, while UtilizationEmitter collects // all the timers and emits utilization value periodically let this = self.project(); - if let Err(err) = this.timer_tx.try_send(UtilizationTimerMessage::StartWait( - this.component_key.clone(), - Instant::now(), - )) { - debug!(component_id = ?this.component_key, error = ?err, "Couldn't send utilization start wait message from wrapped stream."); - } + this.timer_tx.start(); let _ = this.intervals.poll_next_unpin(cx); let result = ready!(this.inner.poll_next_unpin(cx)); - if let Err(err) = this.timer_tx.try_send(UtilizationTimerMessage::StopWait( - this.component_key.clone(), - Instant::now(), - )) { - debug!(component_id = ?this.component_key, error = ?err, "Couldn't send utilization stop wait message from wrapped stream."); - } + this.timer_tx.stop(); Poll::Ready(result) } } @@ -148,11 +138,36 @@ impl Timer { } #[derive(Debug)] -pub(crate) enum UtilizationTimerMessage { +enum UtilizationTimerMessage { StartWait(ComponentKey, Instant), StopWait(ComponentKey, Instant), } +pub(crate) struct UtilizationComponentSender { + component_key: ComponentKey, + timer_tx: Sender, +} + +impl UtilizationComponentSender { + pub(crate) fn start(&self) { + if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait( + self.component_key.clone(), + Instant::now(), + )) { + debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization start wait message."); + } + } + + pub(crate) fn stop(&self) { + if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait( + self.component_key.clone(), + Instant::now(), + )) { + debug!(component_id = ?self.component_key, error = ?err, "Couldn't send utilization stop wait message."); + } + } +} + pub(crate) struct UtilizationEmitter { timers: HashMap, timer_rx: Receiver, @@ -171,12 +186,19 @@ impl UtilizationEmitter { } } - pub(crate) fn add_component(&mut self, key: ComponentKey, gauge: Gauge) { - self.timers.insert(key, Timer::new(gauge)); - } - - pub(crate) fn get_sender(&self) -> Sender { - self.timer_tx.clone() + /// Adds a new component to this utilization metric emitter + /// + /// Returns a sender which can be used to send utilization information back to the emitter + pub(crate) fn add_component( + &mut self, + key: ComponentKey, + gauge: Gauge, + ) -> UtilizationComponentSender { + self.timers.insert(key.clone(), Timer::new(gauge)); + UtilizationComponentSender { + timer_tx: self.timer_tx.clone(), + component_key: key, + } } pub(crate) async fn run_utilization(&mut self, mut shutdown: ShutdownSignal) { @@ -217,7 +239,7 @@ impl UtilizationEmitter { /// 
with knowledge of the config the data is still useful. #[allow(clippy::missing_const_for_fn)] pub(crate) fn wrap( - timer_tx: Sender, + timer_tx: UtilizationComponentSender, component_key: ComponentKey, inner: S, ) -> Utilization { From 4efb580f689baac69dcaa259515728740e9d4e89 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Thu, 19 Jun 2025 17:46:10 +0200 Subject: [PATCH 13/15] Rename start and stop fns --- src/topology/builder.rs | 8 ++++---- src/utilization.rs | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/topology/builder.rs b/src/topology/builder.rs index 510071bb195b1..39ec9453e6b2d 100644 --- a/src/topology/builder.rs +++ b/src/topology/builder.rs @@ -890,7 +890,7 @@ impl Runner { } fn on_events_received(&mut self, events: &EventArray) { - self.timer_tx.stop(); + self.timer_tx.try_send_stop_wait(); self.events_received.emit(CountByteSize( events.len(), @@ -899,7 +899,7 @@ impl Runner { } async fn send_outputs(&mut self, outputs_buf: &mut TransformOutputsBuf) -> crate::Result<()> { - self.timer_tx.start(); + self.timer_tx.try_send_start_wait(); self.outputs.send(outputs_buf).await } @@ -916,7 +916,7 @@ impl Runner { .into_stream() .filter(move |events| ready(filter_events_type(events, self.input_type))); - self.timer_tx.start(); + self.timer_tx.try_send_start_wait(); while let Some(events) = input_rx.next().await { self.on_events_received(&events); self.transform.transform_all(events, &mut outputs_buf); @@ -942,7 +942,7 @@ impl Runner { let mut in_flight = FuturesOrdered::new(); let mut shutting_down = false; - self.timer_tx.start(); + self.timer_tx.try_send_start_wait(); loop { tokio::select! { biased; diff --git a/src/utilization.rs b/src/utilization.rs index 91c665bd454b9..21476ef22617f 100644 --- a/src/utilization.rs +++ b/src/utilization.rs @@ -49,10 +49,10 @@ where // This will just measure the time, while UtilizationEmitter collects // all the timers and emits utilization value periodically let this = self.project(); - this.timer_tx.start(); + this.timer_tx.try_send_start_wait(); let _ = this.intervals.poll_next_unpin(cx); let result = ready!(this.inner.poll_next_unpin(cx)); - this.timer_tx.stop(); + this.timer_tx.try_send_stop_wait(); Poll::Ready(result) } } @@ -149,7 +149,7 @@ pub(crate) struct UtilizationComponentSender { } impl UtilizationComponentSender { - pub(crate) fn start(&self) { + pub(crate) fn try_send_start_wait(&self) { if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StartWait( self.component_key.clone(), Instant::now(), @@ -158,7 +158,7 @@ impl UtilizationComponentSender { } } - pub(crate) fn stop(&self) { + pub(crate) fn try_send_stop_wait(&self) { if let Err(err) = self.timer_tx.try_send(UtilizationTimerMessage::StopWait( self.component_key.clone(), Instant::now(), From e381c1681bf651598262b3cf398ebc9628372ba8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Fri, 20 Jun 2025 18:42:21 +0200 Subject: [PATCH 14/15] Remove unused clippy check --- src/utilization.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/utilization.rs b/src/utilization.rs index 21476ef22617f..cdb2cafcf2222 100644 --- a/src/utilization.rs +++ b/src/utilization.rs @@ -237,7 +237,6 @@ impl UtilizationEmitter { /// and the rest of the time it is doing useful work. This is more true for /// sinks than transforms, which can be blocked by downstream components, but /// with knowledge of the config the data is still useful. 
-#[allow(clippy::missing_const_for_fn)] pub(crate) fn wrap( timer_tx: UtilizationComponentSender, component_key: ComponentKey, From 58f2ca96535bf6843f0b2794e4d523e31d90c5a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ensar=20Saraj=C4=8Di=C4=87?= Date: Mon, 23 Jun 2025 09:38:02 +0200 Subject: [PATCH 15/15] Replace unwrap with expect in topology start --- src/topology/running.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/topology/running.rs b/src/topology/running.rs index 17aea09bc6506..5c763ea6e7eba 100644 --- a/src/topology/running.rs +++ b/src/topology/running.rs @@ -1233,7 +1233,10 @@ impl RunningTopology { return None; } - let mut utilization_emitter = pieces.utilization_emitter.take().unwrap(); + let mut utilization_emitter = pieces + .utilization_emitter + .take() + .expect("Topology is missing the utilization metric emitter!"); let mut running_topology = Self::new(config, abort_tx); if !running_topology
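
For orientation, the pattern this series converges on can be summarized in a small, self-contained sketch. This is not code from the patch: the names (TimerMessage, run_emitter, "my_sink"), the println! standing in for the `utilization` gauge, and the omission of the EWMA smoothing, the ShutdownSignal wiring, and the stream wrapping are all simplifications assumed for illustration (it also assumes tokio with the rt, macros, sync and time features). The shape is the same, though: each wrapped component reports wait start/stop instants over a bounded channel with try_send, and one long-lived task owns all the per-component timers and publishes utilization every five seconds whether or not any events arrived — which is what fixes #20216.

use std::collections::HashMap;
use std::time::{Duration, Instant};

use tokio::sync::mpsc::{channel, Receiver};
use tokio::time::interval;

// Stand-in for Vector's ComponentKey; the real patch also carries a metrics Gauge per component.
type ComponentKey = String;

enum TimerMessage {
    // The component began waiting for input (idle) at this instant.
    StartWait(ComponentKey, Instant),
    // The component received input and resumed useful work at this instant.
    StopWait(ComponentKey, Instant),
}

// Per-component wait accounting; utilization = 1 - (time spent waiting / window length).
struct Timer {
    window_start: Instant,
    span_start: Instant,
    waiting: bool,
    total_wait: Duration,
}

impl Timer {
    fn new() -> Self {
        let now = Instant::now();
        Self { window_start: now, span_start: now, waiting: false, total_wait: Duration::ZERO }
    }

    fn start_wait(&mut self, at: Instant) {
        if !self.waiting {
            self.span_start = at;
            self.waiting = true;
        }
    }

    fn stop_wait(&mut self, at: Instant) {
        if self.waiting {
            self.total_wait += at.saturating_duration_since(self.span_start);
            self.waiting = false;
        }
    }

    fn report(&mut self, now: Instant) -> f64 {
        if self.waiting {
            // Account for the wait still in progress, then keep waiting into the next window.
            self.total_wait += now.saturating_duration_since(self.span_start);
            self.span_start = now;
        }
        let window = now.duration_since(self.window_start).as_secs_f64().max(f64::EPSILON);
        let utilization = 1.0 - self.total_wait.as_secs_f64() / window;
        self.window_start = now;
        self.total_wait = Duration::ZERO;
        utilization
    }
}

// The long-running task: owns every timer, drains timing messages, and reports on a fixed
// interval -- even if no component has sent anything since the last tick.
async fn run_emitter(mut rx: Receiver<TimerMessage>, mut timers: HashMap<ComponentKey, Timer>) {
    let mut ticker = interval(Duration::from_secs(5));
    loop {
        tokio::select! {
            msg = rx.recv() => match msg {
                Some(TimerMessage::StartWait(key, at)) => {
                    if let Some(t) = timers.get_mut(&key) { t.start_wait(at); }
                }
                Some(TimerMessage::StopWait(key, at)) => {
                    if let Some(t) = timers.get_mut(&key) { t.stop_wait(at); }
                }
                None => break, // every sender dropped: the topology has shut down
            },
            _ = ticker.tick() => {
                let now = Instant::now();
                for (key, timer) in timers.iter_mut() {
                    // The real code sets the per-component "utilization" gauge here instead.
                    println!("utilization {{component_id={key}}} = {:.3}", timer.report(now));
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = channel::<TimerMessage>(4096);
    let mut timers = HashMap::new();
    timers.insert("my_sink".to_string(), Timer::new());
    let emitter = tokio::spawn(run_emitter(rx, timers));

    // A wrapped component brackets its input wait, using try_send so a full channel
    // can only drop a timing message, never block the event path.
    let _ = tx.try_send(TimerMessage::StartWait("my_sink".into(), Instant::now()));
    tokio::time::sleep(Duration::from_millis(100)).await;
    let _ = tx.try_send(TimerMessage::StopWait("my_sink".into(), Instant::now()));

    drop(tx); // closing the channel lets the emitter task exit
    let _ = emitter.await;
}

The bounded channel sized at 4096 mirrors patches 09-11 of the series: a slow or stalled emitter can at worst drop a timing message; it can never back-pressure the data path of a transform or sink.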