diff --git a/libs/safekeeper_api/src/membership.rs b/libs/safekeeper_api/src/membership.rs index fe302045451f..a39fda526f40 100644 --- a/libs/safekeeper_api/src/membership.rs +++ b/libs/safekeeper_api/src/membership.rs @@ -23,6 +23,8 @@ pub const INITIAL_GENERATION: Generation = 1; pub struct SafekeeperId { pub id: NodeId, pub host: String, + /// We include here only port for computes -- that is, pg protocol tenant + /// only port, or wide pg protocol port if the former is not configured. pub pg_port: u16, } diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs index ad38986357fb..a6f90154f432 100644 --- a/libs/safekeeper_api/src/models.rs +++ b/libs/safekeeper_api/src/models.rs @@ -175,6 +175,7 @@ pub enum WalReceiverStatus { pub struct TimelineStatus { pub tenant_id: TenantId, pub timeline_id: TimelineId, + pub mconf: Configuration, pub acceptor_state: AcceptorStateStatus, pub pg_info: ServerInfo, pub flush_lsn: Lsn, @@ -189,6 +190,20 @@ pub struct TimelineStatus { pub walreceivers: Vec, } +/// Request to switch membership configuration. +#[derive(Serialize, Deserialize)] +#[serde(transparent)] +pub struct TimelineMembershipSwitchRequest { + pub mconf: Configuration, +} + +/// In response both previous and current configuration are sent. +#[derive(Serialize, Deserialize)] +pub struct TimelineMembershipSwitchResponse { + pub previous_conf: Configuration, + pub current_conf: Configuration, +} + fn lsn_invalid() -> Lsn { Lsn::INVALID } diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index 3835d396987f..5ecde4b12561 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -1,4 +1,5 @@ use hyper::{Body, Request, Response, StatusCode}; +use safekeeper_api::models; use safekeeper_api::models::AcceptorStateStatus; use safekeeper_api::models::SafekeeperStatus; use safekeeper_api::models::TermSwitchApiEntry; @@ -183,6 +184,7 @@ async fn timeline_status_handler(request: Request) -> Result) -> Result, +) -> Result, ApiError> { + let ttid = TenantTimelineId::new( + parse_request_param(&request, "tenant_id")?, + parse_request_param(&request, "timeline_id")?, + ); + check_permission(&request, Some(ttid.tenant_id))?; + + let global_timelines = get_global_timelines(&request); + let tli = global_timelines.get(ttid).map_err(ApiError::from)?; + + let data: models::TimelineMembershipSwitchRequest = json_request(&mut request).await?; + let response = tli + .membership_switch(data.mconf) + .await + .map_err(ApiError::InternalServerError)?; + + json_response(StatusCode::OK, response) +} + async fn timeline_copy_handler(mut request: Request) -> Result, ApiError> { check_permission(&request, None)?; @@ -619,6 +643,10 @@ pub fn make_router( "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id", |r| request_span(r, timeline_snapshot_handler), ) + .post( + "/v1/tenant/:tenant_id/timeline/:timeline_id/membership", + |r| request_span(r, timeline_membership_handler), + ) .post( "/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy", |r| request_span(r, timeline_copy_handler), diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs index 1c3bb1b4dc0a..4d566b12a08e 100644 --- a/safekeeper/src/state.rs +++ b/safekeeper/src/state.rs @@ -6,9 +6,12 @@ use std::{cmp::max, ops::Deref, time::SystemTime}; use anyhow::{bail, Result}; use postgres_ffi::WAL_SEGMENT_SIZE; use safekeeper_api::{ - membership::Configuration, models::TimelineTermBumpResponse, ServerInfo, Term, INITIAL_TERM, + membership::Configuration, + models::{TimelineMembershipSwitchResponse, TimelineTermBumpResponse}, + ServerInfo, Term, INITIAL_TERM, }; use serde::{Deserialize, Serialize}; +use tracing::info; use utils::{ id::{TenantId, TenantTimelineId, TimelineId}, lsn::Lsn, @@ -258,6 +261,31 @@ where current_term: after, }) } + + /// Switch into membership configuration `to` if it is higher than the + /// current one. + pub async fn membership_switch( + &mut self, + to: Configuration, + ) -> Result { + let before = self.mconf.clone(); + // Is switch allowed? + if to.generation <= self.mconf.generation { + info!( + "ignoring request to switch membership conf to lower {}, current conf {}", + to, self.mconf + ); + } else { + let mut state = self.start_change(); + state.mconf = to.clone(); + self.finish_change(&state).await?; + info!("switched membership conf to {} from {}", to, before); + } + Ok(TimelineMembershipSwitchResponse { + previous_conf: before, + current_conf: self.mconf.clone(), + }) + } } impl Deref for TimelineState diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 36860a0da2b4..288239107444 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -4,7 +4,10 @@ use anyhow::{anyhow, bail, Result}; use camino::{Utf8Path, Utf8PathBuf}; use remote_storage::RemotePath; -use safekeeper_api::models::{PeerInfo, TimelineTermBumpResponse}; +use safekeeper_api::membership::Configuration; +use safekeeper_api::models::{ + PeerInfo, TimelineMembershipSwitchResponse, TimelineTermBumpResponse, +}; use safekeeper_api::Term; use tokio::fs::{self}; use tokio_util::sync::CancellationToken; @@ -188,6 +191,13 @@ impl StateSK { self.state_mut().term_bump(to).await } + pub async fn membership_switch( + &mut self, + to: Configuration, + ) -> Result { + self.state_mut().membership_switch(to).await + } + /// Close open WAL files to release FDs. fn close_wal_store(&mut self) { if let StateSK::Loaded(sk) = self { @@ -768,6 +778,14 @@ impl Timeline { state.sk.term_bump(to).await } + pub async fn membership_switch( + self: &Arc, + to: Configuration, + ) -> Result { + let mut state = self.write_shared_state().await; + state.sk.membership_switch(to).await + } + /// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`] async fn do_wal_residence_guard( self: &Arc, diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py index 4826cae3ee13..493ce7334e2a 100644 --- a/test_runner/fixtures/safekeeper/http.py +++ b/test_runner/fixtures/safekeeper/http.py @@ -25,6 +25,7 @@ class Walreceiver: @dataclass class SafekeeperTimelineStatus: + mconf: Configuration | None term: int last_log_term: int pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2 @@ -73,7 +74,7 @@ def from_json(cls, d: dict[str, Any]) -> TermBumpResponse: class SafekeeperId: id: int host: str - pg_port: str + pg_port: int @dataclass @@ -82,6 +83,16 @@ class Configuration: members: list[SafekeeperId] new_members: list[SafekeeperId] | None + @classmethod + def from_json(cls, d: dict[str, Any]) -> Configuration: + generation = d["generation"] + members = d["members"] + new_members = d.get("new_members") + return Configuration(generation, members, new_members) + + def to_json(self) -> str: + return json.dumps(self, cls=EnhancedJSONEncoder) + @dataclass class TimelineCreateRequest: @@ -97,6 +108,18 @@ def to_json(self) -> str: return json.dumps(self, cls=EnhancedJSONEncoder) +@dataclass +class TimelineMembershipSwitchResponse: + previous_conf: Configuration + current_conf: Configuration + + @classmethod + def from_json(cls, d: dict[str, Any]) -> TimelineMembershipSwitchResponse: + previous_conf = Configuration.from_json(d["previous_conf"]) + current_conf = Configuration.from_json(d["current_conf"]) + return TimelineMembershipSwitchResponse(previous_conf, current_conf) + + class SafekeeperHttpClient(requests.Session, MetricsGetter): HTTPError = requests.HTTPError @@ -170,7 +193,10 @@ def timeline_status( res.raise_for_status() resj = res.json() walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]] + # It is always normally not None, it is allowed only to make forward compat tests happy. + mconf = Configuration.from_json(resj["mconf"]) if "mconf" in resj else None return SafekeeperTimelineStatus( + mconf=mconf, term=resj["acceptor_state"]["term"], last_log_term=resj["acceptor_state"]["epoch"], pg_version=resj["pg_info"]["pg_version"], @@ -196,6 +222,11 @@ def timeline_start_lsn_non_zero() -> Lsn: def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn: return self.timeline_status(tenant_id, timeline_id).commit_lsn + # Get timeline membership configuration. + def get_membership(self, tenant_id: TenantId, timeline_id: TimelineId) -> Configuration: + # make mypy happy + return self.timeline_status(tenant_id, timeline_id).mconf # type: ignore + # only_local doesn't remove segments in the remote storage. def timeline_delete( self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False @@ -242,6 +273,16 @@ def pull_timeline(self, body: dict[str, Any]) -> dict[str, Any]: assert isinstance(res_json, dict) return res_json + def membership_switch( + self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration + ) -> TimelineMembershipSwitchResponse: + res = self.post( + f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/membership", + data=to.to_json(), + ) + res.raise_for_status() + return TimelineMembershipSwitchResponse.from_json(res.json()) + def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: dict[str, Any]): res = self.post( f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy", diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index d39c6a6b5bef..2b6a267bdf8e 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -48,7 +48,12 @@ default_remote_storage, s3_storage, ) -from fixtures.safekeeper.http import Configuration, SafekeeperHttpClient, TimelineCreateRequest +from fixtures.safekeeper.http import ( + Configuration, + SafekeeperHttpClient, + SafekeeperId, + TimelineCreateRequest, +) from fixtures.safekeeper.utils import wait_walreceivers_absent from fixtures.utils import ( PropagatingThread, @@ -2243,6 +2248,63 @@ def unevicted_on_dest(): wait_until(unevicted_on_dest, interval=0.1, timeout=1.0) +# Basic test for http API membership related calls: create timeline and switch +# configuration. Normally these are called by storage controller, but this +# allows to test them separately. +@run_only_on_default_postgres("tests only safekeeper API") +def test_membership_api(neon_env_builder: NeonEnvBuilder): + neon_env_builder.num_safekeepers = 1 + env = neon_env_builder.init_start() + + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + sk = env.safekeepers[0] + http_cli = sk.http_client() + + sk_id_1 = SafekeeperId(env.safekeepers[0].id, "localhost", sk.port.pg_tenant_only) + sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock + + # Request to switch before timeline creation should fail. + init_conf = Configuration(generation=1, members=[sk_id_1], new_members=None) + with pytest.raises(requests.exceptions.HTTPError): + http_cli.membership_switch(tenant_id, timeline_id, init_conf) + + # Create timeline. + create_r = TimelineCreateRequest( + tenant_id, timeline_id, init_conf, 150002, Lsn("0/1000000"), commit_lsn=None + ) + log.info(f"sending {create_r.to_json()}") + http_cli.timeline_create(create_r) + + # Switch into some conf. + joint_conf = Configuration(generation=4, members=[sk_id_1], new_members=[sk_id_2]) + resp = http_cli.membership_switch(tenant_id, timeline_id, joint_conf) + log.info(f"joint switch resp: {resp}") + assert resp.previous_conf.generation == 1 + assert resp.current_conf.generation == 4 + + # Restart sk, conf should be preserved. + sk.stop().start() + after_restart = http_cli.get_membership(tenant_id, timeline_id) + log.info(f"conf after restart: {after_restart}") + assert after_restart.generation == 4 + + # Switch into disjoint conf. + non_joint = Configuration(generation=5, members=[sk_id_2], new_members=None) + resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint) + log.info(f"non joint switch resp: {resp}") + assert resp.previous_conf.generation == 4 + assert resp.current_conf.generation == 5 + + # Switch request to lower conf should be ignored. + lower_conf = Configuration(generation=3, members=[], new_members=None) + resp = http_cli.membership_switch(tenant_id, timeline_id, lower_conf) + log.info(f"lower switch resp: {resp}") + assert resp.previous_conf.generation == 5 + assert resp.current_conf.generation == 5 + + # In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries # when compute is active, but there are no writes to the timeline. In that case # pageserver should maintain a single connection to safekeeper and don't attempt