Skip to content

Commit

Permalink
Fine grained cache metrics (#2890)
Browse files Browse the repository at this point in the history
# Description
The metrics added in #2880
shows for a fact that the `RequestSharing` cache grows over time.
However, all these allocations are currently counted with a single
metric.

# Changes
Instead of tracking all cached items with a single gauge we now track
per `RequestSharing` instance which maybe points us to a very bad
offender giving us an idea what to look out for.
  • Loading branch information
MartinquaXD authored Aug 14, 2024
1 parent a7ba48e commit 7502bc0
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions crates/shared/src/request_sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use {
FutureExt,
},
prometheus::{
core::{AtomicU64, GenericGauge},
core::{AtomicU64, GenericGaugeVec},
IntCounterVec,
},
std::{
Expand Down Expand Up @@ -47,26 +47,27 @@ where
{
pub fn labelled(request_label: String) -> Self {
let cache: Cache<Request, Fut> = Default::default();
Self::spawn_gc(cache.clone());
Self::spawn_gc(cache.clone(), request_label.clone());
Self {
in_flight: cache,
request_label,
}
}

fn collect_garbage(cache: &Cache<Request, Fut>) {
fn collect_garbage(cache: &Cache<Request, Fut>, label: &str) {
let mut cache = cache.lock().unwrap();
let len_before = cache.len() as u64;
cache.retain(|_request, weak| weak.upgrade().is_some());
Metrics::get()
.request_sharing_cached_items
.with_label_values(&[label])
.sub(len_before - cache.len() as u64);
}

fn spawn_gc(cache: Cache<Request, Fut>) {
fn spawn_gc(cache: Cache<Request, Fut>, label: String) {
tokio::task::spawn(async move {
loop {
Self::collect_garbage(&cache);
Self::collect_garbage(&cache, &label);
tokio::time::sleep(Duration::from_millis(500)).await;
}
});
Expand All @@ -78,6 +79,7 @@ impl<A, B: Future> Drop for RequestSharing<A, B> {
let cache = self.in_flight.lock().unwrap();
Metrics::get()
.request_sharing_cached_items
.with_label_values(&[&self.request_label])
.sub(cache.len() as u64);
}
}
Expand Down Expand Up @@ -125,7 +127,10 @@ where
// unwrap because downgrade only returns None if the Shared has already
// completed which cannot be the case because we haven't polled it yet.
in_flight.insert(request, shared.downgrade().unwrap());
Metrics::get().request_sharing_cached_items.inc();
Metrics::get()
.request_sharing_cached_items
.with_label_values(&[&self.request_label])
.inc();
shared
}
}
Expand All @@ -137,7 +142,8 @@ struct Metrics {
request_sharing_access: IntCounterVec,

/// Number of all currently cached requests
request_sharing_cached_items: GenericGauge<AtomicU64>,
#[metric(labels("request_label"))]
request_sharing_cached_items: GenericGaugeVec<AtomicU64>,
}

impl Metrics {
Expand All @@ -155,9 +161,10 @@ mod tests {
// Manually create [`RequestSharing`] so we can have fine grained control
// over the garbage collection.
let cache: Cache<u64, BoxFuture<u64>> = Default::default();
let label = "test".to_string();
let sharing = RequestSharing {
in_flight: cache,
request_label: Default::default(),
request_label: label.clone(),
};

let shared0 = sharing.shared_or_else(0, |_| futures::future::ready(0).boxed());
Expand All @@ -174,14 +181,14 @@ mod tests {
assert_eq!(shared1.weak_count().unwrap(), 1);

// GC does not delete any keys because some tasks still use the future
RequestSharing::collect_garbage(&sharing.in_flight);
RequestSharing::collect_garbage(&sharing.in_flight, &label);
assert_eq!(sharing.in_flight.lock().unwrap().len(), 1);
assert!(sharing.in_flight.lock().unwrap().get(&0).is_some());

// complete second shared
assert_eq!(shared1.now_or_never().unwrap(), 0);

RequestSharing::collect_garbage(&sharing.in_flight);
RequestSharing::collect_garbage(&sharing.in_flight, &label);

// GC deleted all now unused futures
assert!(sharing.in_flight.lock().unwrap().is_empty());
Expand Down

0 comments on commit 7502bc0

Please sign in to comment.