Skip to content

Commit

Permalink
ref(buffer): Remove spool V1 implementation (#4303)
Browse files Browse the repository at this point in the history
  • Loading branch information
jjbayer authored Nov 29, 2024
1 parent 495df09 commit 7904c99
Show file tree
Hide file tree
Showing 18 changed files with 53 additions and 2,895 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
## 24.11.1

**Breaking Changes**:
- Flatten Linux distribution fields into `os.context`([#4292](https://github.com/getsentry/relay/pull/4292))

- Remove `spool.envelopes.{min_connections,max_connections,unspool_interval,max_memory_size}` config options. ([#4303](https://github.com/getsentry/relay/pull/4303))
- Flatten Linux distribution fields into `os.context`. ([#4292](https://github.com/getsentry/relay/pull/4292))

**Bug Fixes**:

Expand All @@ -13,6 +15,7 @@

**Features**:

- Remove old disk spooling logic, default to new version. ([#4303](https://github.com/getsentry/relay/pull/4303))
- Implement zstd http encoding for Relay to Relay communication. ([#4266](https://github.com/getsentry/relay/pull/4266))
- Support empty branches in Pattern alternations. ([#4283](https://github.com/getsentry/relay/pull/4283))
- Add support for partitioning of the `EnvelopeBufferService`. ([#4291](https://github.com/getsentry/relay/pull/4291))
Expand Down
108 changes: 10 additions & 98 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -873,21 +873,6 @@ fn spool_envelopes_max_disk_size() -> ByteSize {
ByteSize::mebibytes(500)
}

/// Default for min connections to keep open in the pool.
fn spool_envelopes_min_connections() -> u32 {
1
}

/// Default for max connections to keep open in the pool.
fn spool_envelopes_max_connections() -> u32 {
1
}

/// Default interval to unspool buffered envelopes, 100ms.
fn spool_envelopes_unspool_interval() -> u64 {
100
}

/// Default number of encoded envelope bytes to cache before writing to disk.
fn spool_envelopes_batch_size_bytes() -> ByteSize {
ByteSize::kibibytes(10)
Expand Down Expand Up @@ -924,43 +909,34 @@ pub struct EnvelopeSpool {
///
/// If set, this will enable the buffering for incoming envelopes.
pub path: Option<PathBuf>,
/// Maximum number of connections, which will be maintained by the pool.
#[serde(default = "spool_envelopes_max_connections")]
max_connections: u32,
/// Minimal number of connections, which will be maintained by the pool.
#[serde(default = "spool_envelopes_min_connections")]
min_connections: u32,
/// The maximum size of the buffer to keep, in bytes.
///
/// If not set the default is 524288000 bytes (500MB).
#[serde(default = "spool_envelopes_max_disk_size")]
max_disk_size: ByteSize,
pub max_disk_size: ByteSize,
/// The maximum bytes to keep in the memory buffer before spooling envelopes to disk, in bytes.
///
/// This is a hard upper bound and defaults to 524288000 bytes (500MB).
#[serde(default = "spool_envelopes_max_memory_size")]
max_memory_size: ByteSize,
/// The interval in milliseconds to trigger unspool.
#[serde(default = "spool_envelopes_unspool_interval")]
unspool_interval: u64,
pub max_memory_size: ByteSize,
/// Number of encoded envelope bytes that are spooled to disk at once.
///
/// Defaults to 10 KiB.
#[serde(default = "spool_envelopes_batch_size_bytes")]
batch_size_bytes: ByteSize,
pub batch_size_bytes: ByteSize,
/// Maximum time between receiving the envelope and processing it.
///
/// When envelopes spend too much time in the buffer (e.g. because their project cannot be loaded),
/// they are dropped. Defaults to 24h.
#[serde(default = "spool_envelopes_max_envelope_delay_secs")]
max_envelope_delay_secs: u64,
pub max_envelope_delay_secs: u64,
/// The refresh frequency in ms of how frequently disk usage is updated by querying SQLite
/// internal page stats.
#[serde(default = "spool_disk_usage_refresh_frequency_ms")]
disk_usage_refresh_frequency_ms: u64,
pub disk_usage_refresh_frequency_ms: u64,
/// The amount of envelopes that the envelope buffer can push to its output queue.
#[serde(default = "spool_max_backpressure_envelopes")]
max_backpressure_envelopes: usize,
pub max_backpressure_envelopes: usize,
/// The relative memory usage above which the buffer service will stop dequeueing envelopes.
///
/// Only applies when [`Self::path`] is set.
Expand All @@ -971,58 +947,34 @@ pub struct EnvelopeSpool {
///
/// Defaults to 90% (5% less than max memory).
#[serde(default = "spool_max_backpressure_memory_percent")]
max_backpressure_memory_percent: f32,
pub max_backpressure_memory_percent: f32,
/// Number of partitions of the buffer.
#[serde(default = "spool_envelopes_partitions")]
partitions: NonZeroU8,
/// Version of the spooler.
#[serde(default)]
version: EnvelopeSpoolVersion,
}

/// Version of the envelope buffering mechanism.
#[derive(Debug, Default, Deserialize, Serialize)]
pub enum EnvelopeSpoolVersion {
/// Use the spooler service, which only buffers envelopes for unloaded projects and
/// switches between an in-memory mode and a disk mode on-demand.
///
/// This mode will be removed soon.
#[default]
#[serde(rename = "1")]
V1,
/// Use the envelope buffer, through which all envelopes pass before getting unspooled.
/// Can be either disk based or memory based.
///
/// This mode has not yet been stress-tested, do not use in production environments.
#[serde(rename = "experimental")]
V2,
pub partitions: NonZeroU8,
}

impl Default for EnvelopeSpool {
fn default() -> Self {
Self {
path: None,
max_connections: spool_envelopes_max_connections(),
min_connections: spool_envelopes_min_connections(),
max_disk_size: spool_envelopes_max_disk_size(),
max_memory_size: spool_envelopes_max_memory_size(),
unspool_interval: spool_envelopes_unspool_interval(),
batch_size_bytes: spool_envelopes_batch_size_bytes(),
max_envelope_delay_secs: spool_envelopes_max_envelope_delay_secs(),
disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
max_backpressure_envelopes: spool_max_backpressure_envelopes(),
max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
partitions: spool_envelopes_partitions(),
version: EnvelopeSpoolVersion::default(),
}
}
}

/// Persistent buffering configuration.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct Spool {
/// Configuration for envelope spooling.
#[serde(default)]
envelopes: EnvelopeSpool,
pub envelopes: EnvelopeSpool,
}

/// Controls internal caching behavior.
Expand Down Expand Up @@ -2175,45 +2127,17 @@ impl Config {
Some(path)
}

/// Maximum number of connections to create to buffer file.
pub fn spool_envelopes_max_connections(&self) -> u32 {
self.values.spool.envelopes.max_connections
}

/// Minimum number of connections to create to buffer file.
pub fn spool_envelopes_min_connections(&self) -> u32 {
self.values.spool.envelopes.min_connections
}

/// Unspool interval in milliseconds.
pub fn spool_envelopes_unspool_interval(&self) -> Duration {
Duration::from_millis(self.values.spool.envelopes.unspool_interval)
}

/// The maximum size of the buffer, in bytes.
pub fn spool_envelopes_max_disk_size(&self) -> usize {
self.values.spool.envelopes.max_disk_size.as_bytes()
}

/// The maximum size of the memory buffer, in bytes.
pub fn spool_envelopes_max_memory_size(&self) -> usize {
self.values.spool.envelopes.max_memory_size.as_bytes()
}

/// Number of encoded envelope bytes that need to be accumulated before
/// flushing one batch to disk.
pub fn spool_envelopes_batch_size_bytes(&self) -> usize {
self.values.spool.envelopes.batch_size_bytes.as_bytes()
}

/// Returns `true` if version 2 of the spooling mechanism is used.
pub fn spool_v2(&self) -> bool {
matches!(
&self.values.spool.envelopes.version,
EnvelopeSpoolVersion::V2
)
}

/// Returns the time after which we drop envelopes as a [`Duration`] object.
pub fn spool_envelopes_max_age(&self) -> Duration {
Duration::from_secs(self.values.spool.envelopes.max_envelope_delay_secs)
Expand Down Expand Up @@ -2654,16 +2578,4 @@ cache:
fn test_emit_outcomes_invalid() {
assert!(serde_json::from_str::<EmitOutcomes>("asdf").is_err());
}

#[test]
fn test_spool_defaults_to_v1() {
let config: ConfigValues = serde_json::from_str("{}").unwrap();
assert!(matches!(
config.spool.envelopes.version,
EnvelopeSpoolVersion::V1
));

let config = Config::from_json_value(serde_json::json!({})).unwrap();
assert!(!config.spool_v2());
}
}
32 changes: 11 additions & 21 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::service::ServiceState;
use crate::services::buffer::{EnvelopeBuffer, ProjectKeyPair};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{BucketSource, MetricData, ProcessMetrics, ProcessingGroup};
use crate::services::projects::cache::legacy::ValidateEnvelope;
use crate::statsd::{RelayCounters, RelayHistograms};
use crate::utils::{self, ApiErrorResponse, FormDataIter, ManagedEnvelope};

Expand Down Expand Up @@ -294,27 +293,18 @@ fn queue_envelope(
envelope.scope(scoping);

let project_key_pair = ProjectKeyPair::from_envelope(envelope.envelope());
match state.envelope_buffer(project_key_pair) {
Some(buffer) => {
if !buffer.has_capacity() {
return Err(BadStoreRequest::QueueFailed);
}

// NOTE: This assumes that a `prefetch` has already been scheduled for both the
// envelope's projects. See `handle_check_envelope`.
relay_log::trace!("Pushing envelope to V2 buffer");

buffer
.addr()
.send(EnvelopeBuffer::Push(envelope.into_envelope()));
}
None => {
relay_log::trace!("Sending envelope to project cache for V1 buffer");
state
.legacy_project_cache()
.send(ValidateEnvelope::new(envelope));
}
let buffer = state.envelope_buffer(project_key_pair);
if !buffer.has_capacity() {
return Err(BadStoreRequest::QueueFailed);
}

// NOTE: This assumes that a `prefetch` has already been scheduled for both the
// envelope's projects. See `handle_check_envelope`.
relay_log::trace!("Pushing envelope to V2 buffer");

buffer
.addr()
.send(EnvelopeBuffer::Push(envelope.into_envelope()));
}
// The entire envelope is taken for a split above, and it's empty at this point, we can just
// accept it without additional checks.
Expand Down
1 change: 0 additions & 1 deletion relay-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ pub use self::envelope::Envelope; // pub for benchmarks
pub use self::services::buffer::{
EnvelopeStack, PolymorphicEnvelopeBuffer, SqliteEnvelopeStack, SqliteEnvelopeStore,
}; // pub for benchmarks
pub use self::services::spooler::spool_utils;
pub use self::utils::{MemoryChecker, MemoryStat}; // pub for benchmarks

#[cfg(test)]
Expand Down
8 changes: 1 addition & 7 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,11 @@ impl ServiceState {
aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
project_cache: legacy_project_cache.clone(),
test_store: test_store.clone(),
};

runner.start_with(
legacy::ProjectCacheService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
project_cache_handle.clone(),
project_cache_services,
global_config_rx,
Expand Down Expand Up @@ -363,10 +360,7 @@ impl ServiceState {
}

/// Returns the V2 envelope buffer, if present.
pub fn envelope_buffer(
&self,
project_key_pair: ProjectKeyPair,
) -> Option<&ObservableEnvelopeBuffer> {
pub fn envelope_buffer(&self, project_key_pair: ProjectKeyPair) -> &ObservableEnvelopeBuffer {
self.inner.registry.envelope_buffer.buffer(project_key_pair)
}

Expand Down
4 changes: 2 additions & 2 deletions relay-server/src/services/buffer/envelope_store/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,8 @@ impl SqliteEnvelopeStore {
.shared_cache(true);

let db = SqlitePoolOptions::new()
.max_connections(config.spool_envelopes_max_connections())
.min_connections(config.spool_envelopes_min_connections())
.max_connections(1)
.min_connections(1)
.connect_with(options)
.await
.map_err(SqliteEnvelopeStoreError::SqlxSetupFailed)?;
Expand Down
Loading

0 comments on commit 7904c99

Please sign in to comment.