Skip to content

Commit

Permalink
feat(buffer): Spool compressed envelopes (#4171)
Browse files Browse the repository at this point in the history
Enable compression for spooled envelopes (experimental spooler). As the
results of [this PR](#4162)
showed, avg. compression ratio is 4x on PoPs and 6x on processing
relays. Average compression time is 10 microseconds and 30 microseconds,
respectively.

This PR also changes how disk writes are batched: it counts encoded
bytes instead of the number of envelopes to decide whether to write a
batch to disk. Disk _reads_ still count the number of envelopes for now.
Using bytes for read batching would require [window
functions](https://www.sqlite.org/windowfunctions.html) or similar.

---------

Co-authored-by: Riccardo Busetti <[email protected]>
  • Loading branch information
jjbayer and iambriccardo authored Nov 6, 2024
1 parent 277c54d commit f0d01aa
Show file tree
Hide file tree
Showing 8 changed files with 329 additions and 248 deletions.
50 changes: 26 additions & 24 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -872,14 +872,14 @@ fn spool_envelopes_unspool_interval() -> u64 {
100
}

/// Default batch size for the stack.
fn spool_envelopes_stack_disk_batch_size() -> usize {
/// Default number of envelopes to read from disk at once.
fn spool_envelopes_read_batch_size() -> usize {
200
}

/// Default maximum number of batches for the stack.
fn spool_envelopes_stack_max_batches() -> usize {
2
/// Default number of encoded envelope bytes to cache before writing to disk.
fn spool_envelopes_write_batch_bytes() -> ByteSize {
ByteSize::kibibytes(10)
}

fn spool_envelopes_max_envelope_delay_secs() -> u64 {
Expand Down Expand Up @@ -927,13 +927,16 @@ pub struct EnvelopeSpool {
/// The interval in milliseconds to trigger unspool.
#[serde(default = "spool_envelopes_unspool_interval")]
unspool_interval: u64,
/// Number of elements of the envelope stack that are flushed to disk.
#[serde(default = "spool_envelopes_stack_disk_batch_size")]
disk_batch_size: usize,
/// Number of batches of size [`Self::disk_batch_size`] that need to be accumulated before
/// flushing one batch to disk.
#[serde(default = "spool_envelopes_stack_max_batches")]
max_batches: usize,
/// Number of envelopes that are read from disk at once.
///
/// Defaults to 200.
#[serde(default = "spool_envelopes_read_batch_size")]
read_batch_size: usize,
/// Number of encoded envelope bytes that are spooled to disk at once.
///
/// Defaults to 10 KiB.
#[serde(default = "spool_envelopes_write_batch_bytes")]
write_batch_bytes: ByteSize,
/// Maximum time between receiving the envelope and processing it.
///
/// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
Expand All @@ -953,7 +956,7 @@ pub struct EnvelopeSpool {
/// This value should be lower than [`Health::max_memory_percent`] to prevent flip-flopping.
///
/// Warning: this threshold can cause the buffer service to deadlock when the buffer itself
/// is using too much memory (influenced by [`Self::max_batches`] and [`Self::disk_batch_size`]).
/// is using too much memory (influenced by [`Self::read_batch_size`] and [`Self::write_batch_bytes`]).
///
/// Defaults to 90% (5% less than max memory).
#[serde(default = "spool_max_backpressure_memory_percent")]
Expand Down Expand Up @@ -989,9 +992,9 @@ impl Default for EnvelopeSpool {
min_connections: spool_envelopes_min_connections(),
max_disk_size: spool_envelopes_max_disk_size(),
max_memory_size: spool_envelopes_max_memory_size(),
unspool_interval: spool_envelopes_unspool_interval(), // 100ms
disk_batch_size: spool_envelopes_stack_disk_batch_size(),
max_batches: spool_envelopes_stack_max_batches(),
unspool_interval: spool_envelopes_unspool_interval(),
read_batch_size: spool_envelopes_read_batch_size(),
write_batch_bytes: spool_envelopes_write_batch_bytes(),
max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
max_backpressure_envelopes: spool_max_backpressure_envelopes(),
Expand Down Expand Up @@ -2174,22 +2177,21 @@ impl Config {
self.values.spool.envelopes.max_memory_size.as_bytes()
}

/// Number of batches of size `stack_disk_batch_size` that need to be accumulated before
/// flushing one batch to disk.
pub fn spool_envelopes_stack_disk_batch_size(&self) -> usize {
self.values.spool.envelopes.disk_batch_size
/// Number of envelopes to read from disk at once.
pub fn spool_envelopes_read_batch_size(&self) -> usize {
self.values.spool.envelopes.read_batch_size
}

/// Number of batches of size `stack_disk_batch_size` that need to be accumulated before
/// Number of encoded envelope bytes that need to be accumulated before
/// flushing one batch to disk.
pub fn spool_envelopes_stack_max_batches(&self) -> usize {
self.values.spool.envelopes.max_batches
pub fn spool_envelopes_write_batch_bytes(&self) -> usize {
self.values.spool.envelopes.write_batch_bytes.as_bytes()
}

/// Returns `true` if version 2 of the spooling mechanism is used.
pub fn spool_v2(&self) -> bool {
matches!(
self.values.spool.envelopes.version,
&self.values.spool.envelopes.version,
EnvelopeSpoolVersion::V2
)
}
Expand Down
6 changes: 3 additions & 3 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {

let stack = SqliteEnvelopeStack::new(
envelope_store.clone(),
10,
disk_batch_size,
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
Expand Down Expand Up @@ -137,8 +137,8 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {

let mut stack = SqliteEnvelopeStack::new(
envelope_store.clone(),
10,
disk_batch_size,
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
Expand Down Expand Up @@ -178,8 +178,8 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {

let stack = SqliteEnvelopeStack::new(
envelope_store.clone(),
10,
disk_batch_size,
2,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
true,
Expand Down
Loading

0 comments on commit f0d01aa

Please sign in to comment.