Skip to content

Commit

Permalink
[Consensus] fix amnesia recovery boot run (#19774)
Browse files Browse the repository at this point in the history
## Description 

Currently if someone recovers their node within epoch `R` with a
snapshot that's from epoch `< R-1` , consensus will not start in amnesia
recovery mode in epoch R as the boot counter will have already been
incremented as node is trying to catch up from earlier epochs. This is
problematic as it defies the whole point of the automatic amnesia
recovery.

Instead on this PR we track consensus participation activity from
earlier epochs/run and only then we increment the boot counter.
Otherwise we keep the boot counter to `0` so we effectively enforce
amnesia recovery until we get to an epoch where the node is able to
participate. In this case participate means "able to have performed at
least one commit".

## Test plan 

CI/PT

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
akichidis authored Oct 16, 2024
1 parent 62f72cd commit b78eb10
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 11 deletions.
2 changes: 1 addition & 1 deletion consensus/core/src/commit_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl CommitConsumerMonitor {
}
}

pub(crate) fn highest_handled_commit(&self) -> CommitIndex {
pub fn highest_handled_commit(&self) -> CommitIndex {
*self.highest_handled_commit.borrow()
}

Expand Down
36 changes: 29 additions & 7 deletions crates/sui-core/src/consensus_manager/mysticeti_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{path::PathBuf, sync::Arc, time::Duration};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use consensus_config::{Committee, NetworkKeyPair, Parameters, ProtocolKeyPair};
use consensus_core::{CommitConsumer, CommitIndex, ConsensusAuthority};
use consensus_core::{CommitConsumer, CommitConsumerMonitor, CommitIndex, ConsensusAuthority};
use fastcrypto::ed25519;
use mysten_metrics::{RegistryID, RegistryService};
use prometheus::Registry;
Expand Down Expand Up @@ -48,6 +48,7 @@ pub struct MysticetiManager {
client: Arc<LazyMysticetiClient>,
// TODO: switch to parking_lot::Mutex.
consensus_handler: Mutex<Option<MysticetiConsensusHandler>>,
consumer_monitor: ArcSwapOption<CommitConsumerMonitor>,
}

impl MysticetiManager {
Expand All @@ -72,6 +73,7 @@ impl MysticetiManager {
client,
consensus_handler: Mutex::new(None),
boot_counter: Mutex::new(0),
consumer_monitor: ArcSwapOption::empty(),
}
}

Expand Down Expand Up @@ -151,7 +153,30 @@ impl ConsensusManagerTrait for MysticetiManager {
CommitConsumer::new(consensus_handler.last_processed_subdag_index() as CommitIndex);
let monitor = commit_consumer.monitor();

let boot_counter = *self.boot_counter.lock().await;
// If there is a previous consumer monitor, it indicates that the consensus engine has been restarted, due to an epoch change. However, that on its
// own doesn't tell us much whether it participated on an active epoch or an old one. We need to check if it has handled any commits to determine this.
// If indeed any commits did happen, then we assume that node did participate on previous run.
let participated_on_previous_run =
if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) {
previous_monitor.highest_handled_commit() > 0
} else {
false
};

// Increment the boot counter only if the consensus successfully participated in the previous run.
// This is typical during normal epoch changes, where the node restarts as expected, and the boot counter is incremented to prevent amnesia recovery on the next start.
// If the node is recovering from a restore process and catching up across multiple epochs, it won't handle any commits until it reaches the last active epoch.
// In this scenario, we do not increment the boot counter, as we need amnesia recovery to run.
let mut boot_counter = self.boot_counter.lock().await;
if participated_on_previous_run {
*boot_counter += 1;
} else {
info!(
"Node has not participated in previous run. Boot counter will not increment {}",
*boot_counter
);
}

let authority = ConsensusAuthority::start(
network_type,
own_index,
Expand All @@ -163,15 +188,11 @@ impl ConsensusManagerTrait for MysticetiManager {
Arc::new(tx_validator.clone()),
commit_consumer,
registry.clone(),
boot_counter,
*boot_counter,
)
.await;
let client = authority.transaction_client();

// Now increment the boot counter
let mut boot_counter = self.boot_counter.lock().await;
*boot_counter += 1;

let registry_id = self.registry_service.add(registry.clone());

let registered_authority = Arc::new((authority, registry_id));
Expand All @@ -193,6 +214,7 @@ impl ConsensusManagerTrait for MysticetiManager {
transaction_receiver,
monitor,
);

let mut consensus_handler = self.consensus_handler.lock().await;
*consensus_handler = Some(handler);

Expand Down
20 changes: 17 additions & 3 deletions crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,30 @@ async fn test_mysticeti_manager() {
// THEN
assert!(manager.is_running().await);

let boot_counter = *manager.boot_counter.lock().await;
if i == 1 || i == 2 {
assert_eq!(boot_counter, 0);
} else {
assert_eq!(boot_counter, 1);
}

// Now try to shut it down
sleep(Duration::from_secs(1)).await;

// Simulate a commit by bumping the handled commit index so we can ensure that boot counter increments only after the first run.
// Practically we want to simulate a case where consensus engine restarts when no commits have happened before for first run.
if i > 1 {
let monitor = manager
.consumer_monitor
.load_full()
.expect("A consumer monitor should have been initialised");
monitor.set_highest_handled_commit(100);
}

// WHEN
manager.shutdown().await;

// THEN
assert!(!manager.is_running().await);

let boot_counter = *manager.boot_counter.lock().await;
assert_eq!(boot_counter, i);
}
}

0 comments on commit b78eb10

Please sign in to comment.