ref(server): Send metrics from aggregator directly to processor (#4336)
Instead of sending metrics from the aggregator to the `legacy project
cache` to check the project status there and either return the metrics
to the aggregator or forward them, we can flush only the metrics of
projects which are ready and send them directly to the processor.

Fixes: #4232
Dav1dde authored Dec 5, 2024
1 parent 49c729f commit 7ed88f2
Showing 9 changed files with 317 additions and 279 deletions.
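The new `Aggregator::pop_flush_buckets` API takes a per-project flush decision instead of handing everything to the project cache. Below is a minimal sketch of how a caller might use it, assuming a hypothetical `ready` set of project keys whose configuration is already cached (in the real service the decision is wired to the project cache handle and flushed buckets go straight to the processor):

use std::collections::HashSet;

use relay_base_schema::project::ProjectKey;
use relay_metrics::aggregator::{Aggregator, FlushDecision};

/// Hypothetical helper: flush buckets only for projects that are ready.
fn flush_ready(aggregator: &mut Aggregator, ready: &HashSet<ProjectKey>) {
    // `force = false`: only buckets whose flush time has elapsed are considered.
    let partitions = aggregator.pop_flush_buckets(false, |project_key| {
        if ready.contains(&project_key) {
            // Collect this project's buckets into a fresh Vec<Bucket>.
            FlushDecision::Flush(Vec::new())
        } else {
            // Keep the buckets in the aggregator and retry on the next flush.
            FlushDecision::Delay
        }
    });

    for (partition, by_project) in partitions {
        for (project_key, buckets) in by_project {
            // In the real service these buckets are sent directly to the processor.
            println!("partition {partition:?}: {project_key} -> {} buckets", buckets.len());
        }
    }
}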
6 changes: 4 additions & 2 deletions relay-metrics/benches/benches.rs
@@ -6,7 +6,7 @@ use rand::SeedableRng;
use relay_base_schema::project::ProjectKey;
use relay_common::time::UnixTimestamp;
use relay_metrics::{
aggregator::{Aggregator, AggregatorConfig},
aggregator::{Aggregator, AggregatorConfig, FlushDecision},
Bucket, BucketValue, DistributionValue, FiniteF64,
};
use std::cell::RefCell;
@@ -224,7 +224,9 @@ fn bench_insert_and_flush(c: &mut Criterion) {
|mut aggregator| {
// XXX: Ideally we'd want to test the entire try_flush here, but spawning
// a service is too much work here.
black_box(aggregator.pop_flush_buckets(black_box(false)));
black_box(aggregator.pop_flush_buckets(black_box(false), |_| {
FlushDecision::Flush(Vec::new())
}));
},
BatchSize::SmallInput,
)
11 changes: 11 additions & 0 deletions relay-metrics/src/aggregator/buckets.rs
@@ -83,6 +83,17 @@ impl Buckets {
}
self.queue.pop()
}

/// Inserts an entry back into the collection.
///
/// The intended use is to return a bucket after [`Self::try_pop`].
///
/// Note: this should only be used to return values back to the queue
/// after removing them, otherwise use [`Self::merge`].
pub fn re_add(&mut self, key: BucketKey, value: QueuedBucket) {
let _old = self.queue.push(key, value);
debug_assert!(_old.is_none());
}
}

impl IntoIterator for Buckets {
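The `re_add` method pairs with `try_pop`: when a popped entry turns out not to be flushable after all (for example because its project is not ready), it is pushed back instead of being merged again. A rough sketch of the crate-internal usage pattern, assuming a hypothetical `project_is_ready` check (the real call sites are in `pop_flush_buckets` below):

if let Some((key, queued_bucket)) = buckets.try_pop(|_, b| b.elapsed(now)) {
    if !project_is_ready(key.project_key) {
        // Not ready after all: return the entry to the queue unchanged
        // rather than merging it as if it were a new bucket.
        buckets.re_add(key, queued_bucket);
    }
}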
157 changes: 108 additions & 49 deletions relay-metrics/src/aggregator/mod.rs
@@ -5,6 +5,7 @@ use std::hash::Hasher;
use std::time::Duration;

use fnv::FnvHasher;
use hashbrown::hash_map::Entry;
use relay_base_schema::project::ProjectKey;
use relay_common::time::UnixTimestamp;
use thiserror::Error;
@@ -130,13 +131,20 @@ impl Aggregator {
/// Pop and return the partitions with buckets that are eligible for flushing out according to
/// bucket interval.
///
/// If no partitioning is enabled, the function will return a single `None` partition.
/// For each project `flush_decision` is called, which can influence the flush decision
/// for buckets of that project. The decision can also return metadata for the project
/// on which all flushed buckets for the project are merged with the `merge` function.
///
/// `flush_decision` is only called once if the decision is [`FlushDecision::Flush`],
/// but may be called multiple times otherwise. The `flush_decision` should be consistent
/// for each project key passed.
///
/// Note that this function is primarily intended for tests.
pub fn pop_flush_buckets(
/// If no partitioning is enabled, the function will return a single `None` partition.
pub fn pop_flush_buckets<T: Extend<Bucket>>(
&mut self,
force: bool,
) -> HashMap<Option<u64>, HashMap<ProjectKey, Vec<Bucket>>> {
mut flush_decision: impl FnMut(ProjectKey) -> FlushDecision<T>,
) -> HashMap<Option<u64>, HashMap<ProjectKey, T>> {
relay_statsd::metric!(
gauge(MetricGauges::Buckets) = self.bucket_count() as u64,
aggregator = &self.name,
@@ -155,50 +163,89 @@
let mut partitions = HashMap::new();
let mut stats = HashMap::new();

let mut re_add = Vec::new();

let now = Instant::now();
let ts = UnixTimestamp::from_instant(now.into_std());

let should_pop = |entry: &QueuedBucket| force || entry.elapsed(now);

let bucket_interval = self.config.bucket_interval;
let cost_tracker = &mut self.cost_tracker;

let start_bucket_scan = Instant::now();
while let Some((key, queued_bucket)) = self.buckets.try_pop(|_, b| should_pop(b)) {
let partition = self.config.flush_partitions.map(|p| key.partition_key(p));
let partition = partitions.entry(partition).or_insert_with(HashMap::new);

let cost = key.cost() + queued_bucket.value.cost();

let buckets = match partition.entry(key.project_key) {
Entry::Occupied(occupied_entry) => occupied_entry.into_mut(),
Entry::Vacant(vacant_entry) => {
match flush_decision(key.project_key) {
FlushDecision::Flush(v) => vacant_entry.insert(v),
FlushDecision::Delay => {
let mut entry = queued_bucket;

// No point in re-calculating the flush time when `force` is `true`.
// We can still do it on a flush without force.
if !force {
entry.flush_at =
get_flush_time(&self.config, ts, self.reference_time, &key);
// This should not happen, but may happen with weird
// configs or due to bugs.
debug_assert!(!entry.elapsed(now));
}

// Re-use the loop condition for `try_pop`, to make sure we
// will never accidentally create an infinite loop.
match should_pop(&entry) {
// If we would pop the entry again (`force`, a weird flush
// configuration, a bug), re-add it after the loop to
// prevent infinite loops.
true => re_add.push((key, entry)),
// Otherwise it's safe to immediately re-add.
false => self.buckets.re_add(key, entry),
}

continue;
}
FlushDecision::Drop => {
cost_tracker.subtract_cost(key.namespace(), key.project_key, cost);
continue;
}
}
}
};

cost_tracker.subtract_cost(key.namespace(), key.project_key, cost);

let (bucket_count, item_count) = stats
.entry((queued_bucket.value.ty(), key.namespace()))
.or_insert((0usize, 0usize));
*bucket_count += 1;
*item_count += queued_bucket.value.len();

let bucket = Bucket {
timestamp: key.timestamp,
width: bucket_interval,
name: key.metric_name,
value: queued_bucket.value,
tags: key.tags,
metadata: queued_bucket.metadata,
};

buckets.extend(std::iter::once(bucket));
}

for (key, entry) in re_add {
self.buckets.re_add(key, entry);
}

relay_statsd::metric!(
timer(MetricTimers::BucketsScanDuration),
timer(MetricTimers::BucketsScanDuration) = start_bucket_scan.elapsed(),
aggregator = &self.name,
{
let bucket_interval = self.config.bucket_interval;
let cost_tracker = &mut self.cost_tracker;

while let Some((key, entry)) =
self.buckets.try_pop(|_, entry| entry.elapsed(now) || force)
{
cost_tracker.subtract_cost(key.namespace(), key.project_key, key.cost());
cost_tracker.subtract_cost(
key.namespace(),
key.project_key,
entry.value.cost(),
);

let (bucket_count, item_count) = stats
.entry((entry.value.ty(), key.namespace()))
.or_insert((0usize, 0usize));
*bucket_count += 1;
*item_count += entry.value.len();

let partition = self.config.flush_partitions.map(|p| key.partition_key(p));

let bucket = Bucket {
timestamp: key.timestamp,
width: bucket_interval,
name: key.metric_name,
value: entry.value,
tags: key.tags,
metadata: entry.metadata,
};

partitions
.entry(partition)
.or_insert_with(HashMap::new)
.entry(key.project_key)
.or_insert_with(Vec::new)
.push(bucket);
}
}
);

for ((ty, namespace), (bucket_count, item_count)) in stats.into_iter() {
@@ -350,6 +397,16 @@ impl fmt::Debug for Aggregator {
}
}

/// Decision on what to do with a bucket when flushing.
pub enum FlushDecision<T> {
/// Flush the bucket with the provided metadata.
Flush(T),
/// Drop the bucket.
Drop,
/// Delay flushing the bucket into the future; it's not ready yet.
Delay,
}

/// Validates the metric name and its tags are correct.
///
/// Returns `Err` if the metric should be dropped.
@@ -755,21 +812,20 @@ mod tests {
assert_eq!(total_cost, current_cost + expected_added_cost);
}

aggregator.pop_flush_buckets(true);
aggregator.pop_flush_buckets(true, |_| FlushDecision::Flush(Vec::new()));
assert_eq!(aggregator.cost_tracker.total_cost(), 0);
}

#[tokio::test]
#[tokio::test(start_paused = true)]
async fn test_aggregator_flush() {
// Make sure that the right cost is added / subtracted
let mut aggregator: Aggregator = Aggregator::new(AggregatorConfig {
bucket_interval: 10,
..test_config()
});
let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();

let project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fed").unwrap();
let now = UnixTimestamp::now();
tokio::time::pause();

for i in 0..3u32 {
for (name, offset) in [("foo", 30), ("bar", 15)] {
Expand All @@ -789,7 +845,10 @@ mod tests {

let mut flush_buckets = || {
let mut result = Vec::new();
for (partition, v) in aggregator.pop_flush_buckets(false) {

let partitions =
aggregator.pop_flush_buckets(false, |_| FlushDecision::Flush(Vec::new()));
for (partition, v) in partitions {
assert!(partition.is_none());
for (pk, buckets) in v {
assert_eq!(pk, project_key);
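The test switches from calling `tokio::time::pause()` by hand to the `start_paused` attribute; both pause Tokio's clock so timers can be advanced deterministically. A small illustration of the equivalence (not from this commit, and assuming Tokio's `test-util` feature is enabled):

#[tokio::test(start_paused = true)]
async fn with_attribute() {
    // The clock is already paused when the test body starts.
}

#[tokio::test]
async fn with_explicit_pause() {
    tokio::time::pause(); // Same effect, done manually at the start of the test.
}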
4 changes: 2 additions & 2 deletions relay-server/src/service.rs
@@ -214,7 +214,8 @@ impl ServiceState {
let aggregator = RouterService::new(
config.default_aggregator_config().clone(),
config.secondary_aggregator_configs().clone(),
Some(legacy_project_cache.clone().recipient()),
Some(processor.clone().recipient()),
project_cache_handle.clone(),
);
let aggregator_handle = aggregator.handle();
let aggregator = runner.start(aggregator);
@@ -284,7 +285,6 @@ impl ServiceState {
// Keep all the services in one context.
let project_cache_services = legacy::Services {
envelope_buffer: envelope_buffer.clone(),
aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
