From 321846d1e42ef0c227e85dcaae1a4e6bace6ac7d Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 8 Jan 2025 21:29:22 +0100 Subject: [PATCH 1/3] page_service: add slow-future-logging to the pipelined impl to debug #10309 --- pageserver/src/lib.rs | 42 ++++++++++++++++++++++++++++++++++ pageserver/src/page_service.rs | 17 +++++++++++--- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/pageserver/src/lib.rs b/pageserver/src/lib.rs index ff6af3566c82..59396db4ffc0 100644 --- a/pageserver/src/lib.rs +++ b/pageserver/src/lib.rs @@ -375,6 +375,48 @@ async fn timed_after_cancellation( } } +async fn log_if_slow( + name: &str, + warn_after: std::time::Duration, + fut: Fut, +) -> ::Output { + let started = std::time::Instant::now(); + + let mut fut = std::pin::pin!(fut); + + match tokio::time::timeout(warn_after, &mut fut).await { + Ok(ret) => ret, + Err(_) => { + tracing::info!( + what = name, + elapsed_ms = started.elapsed().as_millis(), + "slow future" + ); + + let res = fut.await; + + tracing::info!( + what = name, + elapsed_ms = started.elapsed().as_millis(), + "slow future completed" + ); + + res + } + } +} + +pub(crate) trait LogIfSlowFutureExt: std::future::Future { + async fn log_if_slow(self, name: &'static str, warn_after: std::time::Duration) -> Self::Output + where + Self: Sized, + { + log_if_slow(name, warn_after, self).await + } +} + +impl LogIfSlowFutureExt for Fut where Fut: std::future::Future {} + #[cfg(test)] mod timed_tests { use super::timed; diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index d00ec11a7611..40bab69ff43f 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -1,6 +1,7 @@ //! The Page Service listens for client connections and serves their GetPage@LSN //! requests. +use crate::LogIfSlowFutureExt; use anyhow::{bail, Context}; use async_compression::tokio::write::GzipEncoder; use bytes::Buf; @@ -1360,9 +1361,10 @@ impl PageServerHandler { &ctx, request_span.clone(), ) + .log_if_slow("pagestream_read_message", Duration::from_secs(10)) .await; let Some(read_res) = read_res.transpose() else { - debug!("client-initiated shutdown"); + info!("client-initiated shutdown"); break; }; exit |= read_res.is_err(); @@ -1370,6 +1372,7 @@ impl PageServerHandler { .send(read_res, |batch, res| { Self::pagestream_do_batch(max_batch_size, batch, res) }) + .log_if_slow("batch_tx.send", Duration::from_secs(10)) .await; exit |= could_send.is_err(); } @@ -1386,11 +1389,14 @@ impl PageServerHandler { async move { let _cancel_batcher = cancel_batcher.drop_guard(); loop { - let maybe_batch = batch_rx.recv().await; + let maybe_batch = batch_rx + .recv() + .log_if_slow("batch_rx.recv", Duration::from_secs(10)) + .await; let batch = match maybe_batch { Ok(batch) => batch, Err(spsc_fold::RecvError::SenderGone) => { - debug!("upstream gone"); + info!("upstream gone"); return Ok(()); } }; @@ -1402,8 +1408,13 @@ impl PageServerHandler { }; batch .throttle_and_record_start_processing(&self.cancel) + .log_if_slow( + "throttle_and_record_start_processing", + Duration::from_secs(10), + ) .await?; self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx) + .log_if_slow("pagesteam_handle_batched_message", Duration::from_secs(10)) .await?; } } From 37c2e80a4830dbdce1c2bee3bb68d24b8928daac Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 9 Jan 2025 11:22:09 +0100 Subject: [PATCH 2/3] more slow logging, pagestream_handle_batched_message has shown to be ultra slow in #10309 --- pageserver/src/page_service.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pageserver/src/page_service.rs b/pageserver/src/page_service.rs index 40bab69ff43f..ddbe5301f2fa 100644 --- a/pageserver/src/page_service.rs +++ b/pageserver/src/page_service.rs @@ -932,6 +932,7 @@ impl PageServerHandler { vec![self .handle_get_rel_exists_request(&shard, &req, ctx) .instrument(span.clone()) + .log_if_slow("handle_get_rel_exists_request", Duration::from_secs(10)) .await .map(|msg| (msg, timer))], span, @@ -948,6 +949,7 @@ impl PageServerHandler { vec![self .handle_get_nblocks_request(&shard, &req, ctx) .instrument(span.clone()) + .log_if_slow("handle_get_nblocks_request", Duration::from_secs(10)) .await .map(|msg| (msg, timer))], span, @@ -972,6 +974,10 @@ impl PageServerHandler { ctx, ) .instrument(span.clone()) + .log_if_slow( + "handle_get_page_at_lsn_request_batched", + Duration::from_secs(10), + ) .await; assert_eq!(res.len(), npages); res @@ -990,6 +996,7 @@ impl PageServerHandler { vec![self .handle_db_size_request(&shard, &req, ctx) .instrument(span.clone()) + .log_if_slow("handle_db_size_request", Duration::from_secs(10)) .await .map(|msg| (msg, timer))], span, @@ -1006,6 +1013,7 @@ impl PageServerHandler { vec![self .handle_get_slru_segment_request(&shard, &req, ctx) .instrument(span.clone()) + .log_if_slow("handle_get_slru_segment_request", Duration::from_secs(10)) .await .map(|msg| (msg, timer))], span, @@ -1104,6 +1112,7 @@ impl PageServerHandler { } // and log the info! line inside the request span .instrument(span.clone()) + .log_if_slow("flush_fut", Duration::from_secs(10)) .await?; } Ok(()) From 61ae7e45cbf0694e590da8136a6ea3c6c68fb1a6 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Thu, 9 Jan 2025 12:12:07 +0100 Subject: [PATCH 3/3] bump stack size, apparently it's needed even in release builds --- pageserver/src/task_mgr.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pageserver/src/task_mgr.rs b/pageserver/src/task_mgr.rs index 622738022a3a..4ef30d34b980 100644 --- a/pageserver/src/task_mgr.rs +++ b/pageserver/src/task_mgr.rs @@ -149,7 +149,7 @@ impl FromStr for TokioRuntimeMode { static TOKIO_THREAD_STACK_SIZE: Lazy = Lazy::new(|| { env::var("NEON_PAGESERVER_TOKIO_THREAD_STACK_SIZE") // the default 2MiB are insufficent, especially in debug mode - .unwrap_or_else(|| NonZeroUsize::new(4 * 1024 * 1024).unwrap()) + .unwrap_or_else(|| NonZeroUsize::new(8 * 1024 * 1024).unwrap()) }); static ONE_RUNTIME: Lazy> = Lazy::new(|| {