storage_controller: fix node flap detach race #10298

Merged 3 commits on Jan 8, 2025
16 changes: 9 additions & 7 deletions storage_controller/src/reconciler.rs
@@ -14,7 +14,6 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken;
use utils::backoff::exponential_backoff;
- use utils::failpoint_support;
use utils::generation::Generation;
use utils::id::{NodeId, TimelineId};
use utils::lsn::Lsn;
@@ -212,11 +211,12 @@ impl Reconciler {
lazy: bool,
) -> Result<(), ReconcileError> {
if !node.is_available() && config.mode == LocationConfigMode::Detached {
- // Attempts to detach from offline nodes may be imitated without doing I/O: a node which is offline
- // will get fully reconciled wrt the shard's intent state when it is reactivated, irrespective of
- // what we put into `observed`, in [`crate::service::Service::node_activate_reconcile`]
- tracing::info!("Node {node} is unavailable during detach: proceeding anyway, it will be detached on next activation");
- self.observed.locations.remove(&node.get_id());
+ // [`crate::service::Service::node_activate_reconcile`] will update the observed state
+ // when the node comes back online. At that point, the intent and observed states will
+ // be mismatched and a background reconciliation will detach.
+ tracing::info!(
+     "Node {node} is unavailable during detach: proceeding anyway, it will be detached via background reconciliation"
+ );
return Ok(());
}

@@ -749,6 +749,8 @@ impl Reconciler {
};

if increment_generation {
pausable_failpoint!("reconciler-pre-increment-generation");

let generation = self
.persistence
.increment_generation(self.tenant_shard_id, node.get_id())
@@ -824,7 +826,7 @@ impl Reconciler {
.handle_detach(self.tenant_shard_id, self.shard.stripe_size);
}

- failpoint_support::sleep_millis_async!("sleep-on-reconcile-epilogue");
+ pausable_failpoint!("reconciler-epilogue");

Ok(())
}
4 changes: 4 additions & 0 deletions storage_controller/src/service.rs
@@ -83,6 +83,7 @@ use utils::{
generation::Generation,
http::error::ApiError,
id::{NodeId, TenantId, TimelineId},
pausable_failpoint,
sync::gate::Gate,
};

@@ -1024,6 +1025,8 @@ impl Service {
)
.await;

pausable_failpoint!("heartbeat-pre-node-state-configure");

// This is the code path for geniune availability transitions (i.e node
// goes unavailable and/or comes back online).
let res = self
@@ -2492,6 +2495,7 @@ impl Service {
// Persist updates
// Ordering: write to the database before applying changes in-memory, so that
// we will not appear time-travel backwards on a restart.

let mut schedule_context = ScheduleContext::default();
for ShardUpdate {
tenant_shard_id,
7 changes: 5 additions & 2 deletions test_runner/fixtures/neon_fixtures.py
@@ -2521,6 +2521,7 @@ def start(
self,
extra_env_vars: dict[str, str] | None = None,
timeout_in_seconds: int | None = None,
await_active: bool = True,
) -> Self:
"""
Start the page server.
@@ -2547,8 +2548,10 @@ def start(
)
self.running = True

- if self.env.storage_controller.running and self.env.storage_controller.node_registered(
-     self.id
+ if (
+     await_active
+     and self.env.storage_controller.running
+     and self.env.storage_controller.node_registered(self.id)
):
self.env.storage_controller.poll_node_status(
self.id, PageserverAvailability.ACTIVE, None, max_attempts=200, backoff=0.1
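For illustration, a minimal usage sketch of the new `await_active` flag (assuming only the fixture API visible in this diff): start the pageserver process without blocking until the storage controller reports it ACTIVE, then poll explicitly once activation is allowed to proceed.

# Sketch only: assumes the NeonEnv fixture API shown in this PR.
# Start the pageserver but do not wait for it to become ACTIVE, e.g. because a
# storage controller failpoint is intentionally holding up activation.
ps = env.get_pageserver(node_id)
ps.start(await_active=False)

# ... drive storage controller failpoints while the node is not yet active ...

# Once activation is unblocked, wait for the node as usual.
env.storage_controller.poll_node_status(
    ps.id, PageserverAvailability.ACTIVE, None, max_attempts=200, backoff=0.1
)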
122 changes: 121 additions & 1 deletion test_runner/regress/test_storage_controller.py
@@ -17,6 +17,7 @@
from fixtures.log_helper import log
from fixtures.neon_fixtures import (
DEFAULT_AZ_ID,
LogCursor,
NeonEnv,
NeonEnvBuilder,
NeonPageserver,
@@ -2406,7 +2407,14 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
env.storage_controller.tenant_create(tid)

env.storage_controller.reconcile_until_idle()
env.storage_controller.configure_failpoints(("sleep-on-reconcile-epilogue", "return(10000)"))
env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))

def unpause_failpoint():
time.sleep(2)
env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))

thread = threading.Thread(target=unpause_failpoint)
thread.start()

# Make a change to the tenant config to trigger a slow reconcile
virtual_ps_http = PageserverHttpClient(env.storage_controller_port, lambda: True)
@@ -2421,6 +2429,8 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder):
observed_state = env.storage_controller.step_down()
log.info(f"Storage controller stepped down with {observed_state=}")

thread.join()

# Validate that we waited for the slow reconcile to complete
# and updated the observed state in the storcon before stepping down.
node_id = str(env.pageserver.id)
@@ -3294,3 +3304,113 @@ def test_storage_controller_detached_stopped(

# Confirm the detach happened
assert env.pageserver.http_client().tenant_list_locations()["tenant_shards"] == []


@run_only_on_default_postgres("Postgres version makes no difference here")
def test_storage_controller_node_flap_detach_race(
neon_env_builder: NeonEnvBuilder,
):
"""
Reproducer for https://github.com/neondatabase/neon/issues/10253.

When a node's availability flaps, the reconciliations spawned by the node
going offline may race with the reconciliation done when the node comes
back online.
"""
neon_env_builder.num_pageservers = 4

env = neon_env_builder.init_configs()
env.start()

tenant_id = TenantId.generate()
env.storage_controller.tenant_create(
tenant_id,
shard_count=2,
)
env.storage_controller.reconcile_until_idle()

stopped_nodes = [s["node_id"] for s in env.storage_controller.locate(tenant_id)]

def has_hit_failpoint(failpoint: str, offset: LogCursor | None = None) -> LogCursor:
res = env.storage_controller.log_contains(f"at failpoint {failpoint}", offset=offset)
assert res
return res[1]

# Stop the nodes which host attached shards.
# This will trigger reconciliations which pause before incrementing the generation,
# and, more importantly, updating the `generation_pageserver` of the shards.
env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "pause"))
for node_id in stopped_nodes:
env.get_pageserver(node_id).stop(immediate=True)

def failure_handled() -> LogCursor:
stop_offset = None

for node_id in stopped_nodes:
res = env.storage_controller.log_contains(f"node {node_id} going offline")
assert res
stop_offset = res[1]

assert stop_offset
return stop_offset

offset = wait_until(failure_handled)

# Now restart the nodes and make them pause before marking themselves as available
# or running the activation reconciliation.
env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "pause"))

for node_id in stopped_nodes:
env.get_pageserver(node_id).start(await_active=False)

offset = wait_until(
lambda: has_hit_failpoint("heartbeat-pre-node-state-configure", offset=offset)
)

# The nodes have restarted and are waiting to perform activation reconciliation.
# Unpause the initial reconciliation triggered by the nodes going offline.
# It will attempt to detach from the old location, but notice that the old location
# is not yet available, and then stop before processing the results of the reconciliation.
env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))
env.storage_controller.configure_failpoints(("reconciler-pre-increment-generation", "off"))

offset = wait_until(lambda: has_hit_failpoint("reconciler-epilogue", offset=offset))

# Let the nodes perform activation reconciliation while still holding up processing the result
# from the initial reconcile triggered by going offline.
env.storage_controller.configure_failpoints(("heartbeat-pre-node-state-configure", "off"))

def activate_reconciliation_done():
for node_id in stopped_nodes:
assert env.storage_controller.log_contains(
f"Node {node_id} transition to active", offset=offset
)

wait_until(activate_reconciliation_done)

# Finally, allow the initial reconcile to finish up.
env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))

# Give things a chance to settle and validate that no stale locations exist
env.storage_controller.reconcile_until_idle()

def validate_locations():
shard_locations = defaultdict(list)
for ps in env.pageservers:
locations = ps.http_client().tenant_list_locations()["tenant_shards"]
for loc in locations:
shard_locations[loc[0]].append(
{"generation": loc[1]["generation"], "mode": loc[1]["mode"], "node": ps.id}
)

log.info(f"Shard locations: {shard_locations}")

attached_locations = {
k: list(filter(lambda loc: loc["mode"] == "AttachedSingle", v))
for k, v in shard_locations.items()
}

for shard, locs in attached_locations.items():
assert len(locs) == 1, f"{shard} has {len(locs)} attached locations"

wait_until(validate_locations, timeout=10)
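For reference, the failpoint choreography the new test relies on follows a pause/observe/release pattern: set the failpoint action to "pause", wait until the storage controller log shows a task parked at it, perform the racing step, then set the action to "off". A condensed sketch, using only names that appear in the test above:

# Condensed sketch of the pause/observe/release pattern used by the test above.
env.storage_controller.configure_failpoints(("reconciler-epilogue", "pause"))

def hit_failpoint() -> LogCursor:
    # The pausable failpoint logs "at failpoint <name>" when a task reaches it.
    res = env.storage_controller.log_contains("at failpoint reconciler-epilogue")
    assert res
    return res[1]

offset = wait_until(hit_failpoint)  # a reconcile task is now parked at the failpoint

# ... perform the racing operation (e.g. restart nodes) while the task is held ...

env.storage_controller.configure_failpoints(("reconciler-epilogue", "off"))  # release it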