Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

safekeeper: add membership configuration switch endpoint #10241

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions libs/safekeeper_api/src/membership.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub const INITIAL_GENERATION: Generation = 1;
pub struct SafekeeperId {
pub id: NodeId,
pub host: String,
/// We include here only port for computes -- that is, pg protocol tenant
/// only port, or wide pg protocol port if the former is not configured.
pub pg_port: u16,
}

Expand Down
15 changes: 15 additions & 0 deletions libs/safekeeper_api/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ pub enum WalReceiverStatus {
pub struct TimelineStatus {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub mconf: Configuration,
pub acceptor_state: AcceptorStateStatus,
pub pg_info: ServerInfo,
pub flush_lsn: Lsn,
Expand All @@ -189,6 +190,20 @@ pub struct TimelineStatus {
pub walreceivers: Vec<WalReceiverState>,
}

/// Request to switch membership configuration.
#[derive(Serialize, Deserialize)]
#[serde(transparent)]
pub struct TimelineMembershipSwitchRequest {
pub mconf: Configuration,
}

/// In response both previous and current configuration are sent.
#[derive(Serialize, Deserialize)]
pub struct TimelineMembershipSwitchResponse {
pub previous_conf: Configuration,
pub current_conf: Configuration,
}

fn lsn_invalid() -> Lsn {
Lsn::INVALID
}
Expand Down
28 changes: 28 additions & 0 deletions safekeeper/src/http/routes.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use hyper::{Body, Request, Response, StatusCode};
use safekeeper_api::models;
use safekeeper_api::models::AcceptorStateStatus;
use safekeeper_api::models::SafekeeperStatus;
use safekeeper_api::models::TermSwitchApiEntry;
Expand Down Expand Up @@ -183,6 +184,7 @@ async fn timeline_status_handler(request: Request<Body>) -> Result<Response<Body
let status = TimelineStatus {
tenant_id: ttid.tenant_id,
timeline_id: ttid.timeline_id,
mconf: state.mconf,
acceptor_state: acc_state,
pg_info: state.server,
flush_lsn,
Expand Down Expand Up @@ -268,6 +270,28 @@ async fn timeline_snapshot_handler(request: Request<Body>) -> Result<Response<Bo
Ok(response)
}

/// Consider switching timeline membership configuration to the provided one.
async fn timeline_membership_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(ttid.tenant_id))?;

let global_timelines = get_global_timelines(&request);
let tli = global_timelines.get(ttid).map_err(ApiError::from)?;

let data: models::TimelineMembershipSwitchRequest = json_request(&mut request).await?;
let response = tli
.membership_switch(data.mconf)
.await
.map_err(ApiError::InternalServerError)?;

json_response(StatusCode::OK, response)
}

async fn timeline_copy_handler(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
check_permission(&request, None)?;

Expand Down Expand Up @@ -619,6 +643,10 @@ pub fn make_router(
"/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id",
|r| request_span(r, timeline_snapshot_handler),
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/membership",
|r| request_span(r, timeline_membership_handler),
)
.post(
"/v1/tenant/:tenant_id/timeline/:source_timeline_id/copy",
|r| request_span(r, timeline_copy_handler),
Expand Down
30 changes: 29 additions & 1 deletion safekeeper/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ use std::{cmp::max, ops::Deref, time::SystemTime};
use anyhow::{bail, Result};
use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::{
membership::Configuration, models::TimelineTermBumpResponse, ServerInfo, Term, INITIAL_TERM,
membership::Configuration,
models::{TimelineMembershipSwitchResponse, TimelineTermBumpResponse},
ServerInfo, Term, INITIAL_TERM,
};
use serde::{Deserialize, Serialize};
use tracing::info;
use utils::{
id::{TenantId, TenantTimelineId, TimelineId},
lsn::Lsn,
Expand Down Expand Up @@ -258,6 +261,31 @@ where
current_term: after,
})
}

/// Switch into membership configuration `to` if it is higher than the
/// current one.
pub async fn membership_switch(
&mut self,
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
let before = self.mconf.clone();
// Is switch allowed?
if to.generation <= self.mconf.generation {
info!(
"ignoring request to switch membership conf to lower {}, current conf {}",
to, self.mconf
);
} else {
let mut state = self.start_change();
state.mconf = to.clone();
self.finish_change(&state).await?;
info!("switched membership conf to {} from {}", to, before);
}
Ok(TimelineMembershipSwitchResponse {
previous_conf: before,
current_conf: self.mconf.clone(),
})
}
}

impl<CTRL> Deref for TimelineState<CTRL>
Expand Down
20 changes: 19 additions & 1 deletion safekeeper/src/timeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
use anyhow::{anyhow, bail, Result};
use camino::{Utf8Path, Utf8PathBuf};
use remote_storage::RemotePath;
use safekeeper_api::models::{PeerInfo, TimelineTermBumpResponse};
use safekeeper_api::membership::Configuration;
use safekeeper_api::models::{
PeerInfo, TimelineMembershipSwitchResponse, TimelineTermBumpResponse,
};
use safekeeper_api::Term;
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -188,6 +191,13 @@ impl StateSK {
self.state_mut().term_bump(to).await
}

pub async fn membership_switch(
&mut self,
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
self.state_mut().membership_switch(to).await
}

/// Close open WAL files to release FDs.
fn close_wal_store(&mut self) {
if let StateSK::Loaded(sk) = self {
Expand Down Expand Up @@ -768,6 +778,14 @@ impl Timeline {
state.sk.term_bump(to).await
}

pub async fn membership_switch(
self: &Arc<Self>,
to: Configuration,
) -> Result<TimelineMembershipSwitchResponse> {
let mut state = self.write_shared_state().await;
state.sk.membership_switch(to).await
}

/// Guts of [`Self::wal_residence_guard`] and [`Self::try_wal_residence_guard`]
async fn do_wal_residence_guard(
self: &Arc<Self>,
Expand Down
43 changes: 42 additions & 1 deletion test_runner/fixtures/safekeeper/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Walreceiver:

@dataclass
class SafekeeperTimelineStatus:
mconf: Configuration | None
term: int
last_log_term: int
pg_version: int # Not exactly a PgVersion, safekeeper returns version as int, for example 150002 for 15.2
Expand Down Expand Up @@ -73,7 +74,7 @@ def from_json(cls, d: dict[str, Any]) -> TermBumpResponse:
class SafekeeperId:
id: int
host: str
pg_port: str
pg_port: int


@dataclass
Expand All @@ -82,6 +83,16 @@ class Configuration:
members: list[SafekeeperId]
new_members: list[SafekeeperId] | None

@classmethod
def from_json(cls, d: dict[str, Any]) -> Configuration:
generation = d["generation"]
members = d["members"]
new_members = d.get("new_members")
return Configuration(generation, members, new_members)

def to_json(self) -> str:
return json.dumps(self, cls=EnhancedJSONEncoder)


@dataclass
class TimelineCreateRequest:
Expand All @@ -97,6 +108,18 @@ def to_json(self) -> str:
return json.dumps(self, cls=EnhancedJSONEncoder)


@dataclass
class TimelineMembershipSwitchResponse:
previous_conf: Configuration
current_conf: Configuration

@classmethod
def from_json(cls, d: dict[str, Any]) -> TimelineMembershipSwitchResponse:
previous_conf = Configuration.from_json(d["previous_conf"])
current_conf = Configuration.from_json(d["current_conf"])
return TimelineMembershipSwitchResponse(previous_conf, current_conf)


class SafekeeperHttpClient(requests.Session, MetricsGetter):
HTTPError = requests.HTTPError

Expand Down Expand Up @@ -170,7 +193,10 @@ def timeline_status(
res.raise_for_status()
resj = res.json()
walreceivers = [Walreceiver(wr["conn_id"], wr["status"]) for wr in resj["walreceivers"]]
# It is always normally not None, it is allowed only to make forward compat tests happy.
mconf = Configuration.from_json(resj["mconf"]) if "mconf" in resj else None
arpad-m marked this conversation as resolved.
Show resolved Hide resolved
return SafekeeperTimelineStatus(
mconf=mconf,
term=resj["acceptor_state"]["term"],
last_log_term=resj["acceptor_state"]["epoch"],
pg_version=resj["pg_info"]["pg_version"],
Expand All @@ -196,6 +222,11 @@ def timeline_start_lsn_non_zero() -> Lsn:
def get_commit_lsn(self, tenant_id: TenantId, timeline_id: TimelineId) -> Lsn:
return self.timeline_status(tenant_id, timeline_id).commit_lsn

# Get timeline membership configuration.
def get_membership(self, tenant_id: TenantId, timeline_id: TimelineId) -> Configuration:
# make mypy happy
return self.timeline_status(tenant_id, timeline_id).mconf # type: ignore

# only_local doesn't remove segments in the remote storage.
def timeline_delete(
self, tenant_id: TenantId, timeline_id: TimelineId, only_local: bool = False
Expand Down Expand Up @@ -242,6 +273,16 @@ def pull_timeline(self, body: dict[str, Any]) -> dict[str, Any]:
assert isinstance(res_json, dict)
return res_json

def membership_switch(
self, tenant_id: TenantId, timeline_id: TimelineId, to: Configuration
) -> TimelineMembershipSwitchResponse:
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/membership",
data=to.to_json(),
)
res.raise_for_status()
return TimelineMembershipSwitchResponse.from_json(res.json())

def copy_timeline(self, tenant_id: TenantId, timeline_id: TimelineId, body: dict[str, Any]):
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/copy",
Expand Down
64 changes: 63 additions & 1 deletion test_runner/regress/test_wal_acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,12 @@
default_remote_storage,
s3_storage,
)
from fixtures.safekeeper.http import Configuration, SafekeeperHttpClient, TimelineCreateRequest
from fixtures.safekeeper.http import (
Configuration,
SafekeeperHttpClient,
SafekeeperId,
TimelineCreateRequest,
)
from fixtures.safekeeper.utils import wait_walreceivers_absent
from fixtures.utils import (
PropagatingThread,
Expand Down Expand Up @@ -2243,6 +2248,63 @@ def unevicted_on_dest():
wait_until(unevicted_on_dest, interval=0.1, timeout=1.0)


# Basic test for http API membership related calls: create timeline and switch
# configuration. Normally these are called by storage controller, but this
# allows to test them separately.
@run_only_on_default_postgres("tests only safekeeper API")
def test_membership_api(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

sk = env.safekeepers[0]
http_cli = sk.http_client()

sk_id_1 = SafekeeperId(env.safekeepers[0].id, "localhost", sk.port.pg_tenant_only)
sk_id_2 = SafekeeperId(11, "localhost", 5434) # just a mock

# Request to switch before timeline creation should fail.
init_conf = Configuration(generation=1, members=[sk_id_1], new_members=None)
with pytest.raises(requests.exceptions.HTTPError):
http_cli.membership_switch(tenant_id, timeline_id, init_conf)

# Create timeline.
create_r = TimelineCreateRequest(
tenant_id, timeline_id, init_conf, 150002, Lsn("0/1000000"), commit_lsn=None
)
log.info(f"sending {create_r.to_json()}")
http_cli.timeline_create(create_r)

# Switch into some conf.
joint_conf = Configuration(generation=4, members=[sk_id_1], new_members=[sk_id_2])
resp = http_cli.membership_switch(tenant_id, timeline_id, joint_conf)
log.info(f"joint switch resp: {resp}")
assert resp.previous_conf.generation == 1
assert resp.current_conf.generation == 4

# Restart sk, conf should be preserved.
sk.stop().start()
after_restart = http_cli.get_membership(tenant_id, timeline_id)
log.info(f"conf after restart: {after_restart}")
assert after_restart.generation == 4

# Switch into disjoint conf.
non_joint = Configuration(generation=5, members=[sk_id_2], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, non_joint)
log.info(f"non joint switch resp: {resp}")
assert resp.previous_conf.generation == 4
assert resp.current_conf.generation == 5

# Switch request to lower conf should be ignored.
lower_conf = Configuration(generation=3, members=[], new_members=None)
resp = http_cli.membership_switch(tenant_id, timeline_id, lower_conf)
log.info(f"lower switch resp: {resp}")
assert resp.previous_conf.generation == 5
assert resp.current_conf.generation == 5


# In this test we check for excessive START_REPLICATION and START_WAL_PUSH queries
# when compute is active, but there are no writes to the timeline. In that case
# pageserver should maintain a single connection to safekeeper and don't attempt
Expand Down
Loading