Skip to content

Commit

Permalink
initial logical size calculation: always poll to completion (#10471)
Browse files Browse the repository at this point in the history
# Refs

- extracted from #9353

# Problem

Before this PR, when task_mgr shutdown is signalled, e.g. during
pageserver shutdown or Tenant shutdown, initial logical size calculation
stops polling and drops the future that represents the calculation.

This is against the current policy that we poll all futures to
completion.

This became apparent during development of concurrent IO which warns if
we drop a `Timeline::get_vectored` future that still has in-flight IOs.

We may revise the policy in the future, but, right now initial logical
size calculation is the only part of the codebase that doesn't adhere to
the policy, so let's fix it.

## Code Changes

- make sensitive exclusively to `Timeline::cancel`
- This should be sufficient for all cases of shutdowns; the sensitivity
to task_mgr shutdown is unnecessary.
- this broke the various cancel tests in `test_timeline_size.py`, e.g.,
`test_timeline_initial_logical_size_calculation_cancellation`
- the tests would time out because the await point was not sensitive to
cancellation
- to fix this, refactor `pausable_failpoint` so that it accepts a
cancellation token
- side note: we _really_ should write our own failpoint library; maybe
after we get heap-allocated RequestContext, we can plumb failpoints
through there.
  • Loading branch information
problame authored Jan 22, 2025
1 parent b4d87b9 commit b31ce14
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 62 deletions.
54 changes: 39 additions & 15 deletions libs/utils/src/failpoint_support.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,31 +11,55 @@ use tracing::*;

/// Declare a failpoint that can use to `pause` failpoint action.
/// We don't want to block the executor thread, hence, spawn_blocking + await.
///
/// Optionally pass a cancellation token, and this failpoint will drop out of
/// its pause when the cancellation token fires. This is useful for testing
/// cases where we would like to block something, but test its clean shutdown behavior.
/// The macro evaluates to a Result in that case, where Ok(()) is the case
/// where the failpoint was not paused, and Err() is the case where cancellation
/// token fired while evaluating the failpoint.
///
/// Remember to unpause the failpoint in the test; until that happens, one of the
/// limited number of spawn_blocking thread pool threads is leaked.
#[macro_export]
macro_rules! pausable_failpoint {
($name:literal) => {
($name:literal) => {{
if cfg!(feature = "testing") {
let cancel = ::tokio_util::sync::CancellationToken::new();
let _ = $crate::pausable_failpoint!($name, &cancel);
}
}};
($name:literal, $cancel:expr) => {{
if cfg!(feature = "testing") {
tokio::task::spawn_blocking({
let current = tracing::Span::current();
let failpoint_fut = ::tokio::task::spawn_blocking({
let current = ::tracing::Span::current();
move || {
let _entered = current.entered();
tracing::info!("at failpoint {}", $name);
fail::fail_point!($name);
::tracing::info!("at failpoint {}", $name);
::fail::fail_point!($name);
}
});
let cancel_fut = async move {
$cancel.cancelled().await;
};
::tokio::select! {
res = failpoint_fut => {
res.expect("spawn_blocking");
// continue with execution
Ok(())
},
_ = cancel_fut => {
Err(())
}
})
.await
.expect("spawn_blocking");
}
};
($name:literal, $cond:expr) => {
if cfg!(feature = "testing") {
if $cond {
pausable_failpoint!($name)
}
} else {
Ok(())
}
};
}};
}

pub use pausable_failpoint;

/// use with fail::cfg("$name", "return(2000)")
///
/// The effect is similar to a "sleep(2000)" action, i.e. we sleep for the
Expand Down
7 changes: 3 additions & 4 deletions pageserver/src/tenant/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,9 @@ pub(crate) async fn concurrent_background_tasks_rate_limit_permit(
) -> tokio::sync::SemaphorePermit<'static> {
let _guard = crate::metrics::BACKGROUND_LOOP_SEMAPHORE.measure_acquisition(loop_kind);

pausable_failpoint!(
"initial-size-calculation-permit-pause",
loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation
);
if loop_kind == BackgroundLoopKind::InitialLogicalSizeCalculation {
pausable_failpoint!("initial-size-calculation-permit-pause");
}

// TODO: assert that we run on BACKGROUND_RUNTIME; requires tokio_unstable Handle::id();
match CONCURRENT_BACKGROUND_TASKS.acquire().await {
Expand Down
67 changes: 24 additions & 43 deletions pageserver/src/tenant/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use utils::{
use wal_decoder::serialized_batch::{SerializedValueBatch, ValueMeta};

use std::sync::atomic::Ordering as AtomicOrdering;
use std::sync::OnceLock;
use std::sync::{Arc, Mutex, RwLock, Weak};
use std::time::{Duration, Instant, SystemTime};
use std::{
Expand All @@ -72,7 +73,6 @@ use std::{
collections::btree_map::Entry,
ops::{Deref, Range},
};
use std::{pin::pin, sync::OnceLock};

use crate::{
aux_file::AuxFileSizeEstimator,
Expand Down Expand Up @@ -2804,12 +2804,10 @@ impl Timeline {
"initial size calculation",
// NB: don't log errors here, task_mgr will do that.
async move {
let cancel = task_mgr::shutdown_token();
self_clone
.initial_logical_size_calculation_task(
initial_part_end,
cancel_wait_for_background_loop_concurrency_limit_semaphore,
cancel,
background_ctx,
)
.await;
Expand All @@ -2819,11 +2817,21 @@ impl Timeline {
);
}

/// # Cancellation
///
/// This method is sensitive to `Timeline::cancel`.
///
/// It is _not_ sensitive to task_mgr::shutdown_token().
///
/// # Cancel-Safety
///
/// It does Timeline IO, hence this should be polled to completion because
/// we could be leaving in-flight IOs behind, which is safe, but annoying
/// to reason about.
async fn initial_logical_size_calculation_task(
self: Arc<Self>,
initial_part_end: Lsn,
skip_concurrency_limiter: CancellationToken,
cancel: CancellationToken,
background_ctx: RequestContext,
) {
scopeguard::defer! {
Expand All @@ -2836,7 +2844,6 @@ impl Timeline {
let self_ref = &self;
let skip_concurrency_limiter = &skip_concurrency_limiter;
async move {
let cancel = task_mgr::shutdown_token();
let wait_for_permit = super::tasks::concurrent_background_tasks_rate_limit_permit(
BackgroundLoopKind::InitialLogicalSizeCalculation,
background_ctx,
Expand All @@ -2850,9 +2857,6 @@ impl Timeline {
_ = self_ref.cancel.cancelled() => {
return Err(CalculateLogicalSizeError::Cancelled);
}
_ = cancel.cancelled() => {
return Err(CalculateLogicalSizeError::Cancelled);
},
() = skip_concurrency_limiter.cancelled() => {
// Some action that is part of a end user interaction requested logical size
// => break out of the rate limit
Expand Down Expand Up @@ -2911,22 +2915,18 @@ impl Timeline {
)
.expect("10min < 1hour"),
);
tokio::time::sleep(sleep_duration).await;
tokio::select! {
_ = tokio::time::sleep(sleep_duration) => {}
_ = self.cancel.cancelled() => return ControlFlow::Break(()),
}
}
}
}
};

let (calculated_size, metrics_guard) = tokio::select! {
res = retrying => {
match res {
ControlFlow::Continue(calculated_size) => calculated_size,
ControlFlow::Break(()) => return,
}
}
_ = cancel.cancelled() => {
return;
}
let (calculated_size, metrics_guard) = match retrying.await {
ControlFlow::Continue(calculated_size) => calculated_size,
ControlFlow::Break(()) => return,
};

// we cannot query current_logical_size.current_size() to know the current
Expand Down Expand Up @@ -2982,9 +2982,6 @@ impl Timeline {
receiver
}

/// # Cancel-Safety
///
/// This method is cancellation-safe.
#[instrument(skip_all)]
async fn logical_size_calculation_task(
self: &Arc<Self>,
Expand All @@ -3002,32 +2999,13 @@ impl Timeline {
.enter()
.map_err(|_| CalculateLogicalSizeError::Cancelled)?;

let self_calculation = Arc::clone(self);

let mut calculation = pin!(async {
let ctx = ctx.attached_child();
self_calculation
.calculate_logical_size(lsn, cause, &guard, &ctx)
.await
});

tokio::select! {
res = &mut calculation => { res }
_ = self.cancel.cancelled() => {
debug!("cancelling logical size calculation for timeline shutdown");
calculation.await
}
}
self.calculate_logical_size(lsn, cause, &guard, ctx).await
}

/// Calculate the logical size of the database at the latest LSN.
///
/// NOTE: counted incrementally, includes ancestors. This can be a slow operation,
/// especially if we need to download remote layers.
///
/// # Cancel-Safety
///
/// This method is cancellation-safe.
async fn calculate_logical_size(
&self,
up_to_lsn: Lsn,
Expand All @@ -3040,7 +3018,10 @@ impl Timeline {
self.timeline_id, up_to_lsn
);

pausable_failpoint!("timeline-calculate-logical-size-pause");
if let Err(()) = pausable_failpoint!("timeline-calculate-logical-size-pause", &self.cancel)
{
return Err(CalculateLogicalSizeError::Cancelled);
}

// See if we've already done the work for initial size calculation.
// This is a short-cut for timelines that are mostly unused.
Expand Down

1 comment on commit b31ce14

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7521 tests run: 7124 passed, 0 failed, 397 skipped (full report)


Flaky tests (6)

Postgres 17

Postgres 16

Postgres 14

  • test_timeline_ancestor_detach_idempotent_success[shards_initial_after2]: release-x86-64

Code coverage* (full report)

  • functions: 33.5% (8462 of 25279 functions)
  • lines: 49.2% (70946 of 144156 lines)

* collected from Rust tests only


The comment gets automatically updated with the latest test results
b31ce14 at 2025-01-22T14:53:04.943Z :recycle:

Please sign in to comment.