Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

page_service: add slow-future-logging to the pipelined impl to debug #10309 #10315

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions pageserver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,48 @@ async fn timed_after_cancellation<Fut: std::future::Future>(
}
}

async fn log_if_slow<Fut: std::future::Future>(
name: &str,
warn_after: std::time::Duration,
fut: Fut,
) -> <Fut as std::future::Future>::Output {
let started = std::time::Instant::now();

let mut fut = std::pin::pin!(fut);

match tokio::time::timeout(warn_after, &mut fut).await {
Ok(ret) => ret,
Err(_) => {
tracing::info!(
what = name,
elapsed_ms = started.elapsed().as_millis(),
"slow future"
);

let res = fut.await;

tracing::info!(
what = name,
elapsed_ms = started.elapsed().as_millis(),
"slow future completed"
);

res
}
}
}

pub(crate) trait LogIfSlowFutureExt: std::future::Future {
async fn log_if_slow(self, name: &'static str, warn_after: std::time::Duration) -> Self::Output
where
Self: Sized,
{
log_if_slow(name, warn_after, self).await
}
}

impl<Fut> LogIfSlowFutureExt for Fut where Fut: std::future::Future {}

#[cfg(test)]
mod timed_tests {
use super::timed;
Expand Down
26 changes: 23 additions & 3 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! The Page Service listens for client connections and serves their GetPage@LSN
//! requests.

use crate::LogIfSlowFutureExt;
use anyhow::{bail, Context};
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
Expand Down Expand Up @@ -931,6 +932,7 @@ impl PageServerHandler {
vec![self
.handle_get_rel_exists_request(&shard, &req, ctx)
.instrument(span.clone())
.log_if_slow("handle_get_rel_exists_request", Duration::from_secs(10))
.await
.map(|msg| (msg, timer))],
span,
Expand All @@ -947,6 +949,7 @@ impl PageServerHandler {
vec![self
.handle_get_nblocks_request(&shard, &req, ctx)
.instrument(span.clone())
.log_if_slow("handle_get_nblocks_request", Duration::from_secs(10))
.await
.map(|msg| (msg, timer))],
span,
Expand All @@ -971,6 +974,10 @@ impl PageServerHandler {
ctx,
)
.instrument(span.clone())
.log_if_slow(
"handle_get_page_at_lsn_request_batched",
Duration::from_secs(10),
)
.await;
assert_eq!(res.len(), npages);
res
Expand All @@ -989,6 +996,7 @@ impl PageServerHandler {
vec![self
.handle_db_size_request(&shard, &req, ctx)
.instrument(span.clone())
.log_if_slow("handle_db_size_request", Duration::from_secs(10))
.await
.map(|msg| (msg, timer))],
span,
Expand All @@ -1005,6 +1013,7 @@ impl PageServerHandler {
vec![self
.handle_get_slru_segment_request(&shard, &req, ctx)
.instrument(span.clone())
.log_if_slow("handle_get_slru_segment_request", Duration::from_secs(10))
.await
.map(|msg| (msg, timer))],
span,
Expand Down Expand Up @@ -1103,6 +1112,7 @@ impl PageServerHandler {
}
// and log the info! line inside the request span
.instrument(span.clone())
.log_if_slow("flush_fut", Duration::from_secs(10))
.await?;
}
Ok(())
Expand Down Expand Up @@ -1360,16 +1370,18 @@ impl PageServerHandler {
&ctx,
request_span.clone(),
)
.log_if_slow("pagestream_read_message", Duration::from_secs(10))
.await;
let Some(read_res) = read_res.transpose() else {
debug!("client-initiated shutdown");
info!("client-initiated shutdown");
break;
};
exit |= read_res.is_err();
let could_send = batch_tx
.send(read_res, |batch, res| {
Self::pagestream_do_batch(max_batch_size, batch, res)
})
.log_if_slow("batch_tx.send", Duration::from_secs(10))
.await;
exit |= could_send.is_err();
}
Expand All @@ -1386,11 +1398,14 @@ impl PageServerHandler {
async move {
let _cancel_batcher = cancel_batcher.drop_guard();
loop {
let maybe_batch = batch_rx.recv().await;
let maybe_batch = batch_rx
.recv()
.log_if_slow("batch_rx.recv", Duration::from_secs(10))
.await;
let batch = match maybe_batch {
Ok(batch) => batch,
Err(spsc_fold::RecvError::SenderGone) => {
debug!("upstream gone");
info!("upstream gone");
return Ok(());
}
};
Expand All @@ -1402,8 +1417,13 @@ impl PageServerHandler {
};
batch
.throttle_and_record_start_processing(&self.cancel)
.log_if_slow(
"throttle_and_record_start_processing",
Duration::from_secs(10),
)
.await?;
self.pagesteam_handle_batched_message(pgb_writer, batch, &cancel, &ctx)
.log_if_slow("pagesteam_handle_batched_message", Duration::from_secs(10))
.await?;
}
}
Expand Down
2 changes: 1 addition & 1 deletion pageserver/src/task_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ impl FromStr for TokioRuntimeMode {
static TOKIO_THREAD_STACK_SIZE: Lazy<NonZeroUsize> = Lazy::new(|| {
env::var("NEON_PAGESERVER_TOKIO_THREAD_STACK_SIZE")
// the default 2MiB are insufficent, especially in debug mode
.unwrap_or_else(|| NonZeroUsize::new(4 * 1024 * 1024).unwrap())
.unwrap_or_else(|| NonZeroUsize::new(8 * 1024 * 1024).unwrap())
});

static ONE_RUNTIME: Lazy<Option<tokio::runtime::Runtime>> = Lazy::new(|| {
Expand Down
Loading