pageserver: tighten up code around SLRU dir key handling #10082

Merged: 4 commits, Dec 16, 2024
4 changes: 4 additions & 0 deletions libs/pageserver_api/src/key.rs
@@ -565,6 +565,10 @@ impl Key {
&& self.field5 == 0
&& self.field6 == u32::MAX
}

pub fn is_slru_dir_key(&self) -> bool {
slru_dir_kind(self).is_some()
}
}

#[inline(always)]
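Note that the new predicate delegates to the existing `slru_dir_kind` extractor rather than repeating the raw field comparisons, so the definition of "SLRU directory key" lives in exactly one place. A minimal sketch of that pattern, with a hypothetical simplified `Key` (the real pageserver key is a multi-field struct whose fields `slru_dir_kind` inspects):

```rust
// Minimal sketch (hypothetical types, not the real key encoding): the
// boolean predicate wraps the Option-returning extractor, so the set of
// SLRU dir keys is defined once.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum SlruKind {
    Clog,
    MultiXactMembers,
    MultiXactOffsets,
}

// Stand-in for the real multi-field key encoding.
struct Key {
    slru_dir: Option<SlruKind>,
}

fn slru_dir_kind(key: &Key) -> Option<SlruKind> {
    key.slru_dir
}

impl Key {
    fn is_slru_dir_key(&self) -> bool {
        // Delegating keeps this predicate from drifting out of sync with
        // slru_dir_kind if the key encoding ever changes.
        slru_dir_kind(self).is_some()
    }
}
```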
6 changes: 5 additions & 1 deletion libs/pageserver_api/src/shard.rs
@@ -173,7 +173,11 @@ impl ShardIdentity {

/// Return true if the key should be stored on all shards, not just one.
pub fn is_key_global(&self, key: &Key) -> bool {
if key.is_slru_block_key() || key.is_slru_segment_size_key() || key.is_aux_file_key() {
if key.is_slru_block_key()
|| key.is_slru_segment_size_key()
|| key.is_aux_file_key()
|| key.is_slru_dir_key()
{
// Special keys that are only stored on shard 0
false
} else if key.is_rel_block_key() {
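With this change, SLRU directory keys join the other SLRU and aux-file keys on the shard-0-only path; the diff suggests they previously fell through to the catch-all branch and were treated as global, i.e. stored on every shard. A simplified model of the placement decision (hypothetical `Placement`/`KeyClass` types; the real code answers the same question through boolean predicates like `is_key_global` on `ShardIdentity`):

```rust
/// Hypothetical three-way placement; the real ShardIdentity exposes this as
/// boolean predicates rather than an enum.
#[derive(Debug, PartialEq, Eq)]
enum Placement {
    Shard0Only,
    OneShardByHash,
    AllShards,
}

/// Toy stand-ins for the field-encoding predicates on the real Key.
struct KeyClass {
    is_slru_block: bool,
    is_slru_segment_size: bool,
    is_slru_dir: bool,
    is_aux_file: bool,
    is_rel_block: bool,
}

fn placement(k: &KeyClass) -> Placement {
    if k.is_slru_block || k.is_slru_segment_size || k.is_aux_file || k.is_slru_dir {
        // Shard-0-only metadata. Adding is_slru_dir here is the fix: these
        // keys previously fell through to the catch-all below.
        Placement::Shard0Only
    } else if k.is_rel_block {
        // Relation blocks are spread across shards by the stripe mapping.
        Placement::OneShardByHash
    } else {
        // Remaining metadata is kept on every shard.
        Placement::AllShards
    }
}
```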
29 changes: 17 additions & 12 deletions pageserver/src/pgdatadir_mapping.rs
@@ -1319,18 +1319,23 @@ impl<'a> DatadirModification<'a> {

let buf: Bytes = SlruSegmentDirectory::ser(&SlruSegmentDirectory::default())?.into();
let empty_dir = Value::Image(buf);
self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
self.put(
slru_dir_to_key(SlruKind::MultiXactMembers),
empty_dir.clone(),
);
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));

// Initialize SLRUs on shard 0 only: creating these on other shards would be
// harmless but they'd just be dropped on later compaction.
if self.tline.tenant_shard_id.is_shard_zero() {
self.put(slru_dir_to_key(SlruKind::Clog), empty_dir.clone());
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
self.put(
slru_dir_to_key(SlruKind::MultiXactMembers),
empty_dir.clone(),
);
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::Clog), 0));
self.put(slru_dir_to_key(SlruKind::MultiXactOffsets), empty_dir);
self.pending_directory_entries
.push((DirectoryKind::SlruSegment(SlruKind::MultiXactOffsets), 0));
}

Ok(())
}
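The empty-directory writes themselves are unchanged; they are simply wrapped in an `is_shard_zero` guard so that only shard 0 materializes the three SLRU directory images at timeline creation. A toy sketch of the same pattern, using a plain map in place of `DatadirModification::put` and an empty byte image in place of the serialized `SlruSegmentDirectory`:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum SlruKind {
    Clog,
    MultiXactMembers,
    MultiXactOffsets,
}

struct Modification {
    is_shard_zero: bool,
    // Toy key-value store standing in for the timeline's keyspace.
    store: HashMap<SlruKind, Vec<u8>>,
}

impl Modification {
    fn init_empty_slru_dirs(&mut self) {
        // The real code serializes an empty SlruSegmentDirectory; an empty
        // byte image stands in for that here.
        let empty_dir: Vec<u8> = Vec::new();
        // Only shard 0 owns SLRU state. Writing these images elsewhere would
        // be harmless, but compaction would eventually drop them anyway.
        if self.is_shard_zero {
            for kind in [
                SlruKind::Clog,
                SlruKind::MultiXactMembers,
                SlruKind::MultiXactOffsets,
            ] {
                self.store.insert(kind, empty_dir.clone());
            }
        }
    }
}
```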
30 changes: 16 additions & 14 deletions pageserver/src/walingest.rs
@@ -877,22 +877,24 @@ impl WalIngest {
// will block waiting for the last valid LSN to advance up to
// it. So we use the previous record's LSN in the get calls
// instead.
for segno in modification
.tline
.list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
.await?
{
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;
if modification.tline.get_shard_identity().is_shard_zero() {
for segno in modification
.tline
.list_slru_segments(SlruKind::Clog, Version::Modified(modification), ctx)
.await?
{
let segpage = segno * pg_constants::SLRU_PAGES_PER_SEGMENT;

let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
});
let may_delete = dispatch_pgversion!(modification.tline.pg_version, {
pgv::nonrelfile_utils::slru_may_delete_clogsegment(segpage, pageno)
});

if may_delete {
modification
.drop_slru_segment(SlruKind::Clog, segno, ctx)
.await?;
trace!("Drop CLOG segment {:>04X}", segno);
if may_delete {
modification
.drop_slru_segment(SlruKind::Clog, segno, ctx)
.await?;
trace!("Drop CLOG segment {:>04X}", segno);
}
}
}

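Gating the loop on `is_shard_zero` matches the placement rule above: only shard 0 stores SLRU segments, so only shard 0 should enumerate and drop them when a CLOG-truncation record arrives. For context, `segpage` is the segment's first page (each SLRU segment holds 32 pages in Postgres), and a segment may only be dropped if all of its pages precede the truncation cutoff. A hedged sketch of that predicate, modeled on Postgres' `SlruMayDeleteSegment`; the real version-dispatched helper is `pgv::nonrelfile_utils::slru_may_delete_clogsegment`, which compares at XID rather than page granularity:

```rust
// Assumption: 32 pages per SLRU segment, as in Postgres' slru.h.
const SLRU_PAGES_PER_SEGMENT: u32 = 32;

// Wraparound-aware "a precedes b" on the 2^32 ring: interpret the unsigned
// difference as a signed distance, as Postgres does for XID comparisons.
fn page_precedes(a: u32, b: u32) -> bool {
    (a.wrapping_sub(b) as i32) < 0
}

// A CLOG segment is deletable only if both its first and last pages precede
// the truncation cutoff, so no still-live page can be lost.
fn may_delete_segment(segno: u32, cutoff_page: u32) -> bool {
    let segpage = segno * SLRU_PAGES_PER_SEGMENT; // first page of the segment
    let seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1;
    page_precedes(segpage, cutoff_page) && page_precedes(seg_last_page, cutoff_page)
}
```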
24 changes: 16 additions & 8 deletions test_runner/regress/test_clog_truncate.py
@@ -1,18 +1,19 @@
from __future__ import annotations

import os
import time

from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnv
from fixtures.utils import query_scalar
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import query_scalar, wait_until


#
# Test compute node start after clog truncation
#
def test_clog_truncate(neon_simple_env: NeonEnv):
env = neon_simple_env
def test_clog_truncate(neon_env_builder: NeonEnvBuilder):
# Use a multi-sharded tenant because WAL ingest logic is shard-dependent, and
# this test is one of the very few that exercises a CLogTruncate WAL record.
env = neon_env_builder.init_start(initial_tenant_shard_count=2)

# set aggressive autovacuum to make sure that truncation will happen
config = [
@@ -31,6 +32,7 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
endpoint.safe_psql("CREATE EXTENSION neon_test_utils")

# Consume many xids to advance clog
log.info("Consuming xids...")
with endpoint.cursor() as cur:
cur.execute("select test_consume_xids(1000*1000*10);")
log.info("xids consumed")
@@ -47,11 +49,17 @@ def test_clog_truncate(neon_simple_env: NeonEnv):
pg_xact_0000_path = os.path.join(endpoint.pg_xact_dir_path(), "0000")
log.info(f"pg_xact_0000_path = {pg_xact_0000_path}")

while os.path.isfile(pg_xact_0000_path):
log.info(f"file exists. wait for truncation: {pg_xact_0000_path=}")
time.sleep(5)
def assert_file_removed():
exists = os.path.isfile(pg_xact_0000_path)
if exists:
log.info(f"file exists. wait for truncation: {pg_xact_0000_path=}")
assert not exists

log.info("Waiting for truncation...")
wait_until(assert_file_removed)

# checkpoint to advance latest lsn
log.info("Checkpointing...")
with endpoint.cursor() as cur:
cur.execute("CHECKPOINT;")
lsn_after_truncation = query_scalar(cur, "select pg_current_wal_insert_lsn()")