This repository has been archived by the owner on Feb 8, 2024. It is now read-only.

feat: clean up partition limiter state on schedule #72

Merged · 2 commits · Dec 13, 2023
18 changes: 18 additions & 0 deletions capture/src/partition_limits.rs
@@ -12,6 +12,7 @@ use std::sync::Arc;

use governor::{clock, state::keyed::DefaultKeyedStateStore, Quota, RateLimiter};
use metrics::gauge;
use rand::Rng;

// See: https://docs.rs/governor/latest/governor/_guide/index.html#usage-in-multiple-threads
#[derive(Clone)]
@@ -49,6 +50,23 @@ impl PartitionLimiter {
gauge!("partition_limits_key_count", self.limiter.len() as f64);
}
}

/// Clean up the rate limiter state, once per minute. Ensure we don't use more memory than
/// necessary.
pub async fn clean_state(&self) {
// Give a small amount of randomness to the interval to ensure we don't have all replicas
// locking at the same time. The lock isn't going to be held for long, but this will reduce
// impact regardless.
let interval_secs = rand::thread_rng().gen_range(60..70);

let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(interval_secs));
loop {
interval.tick().await;

self.limiter.retain_recent();
self.limiter.shrink_to_fit();
Contributor
nit: because we know that we'll see new distinct_ids soon (as sessions churn from visitor to visitor), I don't think the cost of shrink is worth it right now; I'd just run retain every 5 minutes, which should in theory keep the active key count bounded. Alternatively, we could shrink before we retain, so the map is only re-allocated if it didn't grow back after the last retain.
But it's safe to merge as is and tune later.

}
}
}

#[cfg(test)]
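For reference, a minimal sketch of the reviewer's suggested alternative above (not part of this PR): shrink before retaining, on a longer jittered interval, so the map is only re-allocated when it did not grow back since the previous retain pass. It assumes it would live alongside the existing impl in partition_limits.rs with the same imports; the method name and the ~5 minute period are illustrative.

impl PartitionLimiter {
    /// Hypothetical variant per the review suggestion: shrink first, retain
    /// second, and run roughly every five minutes instead of every minute.
    pub async fn clean_state_shrink_first(&self) {
        // Jitter the period so replicas don't all take the limiter's lock at once.
        let interval_secs = rand::thread_rng().gen_range(300..310);
        let mut interval =
            tokio::time::interval(tokio::time::Duration::from_secs(interval_secs));
        loop {
            interval.tick().await;

            // Shrinking before retaining means we only re-allocate the map if it
            // failed to grow back to its previous size after the last retain pass.
            self.limiter.shrink_to_fit();
            self.limiter.retain_recent();
        }
    }
}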
11 changes: 11 additions & 0 deletions capture/src/server.rs
@@ -55,6 +55,17 @@
where
partition.report_metrics().await;
});
}

{
// Ensure that the rate limiter state does not grow unbounded

let partition = partition.clone();

tokio::spawn(async move {
partition.clean_state().await;
});
}

let sink = sink::KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");

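As a side note on the wiring above, a hypothetical helper (not in this PR) illustrating the clone-then-spawn pattern: PartitionLimiter derives Clone and is Arc-backed, so the spawned cleanup task gets its own cheap handle while the original handle stays available to move into the Kafka sink afterwards. The function name is illustrative.

fn spawn_cleanup(partition: &PartitionLimiter) {
    let partition = partition.clone();
    tokio::spawn(async move {
        // clean_state() loops forever, so this task lives for the process lifetime.
        partition.clean_state().await;
    });
}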