Skip to content

Commit

Permalink
pageserver: add LayerMap::watch_layer0_deltas() (#10470)
Browse files Browse the repository at this point in the history
## Problem

For compaction backpressure, we need a mechanism to signal when
compaction has reduced the L0 delta layer count below the backpressure
threshold.

Extracted from #10405.

## Summary of changes

Add `LayerMap::watch_level0_deltas()` which returns a
`tokio::sync::watch::Receiver` signalling the current L0 delta layer
count.
  • Loading branch information
erikgrinaker authored Jan 21, 2025
1 parent a75e11c commit 8a8c656
Showing 1 changed file with 28 additions and 1 deletion.
29 changes: 28 additions & 1 deletion pageserver/src/tenant/layer_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use std::collections::{HashMap, VecDeque};
use std::iter::Peekable;
use std::ops::Range;
use std::sync::Arc;
use tokio::sync::watch;
use utils::lsn::Lsn;

use historic_layer_coverage::BufferedHistoricLayerCoverage;
Expand All @@ -67,7 +68,6 @@ use super::storage_layer::{LayerVisibilityHint, PersistentLayerDesc};
///
/// LayerMap tracks what layers exist on a timeline.
///
#[derive(Default)]
pub struct LayerMap {
//
// 'open_layer' holds the current InMemoryLayer that is accepting new
Expand All @@ -93,7 +93,25 @@ pub struct LayerMap {

/// L0 layers have key range Key::MIN..Key::MAX, and locating them using R-Tree search is very inefficient.
/// So L0 layers are held in l0_delta_layers vector, in addition to the R-tree.
///
/// NB: make sure to notify `watch_l0_deltas` on changes.
l0_delta_layers: Vec<Arc<PersistentLayerDesc>>,

/// Notifies about L0 delta layer changes, sending the current number of L0 layers.
watch_l0_deltas: watch::Sender<usize>,
}

impl Default for LayerMap {
fn default() -> Self {
Self {
open_layer: Default::default(),
next_open_layer_at: Default::default(),
frozen_layers: Default::default(),
historic: Default::default(),
l0_delta_layers: Default::default(),
watch_l0_deltas: watch::channel(0).0,
}
}
}

/// The primary update API for the layer map.
Expand Down Expand Up @@ -466,6 +484,8 @@ impl LayerMap {

if Self::is_l0(&layer_desc.key_range, layer_desc.is_delta) {
self.l0_delta_layers.push(layer_desc.clone().into());
self.watch_l0_deltas
.send_replace(self.l0_delta_layers.len());
}

self.historic.insert(
Expand All @@ -488,6 +508,8 @@ impl LayerMap {
let mut l0_delta_layers = std::mem::take(&mut self.l0_delta_layers);
l0_delta_layers.retain(|other| other.key() != layer_key);
self.l0_delta_layers = l0_delta_layers;
self.watch_l0_deltas
.send_replace(self.l0_delta_layers.len());
// this assertion is related to use of Arc::ptr_eq in Self::compare_arced_layers,
// there's a chance that the comparison fails at runtime due to it comparing (pointer,
// vtable) pairs.
Expand Down Expand Up @@ -850,6 +872,11 @@ impl LayerMap {
&self.l0_delta_layers
}

/// Subscribes to L0 delta layer changes, sending the current number of L0 delta layers.
pub fn watch_level0_deltas(&self) -> watch::Receiver<usize> {
self.watch_l0_deltas.subscribe()
}

/// debugging function to print out the contents of the layer map
#[allow(unused)]
pub async fn dump(&self, verbose: bool, ctx: &RequestContext) -> Result<()> {
Expand Down

1 comment on commit 8a8c656

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

7370 tests run: 6982 passed, 2 failed, 386 skipped (full report)


Failures on Postgres 15

Failures on Postgres 14

# Run all failed tests locally:
scripts/pytest -vv -n $(nproc) -k "test_scrubber_physical_gc_ancestors[release-pg14-2] or test_scrubber_physical_gc_ancestors[release-pg15-2]"
Flaky tests (3)

Postgres 17

Test coverage report is not available

The comment gets automatically updated with the latest test results
8a8c656 at 2025-01-21T22:06:55.808Z :recycle:

Please sign in to comment.