From 9ae980bf4f320c20dc48adce90e92e74fd2ea45c Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 11 Dec 2024 14:37:08 +0100 Subject: [PATCH] page_service: don't count time spent in Batcher towards smgr latency metrics (#10075) ## Problem With pipelining enabled, the time a request spends in the batcher stage counts towards the smgr op latency. If pipelining is disabled, that time is not accounted for. In practice, this results in a jump in smgr getpage latencies in various dashboards and degrades the internal SLO. ## Solution In a similar vein to #10042 and with a similar rationale, this PR stops counting the time spent in batcher stage towards smgr op latency. The smgr op latency metric is reduced to the actual execution time. Time spent in batcher stage is tracked in a separate histogram. I expect to remove that histogram after batching rollout is complete, but it will be helpful in the meantime to reason about the rollout. --- pageserver/src/metrics.rs | 170 +++++++++++++++++++++--------- pageserver/src/page_service.rs | 13 ++- pageserver/src/tenant/throttle.rs | 17 +-- test_runner/fixtures/metrics.py | 1 + 4 files changed, 143 insertions(+), 58 deletions(-) diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index 96ee1578563b..b4e20cb8b90e 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -16,7 +16,6 @@ use postgres_backend::{is_expected_io_error, QueryError}; use pq_proto::framed::ConnectionError; use strum::{EnumCount, VariantNames}; use strum_macros::{IntoStaticStr, VariantNames}; -use tracing::warn; use utils::id::TimelineId; /// Prometheus histogram buckets (in seconds) for operations in the critical @@ -1225,32 +1224,58 @@ pub(crate) mod virtual_file_io_engine { pub(crate) struct SmgrOpTimer(Option); pub(crate) struct SmgrOpTimerInner { - global_latency_histo: Histogram, + global_execution_latency_histo: Histogram, + per_timeline_execution_latency_histo: Option, - // Optional because not all op types are tracked per-timeline - per_timeline_latency_histo: Option, + global_batch_wait_time: Histogram, + per_timeline_batch_wait_time: Histogram, global_flush_in_progress_micros: IntCounter, per_timeline_flush_in_progress_micros: IntCounter, - start: Instant, - throttled: Duration, - op: SmgrQueryType, + timings: SmgrOpTimerState, +} + +#[derive(Debug)] +enum SmgrOpTimerState { + Received { + received_at: Instant, + }, + ThrottleDoneExecutionStarting { + received_at: Instant, + throttle_started_at: Instant, + started_execution_at: Instant, + }, } pub(crate) struct SmgrOpFlushInProgress { - base: Instant, + flush_started_at: Instant, global_micros: IntCounter, per_timeline_micros: IntCounter, } impl SmgrOpTimer { - pub(crate) fn deduct_throttle(&mut self, throttle: &Option) { - let Some(throttle) = throttle else { - return; - }; + pub(crate) fn observe_throttle_done_execution_starting(&mut self, throttle: &ThrottleResult) { let inner = self.0.as_mut().expect("other public methods consume self"); - inner.throttled += *throttle; + match (&mut inner.timings, throttle) { + (SmgrOpTimerState::Received { received_at }, throttle) => match throttle { + ThrottleResult::NotThrottled { start } => { + inner.timings = SmgrOpTimerState::ThrottleDoneExecutionStarting { + received_at: *received_at, + throttle_started_at: *start, + started_execution_at: *start, + }; + } + ThrottleResult::Throttled { start, end } => { + inner.timings = SmgrOpTimerState::ThrottleDoneExecutionStarting { + received_at: *start, + throttle_started_at: *start, + started_execution_at: *end, + }; + } + }, + (x, _) => panic!("called in unexpected state: {x:?}"), + } } pub(crate) fn observe_smgr_op_completion_and_start_flushing(mut self) -> SmgrOpFlushInProgress { @@ -1263,7 +1288,7 @@ impl SmgrOpTimer { .. } = inner; SmgrOpFlushInProgress { - base: flush_start, + flush_started_at: flush_start, global_micros: global_flush_in_progress_micros, per_timeline_micros: per_timeline_flush_in_progress_micros, } @@ -1274,32 +1299,42 @@ impl SmgrOpTimer { let inner = self.0.take()?; let now = Instant::now(); - let elapsed = now - inner.start; - - let elapsed = match elapsed.checked_sub(inner.throttled) { - Some(elapsed) => elapsed, - None => { - use utils::rate_limit::RateLimit; - static LOGGED: Lazy>> = - Lazy::new(|| { - Mutex::new(enum_map::EnumMap::from_array(std::array::from_fn(|_| { - RateLimit::new(Duration::from_secs(10)) - }))) - }); - let mut guard = LOGGED.lock().unwrap(); - let rate_limit = &mut guard[inner.op]; - rate_limit.call(|| { - warn!(op=?inner.op, ?elapsed, ?inner.throttled, "implementation error: time spent throttled exceeds total request wall clock time"); - }); - elapsed // un-throttled time, more info than just saturating to 0 + + let batch; + let execution; + let throttle; + match inner.timings { + SmgrOpTimerState::Received { received_at } => { + batch = (now - received_at).as_secs_f64(); + // TODO: use label for dropped requests. + // This is quite rare in practice, only during tenant/pageservers shutdown. + throttle = Duration::ZERO; + execution = Duration::ZERO.as_secs_f64(); } - }; + SmgrOpTimerState::ThrottleDoneExecutionStarting { + received_at, + throttle_started_at, + started_execution_at, + } => { + batch = (throttle_started_at - received_at).as_secs_f64(); + throttle = started_execution_at - throttle_started_at; + execution = (now - started_execution_at).as_secs_f64(); + } + } + + // update time spent in batching + inner.global_batch_wait_time.observe(batch); + inner.per_timeline_batch_wait_time.observe(batch); - let elapsed = elapsed.as_secs_f64(); + // time spent in throttle metric is updated by throttle impl + let _ = throttle; - inner.global_latency_histo.observe(elapsed); - if let Some(per_timeline_getpage_histo) = &inner.per_timeline_latency_histo { - per_timeline_getpage_histo.observe(elapsed); + // update metrics for execution latency + inner.global_execution_latency_histo.observe(execution); + if let Some(per_timeline_execution_latency_histo) = + &inner.per_timeline_execution_latency_histo + { + per_timeline_execution_latency_histo.observe(execution); } Some((now, inner)) @@ -1325,12 +1360,12 @@ impl SmgrOpFlushInProgress { // Last call is tracked in `now`. let mut observe_guard = scopeguard::guard( || { - let elapsed = now - self.base; + let elapsed = now - self.flush_started_at; self.global_micros .inc_by(u64::try_from(elapsed.as_micros()).unwrap()); self.per_timeline_micros .inc_by(u64::try_from(elapsed.as_micros()).unwrap()); - self.base = now; + self.flush_started_at = now; }, |mut observe| { observe(); @@ -1377,6 +1412,8 @@ pub(crate) struct SmgrQueryTimePerTimeline { per_timeline_batch_size: Histogram, global_flush_in_progress_micros: IntCounter, per_timeline_flush_in_progress_micros: IntCounter, + global_batch_wait_time: Histogram, + per_timeline_batch_wait_time: Histogram, } static SMGR_QUERY_STARTED_GLOBAL: Lazy = Lazy::new(|| { @@ -1399,12 +1436,15 @@ static SMGR_QUERY_STARTED_PER_TENANT_TIMELINE: Lazy = Lazy::new(| .expect("failed to define a metric") }); +// Alias so all histograms recording per-timeline smgr timings use the same buckets. +static SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS: &[f64] = CRITICAL_OP_BUCKETS; + static SMGR_QUERY_TIME_PER_TENANT_TIMELINE: Lazy = Lazy::new(|| { register_histogram_vec!( "pageserver_smgr_query_seconds", - "Time spent on smgr query handling, aggegated by query type and tenant/timeline.", + "Time spent _executing_ smgr query handling, excluding batch and throttle delays.", &["smgr_query_type", "tenant_id", "shard_id", "timeline_id"], - CRITICAL_OP_BUCKETS.into(), + SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS.into(), ) .expect("failed to define a metric") }); @@ -1462,7 +1502,7 @@ static SMGR_QUERY_TIME_GLOBAL_BUCKETS: Lazy> = Lazy::new(|| { static SMGR_QUERY_TIME_GLOBAL: Lazy = Lazy::new(|| { register_histogram_vec!( "pageserver_smgr_query_seconds_global", - "Time spent on smgr query handling, aggregated by query type.", + "Like pageserver_smgr_query_seconds, but aggregated to instance level.", &["smgr_query_type"], SMGR_QUERY_TIME_GLOBAL_BUCKETS.clone(), ) @@ -1559,6 +1599,25 @@ static PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL: Lazy = Lazy .expect("failed to define a metric") }); +static PAGE_SERVICE_SMGR_BATCH_WAIT_TIME: Lazy = Lazy::new(|| { + register_histogram_vec!( + "pageserver_page_service_pagestream_batch_wait_time_seconds", + "Time a request spent waiting in its batch until the batch moved to throttle&execution.", + &["tenant_id", "shard_id", "timeline_id"], + SMGR_QUERY_TIME_PER_TENANT_TIMELINE_BUCKETS.into(), + ) + .expect("failed to define a metric") +}); + +static PAGE_SERVICE_SMGR_BATCH_WAIT_TIME_GLOBAL: Lazy = Lazy::new(|| { + register_histogram!( + "pageserver_page_service_pagestream_batch_wait_time_seconds_global", + "Like pageserver_page_service_pagestream_batch_wait_time_seconds, but aggregated to instance level.", + SMGR_QUERY_TIME_GLOBAL_BUCKETS.to_vec(), + ) + .expect("failed to define a metric") +}); + impl SmgrQueryTimePerTimeline { pub(crate) fn new(tenant_shard_id: &TenantShardId, timeline_id: &TimelineId) -> Self { let tenant_id = tenant_shard_id.tenant_id.to_string(); @@ -1599,6 +1658,11 @@ impl SmgrQueryTimePerTimeline { .get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id]) .unwrap(); + let global_batch_wait_time = PAGE_SERVICE_SMGR_BATCH_WAIT_TIME_GLOBAL.clone(); + let per_timeline_batch_wait_time = PAGE_SERVICE_SMGR_BATCH_WAIT_TIME + .get_metric_with_label_values(&[&tenant_id, &shard_slug, &timeline_id]) + .unwrap(); + let global_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL.clone(); let per_timeline_flush_in_progress_micros = PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS @@ -1614,9 +1678,11 @@ impl SmgrQueryTimePerTimeline { per_timeline_batch_size, global_flush_in_progress_micros, per_timeline_flush_in_progress_micros, + global_batch_wait_time, + per_timeline_batch_wait_time, } } - pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, started_at: Instant) -> SmgrOpTimer { + pub(crate) fn start_smgr_op(&self, op: SmgrQueryType, received_at: Instant) -> SmgrOpTimer { self.global_started[op as usize].inc(); let per_timeline_latency_histo = if matches!(op, SmgrQueryType::GetPageAtLsn) { @@ -1627,15 +1693,15 @@ impl SmgrQueryTimePerTimeline { }; SmgrOpTimer(Some(SmgrOpTimerInner { - global_latency_histo: self.global_latency[op as usize].clone(), - per_timeline_latency_histo, - start: started_at, - op, - throttled: Duration::ZERO, + global_execution_latency_histo: self.global_latency[op as usize].clone(), + per_timeline_execution_latency_histo: per_timeline_latency_histo, + timings: SmgrOpTimerState::Received { received_at }, global_flush_in_progress_micros: self.global_flush_in_progress_micros.clone(), per_timeline_flush_in_progress_micros: self .per_timeline_flush_in_progress_micros .clone(), + global_batch_wait_time: self.global_batch_wait_time.clone(), + per_timeline_batch_wait_time: self.per_timeline_batch_wait_time.clone(), })) } @@ -2889,6 +2955,11 @@ impl TimelineMetrics { shard_id, timeline_id, ]); + let _ = PAGE_SERVICE_SMGR_BATCH_WAIT_TIME.remove_label_values(&[ + tenant_id, + shard_id, + timeline_id, + ]); } } @@ -2919,6 +2990,7 @@ use crate::context::{PageContentKind, RequestContext}; use crate::task_mgr::TaskKind; use crate::tenant::mgr::TenantSlot; use crate::tenant::tasks::BackgroundLoopKind; +use crate::tenant::throttle::ThrottleResult; use crate::tenant::Timeline; /// Maintain a per timeline gauge in addition to the global gauge. @@ -3773,6 +3845,7 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) { &REMOTE_ONDEMAND_DOWNLOADED_BYTES, &CIRCUIT_BREAKERS_BROKEN, &CIRCUIT_BREAKERS_UNBROKEN, + &PAGE_SERVICE_SMGR_FLUSH_INPROGRESS_MICROS_GLOBAL, ] .into_iter() .for_each(|c| { @@ -3820,6 +3893,7 @@ pub fn preinitialize_metrics(conf: &'static PageServerConf) { &WAL_REDO_BYTES_HISTOGRAM, &WAL_REDO_PROCESS_LAUNCH_DURATION_HISTOGRAM, &PAGE_SERVICE_BATCH_SIZE_GLOBAL, + &PAGE_SERVICE_SMGR_BATCH_WAIT_TIME_GLOBAL, ] .into_iter() .for_each(|h| { diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 97d94bbe7f33..d00ec11a7611 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -575,7 +575,10 @@ enum BatchedFeMessage { } impl BatchedFeMessage { - async fn throttle(&mut self, cancel: &CancellationToken) -> Result<(), QueryError> { + async fn throttle_and_record_start_processing( + &mut self, + cancel: &CancellationToken, + ) -> Result<(), QueryError> { let (shard, tokens, timers) = match self { BatchedFeMessage::Exists { shard, timer, .. } | BatchedFeMessage::Nblocks { shard, timer, .. } @@ -603,7 +606,7 @@ impl BatchedFeMessage { } }; for timer in timers { - timer.deduct_throttle(&throttled); + timer.observe_throttle_done_execution_starting(&throttled); } Ok(()) } @@ -1230,7 +1233,7 @@ impl PageServerHandler { } }; - if let Err(cancelled) = msg.throttle(&self.cancel).await { + if let Err(cancelled) = msg.throttle_and_record_start_processing(&self.cancel).await { break cancelled; } @@ -1397,7 +1400,9 @@ impl PageServerHandler { return Err(e); } }; - batch.throttle(&self.cancel).await?; + batch + .throttle_and_record_start_processing(&self.cancel) + .await?; self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx) .await?; } diff --git a/pageserver/src/tenant/throttle.rs b/pageserver/src/tenant/throttle.rs index 54c0e59daaf6..8ab6a0e0600d 100644 --- a/pageserver/src/tenant/throttle.rs +++ b/pageserver/src/tenant/throttle.rs @@ -58,6 +58,11 @@ pub struct Stats { pub sum_throttled_usecs: u64, } +pub enum ThrottleResult { + NotThrottled { start: Instant }, + Throttled { start: Instant, end: Instant }, +} + impl Throttle where M: Metric, @@ -122,15 +127,15 @@ where self.inner.load().rate_limiter.steady_rps() } - pub async fn throttle(&self, key_count: usize) -> Option { + pub async fn throttle(&self, key_count: usize) -> ThrottleResult { let inner = self.inner.load_full(); // clones the `Inner` Arc + let start = std::time::Instant::now(); + if !inner.enabled { - return None; + return ThrottleResult::NotThrottled { start }; } - let start = std::time::Instant::now(); - self.metric.accounting_start(); self.count_accounted_start.fetch_add(1, Ordering::Relaxed); let did_throttle = inner.rate_limiter.acquire(key_count).await; @@ -145,9 +150,9 @@ where .fetch_add(wait_time.as_micros() as u64, Ordering::Relaxed); let observation = Observation { wait_time }; self.metric.observe_throttling(&observation); - Some(wait_time) + ThrottleResult::Throttled { start, end: now } } else { - None + ThrottleResult::NotThrottled { start } } } } diff --git a/test_runner/fixtures/metrics.py b/test_runner/fixtures/metrics.py index a591e088eff7..c5295360c339 100644 --- a/test_runner/fixtures/metrics.py +++ b/test_runner/fixtures/metrics.py @@ -178,6 +178,7 @@ def counter(name: str) -> str: counter("pageserver_timeline_wal_records_received"), counter("pageserver_page_service_pagestream_flush_in_progress_micros"), *histogram("pageserver_page_service_batch_size"), + *histogram("pageserver_page_service_pagestream_batch_wait_time_seconds"), *PAGESERVER_PER_TENANT_REMOTE_TIMELINE_CLIENT_METRICS, # "pageserver_directory_entries_count", -- only used if above a certain threshold # "pageserver_broken_tenants_count" -- used only for broken