perf(buffer): Pack each batch into a single row (#4229)
Experiments have shown that write throughput largely depends on the
number of rows: for example, writing 1 row with a 10 KiB blob is faster
than writing 10 rows with 1 KiB blobs.

At the same time, batching envelopes that are already larger than 10 KiB
does not further improve write throughput.

To guarantee write speed for envelopes that are much smaller than 10 KiB
(e.g. sessions), pack each batch into a single table row.
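
For illustration only, a minimal sketch of the packing idea, not Relay's
actual DatabaseBatch encoding: several small encoded envelopes are
length-prefixed, concatenated into one blob, and written as a single row
together with a count of how many envelopes it holds.

use std::convert::TryInto;

// Hypothetical helpers sketching the batch-per-row idea; the real encoding
// lives in DatabaseBatch and may differ.
fn pack_batch(encoded_envelopes: &[Vec<u8>]) -> (Vec<u8>, u32) {
    let mut payload = Vec::new();
    for envelope in encoded_envelopes {
        // 4-byte little-endian length prefix so the envelopes can be split
        // apart again when the row is read back from disk.
        payload.extend_from_slice(&(envelope.len() as u32).to_le_bytes());
        payload.extend_from_slice(envelope);
    }
    (payload, encoded_envelopes.len() as u32)
}

fn unpack_batch(mut payload: &[u8]) -> Vec<Vec<u8>> {
    let mut envelopes = Vec::new();
    while payload.len() >= 4 {
        let len = u32::from_le_bytes(payload[..4].try_into().unwrap()) as usize;
        envelopes.push(payload[4..4 + len].to_vec());
        payload = &payload[4 + len..];
    }
    envelopes
}
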
jjbayer authored Nov 12, 2024
1 parent 1c1e4de commit ae88c93
Showing 15 changed files with 322 additions and 250 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions migrations/20241107115400_count_column.sql
@@ -0,0 +1,2 @@
ALTER TABLE envelopes
ADD COLUMN count INTEGER DEFAULT 1 NOT NULL;
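
The new count column records how many envelopes a packed row contains, so
totals can be computed without unpacking the blob. As a hedged sketch (the
helper below is illustrative and only assumes the envelopes table and count
column from this migration), such a total could be queried with sqlx:

use sqlx::SqlitePool;

// Illustrative only: sums the `count` column so that a row packed with N
// envelopes contributes N to the total, instead of counting rows.
async fn total_envelopes(pool: &SqlitePool) -> Result<i64, sqlx::Error> {
    sqlx::query_scalar("SELECT COALESCE(SUM(count), 0) FROM envelopes")
        .fetch_one(pool)
        .await
}
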
30 changes: 7 additions & 23 deletions relay-config/src/config.rs
@@ -872,13 +872,8 @@ fn spool_envelopes_unspool_interval() -> u64 {
100
}

/// Default number of envelope to read from disk at once.
fn spool_envelopes_read_batch_size() -> usize {
200
}

/// Default number of encoded envelope bytes to cache before writing to disk.
fn spool_envelopes_write_batch_bytes() -> ByteSize {
fn spool_envelopes_batch_size_bytes() -> ByteSize {
ByteSize::kibibytes(10)
}

@@ -927,16 +922,11 @@ pub struct EnvelopeSpool {
/// The interval in milliseconds to trigger unspool.
#[serde(default = "spool_envelopes_unspool_interval")]
unspool_interval: u64,
/// Number of envelopes that are read from disk at once.
///
/// Defaults to 10.
#[serde(default = "spool_envelopes_read_batch_size")]
read_batch_size: usize,
/// Number of encoded envelope bytes that are spooled to disk at once.
///
/// Defaults to 10 KiB.
#[serde(default = "spool_envelopes_write_batch_bytes")]
write_batch_bytes: ByteSize,
#[serde(default = "spool_envelopes_batch_size_bytes")]
batch_size_bytes: ByteSize,
/// Maximum time between receiving the envelope and processing it.
///
/// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
@@ -956,7 +946,7 @@ pub struct EnvelopeSpool {
/// This value should be lower than [`Health::max_memory_percent`] to prevent flip-flopping.
///
/// Warning: this threshold can cause the buffer service to deadlock when the buffer itself
/// is using too much memory (influenced by [`Self::read_batch_size`] and [`Self::write_batch_bytes`]).
/// is using too much memory (influenced by [`Self::batch_size_bytes`]).
///
/// Defaults to 90% (5% less than max memory).
#[serde(default = "spool_max_backpressure_memory_percent")]
@@ -993,8 +983,7 @@ impl Default for EnvelopeSpool {
max_disk_size: spool_envelopes_max_disk_size(),
max_memory_size: spool_envelopes_max_memory_size(),
unspool_interval: spool_envelopes_unspool_interval(),
read_batch_size: spool_envelopes_read_batch_size(),
write_batch_bytes: spool_envelopes_write_batch_bytes(),
batch_size_bytes: spool_envelopes_batch_size_bytes(),
max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
max_backpressure_envelopes: spool_max_backpressure_envelopes(),
@@ -2177,15 +2166,10 @@ impl Config {
self.values.spool.envelopes.max_memory_size.as_bytes()
}

/// Number of envelopes to read from disk at once.
pub fn spool_envelopes_read_batch_size(&self) -> usize {
self.values.spool.envelopes.read_batch_size
}

/// Number of encoded envelope bytes that need to be accumulated before
/// flushing one batch to disk.
pub fn spool_envelopes_write_batch_bytes(&self) -> usize {
self.values.spool.envelopes.write_batch_bytes.as_bytes()
pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
self.values.spool.envelopes.batch_size_bytes.as_bytes()
}

/// Returns `true` if version 2 of the spooling mechanism is used.
3 changes: 0 additions & 3 deletions relay-server/benches/benches.rs
@@ -100,7 +100,6 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {

let stack = SqliteEnvelopeStack::new(
envelope_store.clone(),
10,
disk_batch_size,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
@@ -137,7 +136,6 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {

let mut stack = SqliteEnvelopeStack::new(
envelope_store.clone(),
10,
disk_batch_size,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
@@ -178,7 +176,6 @@ fn benchmark_sqlite_envelope_stack(c: &mut Criterion) {

let stack = SqliteEnvelopeStack::new(
envelope_store.clone(),
10,
disk_batch_size,
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
ProjectKey::parse("e12d836b15bb49d7bbf99e64295d995b").unwrap(),
10 changes: 9 additions & 1 deletion relay-server/src/services/buffer/envelope_buffer/mod.rs
@@ -662,6 +662,7 @@ mod tests {
use crate::envelope::{Item, ItemType};
use crate::extractors::RequestMeta;
use crate::services::buffer::common::ProjectKeyPair;
use crate::services::buffer::envelope_store::sqlite::DatabaseEnvelope;
use crate::services::buffer::testutils::utils::mock_envelopes;
use crate::utils::MemoryStat;
use crate::SqliteEnvelopeStore;
@@ -1020,7 +1021,14 @@ mod tests {
// belong to the same project keys, so they belong to the same envelope stack.
let envelopes = mock_envelopes(10);
assert!(store
.insert_many(envelopes.iter().map(|e| e.as_ref().try_into().unwrap()))
.insert_batch(
envelopes
.into_iter()
.map(|e| DatabaseEnvelope::try_from(e.as_ref()).unwrap())
.collect::<Vec<_>>()
.try_into()
.unwrap()
)
.await
.is_ok());

4 changes: 1 addition & 3 deletions relay-server/src/services/buffer/envelope_stack/memory.rs
@@ -31,7 +31,5 @@ impl EnvelopeStack for MemoryEnvelopeStack {
Ok(self.0.pop())
}

fn flush(self) -> Vec<Box<Envelope>> {
self.0
}
async fn flush(self) {}
}
2 changes: 1 addition & 1 deletion relay-server/src/services/buffer/envelope_stack/mod.rs
@@ -24,5 +24,5 @@ pub trait EnvelopeStack: Send + std::fmt::Debug {

/// Persists all envelopes in the [`EnvelopeStack`]s to external storage, if possible,
/// and consumes the stack provider.
fn flush(self) -> Vec<Box<Envelope>>;
fn flush(self) -> impl Future<Output = ()>;
}
62 changes: 25 additions & 37 deletions relay-server/src/services/buffer/envelope_stack/sqlite.rs
@@ -7,7 +7,8 @@ use relay_base_schema::project::ProjectKey;
use crate::envelope::Envelope;
use crate::services::buffer::envelope_stack::EnvelopeStack;
use crate::services::buffer::envelope_store::sqlite::{
DatabaseEnvelope, InsertEnvelopeError, SqliteEnvelopeStore, SqliteEnvelopeStoreError,
DatabaseBatch, DatabaseEnvelope, InsertEnvelopeError, SqliteEnvelopeStore,
SqliteEnvelopeStoreError,
};
use crate::statsd::{RelayCounters, RelayTimers};

@@ -28,16 +29,13 @@ pub enum SqliteEnvelopeStackError {
pub struct SqliteEnvelopeStack {
/// Shared SQLite database pool which will be used to read and write from disk.
envelope_store: SqliteEnvelopeStore,
/// Maximum number of envelopes to read from disk at once.
read_batch_size: NonZeroUsize,
/// Maximum number of bytes in the in-memory cache before we write to disk.
write_batch_bytes: NonZeroUsize,
batch_size_bytes: NonZeroUsize,
/// The project key of the project to which all the envelopes belong.
own_key: ProjectKey,
/// The project key of the root project of the trace to which all the envelopes belong.
sampling_key: ProjectKey,
/// In-memory stack containing a batch of envelopes that either have not been written to disk yet, or have been read from disk recently.
#[allow(clippy::vec_box)]
batch: Vec<DatabaseEnvelope>,
/// Boolean representing whether calls to `push()` and `peek()` check disk in case not enough
/// elements are available in the `batches_buffer`.
@@ -48,16 +46,14 @@ impl SqliteEnvelopeStack {
/// Creates a new empty [`SqliteEnvelopeStack`].
pub fn new(
envelope_store: SqliteEnvelopeStore,
read_batch_size: usize,
write_batch_bytes: usize,
batch_size_bytes: usize,
own_key: ProjectKey,
sampling_key: ProjectKey,
check_disk: bool,
) -> Self {
Self {
envelope_store,
read_batch_size: NonZeroUsize::new(read_batch_size).expect("batch size should be > 0"),
write_batch_bytes: NonZeroUsize::new(write_batch_bytes)
batch_size_bytes: NonZeroUsize::new(batch_size_bytes)
.expect("batch bytes should be > 0"),
own_key,
sampling_key,
@@ -68,7 +64,7 @@

/// Threshold above which the [`SqliteEnvelopeStack`] will spool data from the `buffer` to disk.
fn above_spool_threshold(&self) -> bool {
self.batch.iter().map(|e| e.len()).sum::<usize>() > self.write_batch_bytes.get()
self.batch.iter().map(|e| e.len()).sum::<usize>() > self.batch_size_bytes.get()
}

/// Spools to disk up to `disk_batch_size` envelopes from the `buffer`.
@@ -78,6 +74,10 @@
/// of the method.
async fn spool_to_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> {
let batch = std::mem::take(&mut self.batch);
let Ok(batch) = DatabaseBatch::try_from(batch) else {
return Ok(());
};

relay_statsd::metric!(counter(RelayCounters::BufferSpooledEnvelopes) += batch.len() as u64);

// When early return here, we are acknowledging that the elements that we popped from
@@ -86,7 +86,7 @@
// the buffer we will end up with an infinite cycle.
relay_statsd::metric!(timer(RelayTimers::BufferSpool), {
self.envelope_store
.insert_many(batch)
.insert_batch(batch)
.await
.map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?;
});
@@ -106,21 +106,18 @@
/// envelope will not be unspooled and unspooling will continue with the remaining envelopes.
async fn unspool_from_disk(&mut self) -> Result<(), SqliteEnvelopeStackError> {
debug_assert!(self.batch.is_empty());
self.batch = relay_statsd::metric!(timer(RelayTimers::BufferUnspool), {
let batch = relay_statsd::metric!(timer(RelayTimers::BufferUnspool), {
self.envelope_store
.delete_many(
self.own_key,
self.sampling_key,
self.read_batch_size.get() as i64,
)
.delete_batch(self.own_key, self.sampling_key)
.await
.map_err(SqliteEnvelopeStackError::EnvelopeStoreError)?
});

if self.batch.is_empty() {
// In case no envelopes were unspooled, we will mark the disk as empty until another
// round of spooling takes place.
self.check_disk = false;
match batch {
Some(batch) => {
self.batch = batch.into();
}
None => self.check_disk = false,
}

relay_statsd::metric!(
@@ -183,11 +180,10 @@ impl EnvelopeStack for SqliteEnvelopeStack {
Ok(Some(envelope))
}

fn flush(self) -> Vec<Box<Envelope>> {
self.batch
.into_iter()
.filter_map(|e| e.try_into().ok())
.collect()
async fn flush(mut self) {
if let Err(e) = self.spool_to_disk().await {
relay_log::error!(error = &e as &dyn std::error::Error, "flush error");
}
}
}

@@ -216,7 +212,6 @@ mod tests {
let mut stack = SqliteEnvelopeStack::new(
envelope_store,
10,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("c25ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
true,
@@ -239,7 +234,6 @@

let mut stack = SqliteEnvelopeStack::new(
envelope_store,
10,
threshold_size,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
@@ -280,7 +274,6 @@
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let mut stack = SqliteEnvelopeStack::new(
envelope_store,
10,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
@@ -300,7 +293,6 @@
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let mut stack = SqliteEnvelopeStack::new(
envelope_store,
10,
2,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
@@ -318,7 +310,6 @@
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let mut stack = SqliteEnvelopeStack::new(
envelope_store,
10,
9999,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
@@ -364,7 +355,6 @@
// Create stack with threshold just below the size of first 5 envelopes
let mut stack = SqliteEnvelopeStack::new(
envelope_store,
10,
threshold_size,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
@@ -432,7 +422,6 @@
let envelope_store = SqliteEnvelopeStore::new(db, Duration::from_millis(100));
let mut stack = SqliteEnvelopeStack::new(
envelope_store.clone(),
10,
10 * COMPRESSED_ENVELOPE_SIZE,
ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(),
ProjectKey::parse("b81ae32be2584e0bbd7a4cbb95971fe1").unwrap(),
@@ -448,9 +437,8 @@
assert_eq!(stack.batch.len(), 5);
assert_eq!(envelope_store.total_count().await.unwrap(), 0);

// We drain the stack and make sure nothing was spooled to disk.
let drained_envelopes = stack.flush();
assert_eq!(drained_envelopes.into_iter().collect::<Vec<_>>().len(), 5);
assert_eq!(envelope_store.total_count().await.unwrap(), 0);
// We drain the stack and make sure everything was spooled to disk.
stack.flush().await;
assert_eq!(envelope_store.total_count().await.unwrap(), 5);
}
}