Skip to content

Commit

Permalink
instr(buffer): busy and idle time (#4272)
Browse files Browse the repository at this point in the history
Monitor busy and idle times in the buffer service.

We might not need both metrics in the future as they complement each
other.
  • Loading branch information
jjbayer authored Nov 20, 2024
1 parent 41bc7c7 commit 6fece8e
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 22 deletions.
69 changes: 47 additions & 22 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::services::outcome::TrackOutcome;
use crate::services::processor::ProcessingGroup;
use crate::services::projects::cache::{legacy, ProjectCacheHandle, ProjectChange};
use crate::services::test_store::TestStore;
use crate::statsd::RelayTimers;
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::ManagedEnvelope;
use crate::MemoryChecker;
Expand Down Expand Up @@ -61,6 +62,15 @@ pub enum EnvelopeBuffer {
NotReady(ProjectKey, Box<Envelope>),
}

impl EnvelopeBuffer {
fn name(&self) -> &'static str {
match &self {
EnvelopeBuffer::Push(_) => "push",
EnvelopeBuffer::NotReady(..) => "project_not_ready",
}
}
}

// Implements the `Interface` trait for `EnvelopeBuffer`; the impl body is empty.
impl Interface for EnvelopeBuffer {}

impl FromMessage<Self> for EnvelopeBuffer {
Expand Down Expand Up @@ -418,43 +428,58 @@ impl Service for EnvelopeBufferService {
);

let mut sleep = Duration::MAX;
let start = Instant::now();
tokio::select! {
// NOTE: we do not select a bias here.
// On the one hand, we might want to prioritize dequeuing over enqueuing
// so we do not exceed the buffer capacity by starving the dequeue.
// On the other hand, prioritizing old messages violates the LIFO design.
Some(permit) = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => {
match Self::try_pop(&config, &mut buffer, &services, permit).await {
Ok(new_sleep) => {
sleep = new_sleep;
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "pop");
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "pop", {
match Self::try_pop(&config, &mut buffer, &services, permit).await {
Ok(new_sleep) => {
sleep = new_sleep;
}
Err(error) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to pop envelope"
);
}
Err(error) => {
relay_log::error!(
error = &error as &dyn std::error::Error,
"failed to pop envelope"
);
}
}
}});
}
change = project_changes.recv() => {
if let Ok(ProjectChange::Ready(project_key)) = change {
buffer.mark_ready(&project_key, true);
}
sleep = Duration::ZERO;
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "project_change");
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "project_change", {
if let Ok(ProjectChange::Ready(project_key)) = change {
buffer.mark_ready(&project_key, true);
}
sleep = Duration::ZERO;
});
}
Some(message) = rx.recv() => {
Self::handle_message(&mut buffer, &services, message).await;
sleep = Duration::ZERO;
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "handle_message");
let message_name = message.name();
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = message_name, {
Self::handle_message(&mut buffer, &services, message).await;
sleep = Duration::ZERO;
});
}
shutdown = shutdown.notified() => {
// In case the shutdown was handled, we break out of the loop signaling that
// there is no need to process any more envelopes.
if Self::handle_shutdown(&mut buffer, shutdown).await {
break;
}
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "shutdown");
relay_statsd::metric!(timer(RelayTimers::BufferBusy), input = "shutdown", {
// In case the shutdown was handled, we break out of the loop signaling that
// there is no need to process any more envelopes.
if Self::handle_shutdown(&mut buffer, shutdown).await {
break;
}
});
}
Ok(()) = global_config_rx.changed() => {
relay_statsd::metric!(timer(RelayTimers::BufferIdle) = start.elapsed(), input = "global_config_change");
sleep = Duration::ZERO;

}
else => break,
}
Expand Down Expand Up @@ -694,7 +719,7 @@ mod tests {

addr.send(EnvelopeBuffer::NotReady(project_key, envelope));

tokio::time::sleep(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(200)).await;
assert_eq!(project_cache_handle.test_num_fetches(), 2);

tokio::time::sleep(Duration::from_millis(1300)).await;
Expand Down
12 changes: 12 additions & 0 deletions relay-server/src/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,16 @@ pub enum RelayTimers {
StoreServiceDuration,
/// Timing in milliseconds for the time it takes to initialize the buffer.
BufferInitialization,
/// Timing in milliseconds for the time the buffer service is waiting for input.
///
/// This metric is tagged with:
/// - `input`: The type of input that broke the idling.
BufferIdle,
/// Timing in milliseconds for the time the buffer service spends handling input.
///
/// This metric is tagged with:
/// - `input`: The type of input that the service is handling.
BufferBusy,
/// Timing in milliseconds for the time it takes for the buffer to spool data to disk.
BufferSpool,
/// Timing in milliseconds for the time it takes for the buffer to unspool data from disk.
Expand Down Expand Up @@ -615,6 +625,8 @@ impl TimerMetric for RelayTimers {
#[cfg(feature = "processing")]
RelayTimers::StoreServiceDuration => "store.message.duration",
RelayTimers::BufferInitialization => "buffer.initialization.duration",
RelayTimers::BufferIdle => "buffer.idle",
RelayTimers::BufferBusy => "buffer.busy",
RelayTimers::BufferSpool => "buffer.spool.duration",
RelayTimers::BufferUnspool => "buffer.unspool.duration",
RelayTimers::BufferPush => "buffer.push.duration",
Expand Down

0 comments on commit 6fece8e

Please sign in to comment.