Skip to content

Commit

Permalink
refactor timeline init, add conf to python
Browse files Browse the repository at this point in the history
  • Loading branch information
arssher committed Dec 25, 2024
1 parent c04842a commit f552684
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 59 deletions.
4 changes: 3 additions & 1 deletion libs/safekeeper_api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ pub mod models;

/// Consensus logical timestamp. Note: it is a part of sk control file.
pub type Term = u64;
pub const INVALID_TERM: Term = 0;
/// With this term timeline is created initially. It
/// is a normal term except wp is never elected with it.
pub const INITIAL_TERM: Term = 0;

/// Information about Postgres. Safekeeper gets it once and then verifies all
/// further connections from computes match. Note: it is a part of sk control
Expand Down
9 changes: 6 additions & 3 deletions libs/safekeeper_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@ pub struct TimelineCreateRequest {
pub mconf: Configuration,
pub pg_version: u32,
pub system_id: Option<u64>,
// By default WAL_SEGMENT_SIZE
pub wal_seg_size: Option<u32>,
pub commit_lsn: Lsn,
// If not passed, it is assigned to the beginning of commit_lsn segment.
pub local_start_lsn: Option<Lsn>,
pub start_lsn: Lsn,
// Normal creation should omit this field (start_lsn initializes all LSNs).
// However, we allow specifying custom value higher than start_lsn for
// manual recovery case, see test_s3_wal_replay.
pub commit_lsn: Option<Lsn>,
}

/// Same as TermLsn, but serializes LSN using display serializer
Expand Down
4 changes: 2 additions & 2 deletions safekeeper/src/control_file_upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl PersistedPeerInfo {
pub fn new() -> Self {
Self {
backup_lsn: Lsn::INVALID,
term: safekeeper_api::INVALID_TERM,
term: safekeeper_api::INITIAL_TERM,
flush_lsn: Lsn(0),
commit_lsn: Lsn(0),
}
Expand Down Expand Up @@ -515,7 +515,7 @@ pub fn upgrade_control_file(buf: &[u8], version: u32) -> Result<TimelinePersiste
peer_horizon_lsn: oldstate.peer_horizon_lsn,
remote_consistent_lsn: oldstate.remote_consistent_lsn,
partial_backup: oldstate.partial_backup,
eviction_state: EvictionState::Present,
eviction_state: oldstate.eviction_state,
creation_ts: std::time::SystemTime::UNIX_EPOCH,
});
}
Expand Down
2 changes: 1 addition & 1 deletion safekeeper/src/copy_timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ pub async fn handle_request(
&request.destination_ttid,
Configuration::empty(),
state.server.clone(),
request.until_lsn,
start_lsn,
request.until_lsn,
)?;
new_state.timeline_start_lsn = start_lsn;
new_state.peer_horizon_lsn = request.until_lsn;
Expand Down
9 changes: 2 additions & 7 deletions safekeeper/src/http/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,19 +111,14 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
system_id: request_data.system_id.unwrap_or(0),
wal_seg_size: request_data.wal_seg_size.unwrap_or(WAL_SEGMENT_SIZE as u32),
};
let local_start_lsn = request_data.local_start_lsn.unwrap_or_else(|| {
request_data
.commit_lsn
.segment_lsn(server_info.wal_seg_size as usize)
});
let global_timelines = get_global_timelines(&request);
global_timelines
.create(
ttid,
request_data.mconf,
server_info,
request_data.commit_lsn,
local_start_lsn,
request_data.start_lsn,
request_data.commit_lsn.unwrap_or(request_data.start_lsn),
)
.await
.map_err(ApiError::InternalServerError)?;
Expand Down
4 changes: 1 addition & 3 deletions safekeeper/src/safekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use bytes::{Buf, BufMut, Bytes, BytesMut};
use postgres_ffi::{TimeLineID, MAX_SEND_SIZE};
use safekeeper_api::models::HotStandbyFeedback;
use safekeeper_api::Term;
use safekeeper_api::INVALID_TERM;
use serde::{Deserialize, Serialize};
use std::cmp::max;
use std::cmp::min;
Expand Down Expand Up @@ -980,7 +979,7 @@ where

/// Update commit_lsn from peer safekeeper data.
pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> {
if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) {
if Lsn(sk_info.commit_lsn) != Lsn::INVALID {
// Note: the check is too restrictive, generally we can update local
// commit_lsn if our history matches (is part of) history of advanced
// commit_lsn provider.
Expand Down Expand Up @@ -1291,7 +1290,6 @@ mod tests {

#[test]
fn test_sk_state_bincode_serde_roundtrip() {
use utils::Hex;
let tenant_id = TenantId::from_str("cf0480929707ee75372337efaa5ecf96").unwrap();
let timeline_id = TimelineId::from_str("112ded66422aa5e953e5440fa5427ac4").unwrap();
let state = TimelinePersistentState {
Expand Down
40 changes: 28 additions & 12 deletions safekeeper/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{cmp::max, ops::Deref, time::SystemTime};
use anyhow::{bail, Result};
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::{
membership::Configuration, models::TimelineTermBumpResponse, ServerInfo, Term,
membership::Configuration, models::TimelineTermBumpResponse, ServerInfo, Term, INITIAL_TERM,
};
use serde::{Deserialize, Serialize};
use utils::{
Expand All @@ -16,7 +16,7 @@ use utils::{

use crate::{
control_file,
safekeeper::{AcceptorState, PgUuid, TermHistory, UNKNOWN_SERVER_VERSION},
safekeeper::{AcceptorState, PgUuid, TermHistory, TermLsn, UNKNOWN_SERVER_VERSION},
timeline::TimelineError,
wal_backup_partial::{self},
};
Expand Down Expand Up @@ -84,12 +84,14 @@ pub enum EvictionState {
}

impl TimelinePersistentState {
/// commit_lsn is the same as start_lsn in the normal creaiton; see
/// `TimelineCreateRequest` comments.`
pub fn new(
ttid: &TenantTimelineId,
mconf: Configuration,
server_info: ServerInfo,
start_lsn: Lsn,
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> anyhow::Result<TimelinePersistentState> {
if server_info.wal_seg_size == 0 {
bail!(TimelineError::UninitializedWalSegSize(*ttid));
Expand All @@ -99,29 +101,43 @@ impl TimelinePersistentState {
bail!(TimelineError::UninitialinzedPgVersion(*ttid));
}

if commit_lsn < local_start_lsn {
if commit_lsn < start_lsn {
bail!(
"commit_lsn {} is smaller than local_start_lsn {}",
"commit_lsn {} is smaller than start_lsn {}",
commit_lsn,
local_start_lsn
start_lsn
);
}

// If we are given with init LSN, initialize term history with it. It
// ensures that walproposer always must be able to find a common point
// in histories; if it can't something is corrupted. Not having LSN here
// is so far left for legacy case where timeline is created by compute
// and LSN during creation is not known yet.
let term_history = if commit_lsn != Lsn::INVALID {
TermHistory(vec![TermLsn {
term: INITIAL_TERM,
lsn: start_lsn,
}])
} else {
TermHistory::empty()
};

Ok(TimelinePersistentState {
tenant_id: ttid.tenant_id,
timeline_id: ttid.timeline_id,
mconf,
acceptor_state: AcceptorState {
term: 0,
term_history: TermHistory::empty(),
term: INITIAL_TERM,
term_history,
},
server: server_info,
proposer_uuid: [0; 16],
timeline_start_lsn: Lsn(0),
local_start_lsn,
timeline_start_lsn: start_lsn,
local_start_lsn: start_lsn,
commit_lsn,
backup_lsn: local_start_lsn,
peer_horizon_lsn: local_start_lsn,
backup_lsn: start_lsn,
peer_horizon_lsn: start_lsn,
remote_consistent_lsn: Lsn(0),
partial_backup: wal_backup_partial::State::default(),
eviction_state: EvictionState::Present,
Expand Down
5 changes: 2 additions & 3 deletions safekeeper/src/timelines_global_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ impl GlobalTimelines {
ttid: TenantTimelineId,
mconf: Configuration,
server_info: ServerInfo,
start_lsn: Lsn,
commit_lsn: Lsn,
local_start_lsn: Lsn,
) -> Result<Arc<Timeline>> {
let (conf, _, _) = {
let state = self.state.lock().unwrap();
Expand All @@ -241,8 +241,7 @@ impl GlobalTimelines {

// TODO: currently we create only cfile. It would be reasonable to
// immediately initialize first WAL segment as well.
let state =
TimelinePersistentState::new(&ttid, mconf, server_info, commit_lsn, local_start_lsn)?;
let state = TimelinePersistentState::new(&ttid, mconf, server_info, start_lsn, commit_lsn)?;
control_file::FileStorage::create_new(&tmp_dir_path, state, conf.no_sync).await?;
let timeline = self.load_temp_timeline(ttid, &tmp_dir_path, true).await?;
Ok(timeline)
Expand Down
11 changes: 1 addition & 10 deletions test_runner/fixtures/pageserver/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from urllib3.util.retry import Retry

from fixtures.common_types import (
Id,
Lsn,
TenantId,
TenantShardId,
Expand All @@ -25,7 +24,7 @@
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.pg_version import PgVersion
from fixtures.utils import Fn
from fixtures.utils import EnhancedJSONEncoder, Fn


class PageserverApiException(Exception):
Expand Down Expand Up @@ -83,14 +82,6 @@ class TimelineCreateRequest:
mode: TimelineCreateRequestMode

def to_json(self) -> str:
class EnhancedJSONEncoder(json.JSONEncoder):
def default(self, o):
if dataclasses.is_dataclass(o) and not isinstance(o, type):
return dataclasses.asdict(o)
elif isinstance(o, Id):
return o.id.hex()
return super().default(o)

# mode is flattened
this = dataclasses.asdict(self)
mode = this.pop("mode")
Expand Down
46 changes: 31 additions & 15 deletions test_runner/fixtures/safekeeper/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from fixtures.common_types import Lsn, TenantId, TenantTimelineId, TimelineId
from fixtures.log_helper import log
from fixtures.metrics import Metrics, MetricsGetter, parse_metrics
from fixtures.utils import wait_until
from fixtures.utils import EnhancedJSONEncoder, wait_until

if TYPE_CHECKING:
from typing import Any
Expand Down Expand Up @@ -69,6 +69,34 @@ def from_json(cls, d: dict[str, Any]) -> TermBumpResponse:
)


@dataclass
class SafekeeperId:
id: int
host: str
pg_port: str


@dataclass
class Configuration:
generation: int
members: list[SafekeeperId]
new_members: list[SafekeeperId] | None


@dataclass
class TimelineCreateRequest:
tenant_id: TenantId
timeline_id: TimelineId
mconf: Configuration
# not exactly PgVersion, for example 150002 for 15.2
pg_version: int
start_lsn: Lsn
commit_lsn: Lsn | None

def to_json(self) -> str:
return json.dumps(self, cls=EnhancedJSONEncoder)


class SafekeeperHttpClient(requests.Session, MetricsGetter):
HTTPError = requests.HTTPError

Expand Down Expand Up @@ -131,20 +159,8 @@ def timeline_list(self) -> list[TenantTimelineId]:
resj = res.json()
return [TenantTimelineId.from_json(ttidj) for ttidj in resj]

def timeline_create(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
pg_version: int, # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
commit_lsn: Lsn,
):
body = {
"tenant_id": str(tenant_id),
"timeline_id": str(timeline_id),
"pg_version": pg_version,
"commit_lsn": str(commit_lsn),
}
res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", json=body)
def timeline_create(self, r: TimelineCreateRequest):
res = self.post(f"http://localhost:{self.port}/v1/tenant/timeline", data=r.to_json())
res.raise_for_status()

def timeline_status(
Expand Down
18 changes: 18 additions & 0 deletions test_runner/fixtures/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import contextlib
import dataclasses
import json
import os
import re
Expand All @@ -21,6 +22,7 @@
from psycopg2.extensions import cursor
from typing_extensions import override

from fixtures.common_types import Id, Lsn
from fixtures.log_helper import log
from fixtures.pageserver.common_types import (
parse_delta_layer,
Expand Down Expand Up @@ -605,6 +607,22 @@ def join(self, timeout: float | None = None) -> Any:
return self.ret


class EnhancedJSONEncoder(json.JSONEncoder):
"""
Default json.JSONEncoder works only on primitive builtins. Extend it to any
dataclass plus our custom types.
"""

def default(self, o):
if dataclasses.is_dataclass(o) and not isinstance(o, type):
return dataclasses.asdict(o)
elif isinstance(o, Id):
return o.id.hex()
elif isinstance(o, Lsn):
return str(o) # standard hex notation
return super().default(o)


def human_bytes(amt: float) -> str:
"""
Render a bytes amount into nice IEC bytes string.
Expand Down
10 changes: 8 additions & 2 deletions test_runner/regress/test_wal_acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
default_remote_storage,
s3_storage,
)
from fixtures.safekeeper.http import SafekeeperHttpClient
from fixtures.safekeeper.http import Configuration, SafekeeperHttpClient, TimelineCreateRequest
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
PropagatingThread,
Expand Down Expand Up @@ -658,7 +658,13 @@ def test_s3_wal_replay(neon_env_builder: NeonEnvBuilder):
for sk in env.safekeepers:
sk.start()
cli = sk.http_client()
cli.timeline_create(tenant_id, timeline_id, pg_version, last_lsn)
mconf = Configuration(generation=0, members=[], new_members=None)
# set start_lsn to the beginning of the first segment to allow reading
# WAL from there (could you intidb LSN as well).
r = TimelineCreateRequest(
tenant_id, timeline_id, mconf, pg_version, Lsn("0/1000000"), commit_lsn=last_lsn
)
cli.timeline_create(r)
f_partial_path = (
Path(sk.data_dir) / str(tenant_id) / str(timeline_id) / f_partial_saved.name
)
Expand Down

0 comments on commit f552684

Please sign in to comment.