From b78eb1098ee2fcd08fb1576d049c354dd57fc8ca Mon Sep 17 00:00:00 2001 From: Anastasios Kichidis Date: Wed, 16 Oct 2024 20:00:19 +0100 Subject: [PATCH] [Consensus] fix amnesia recovery boot run (#19774) ## Description Currently if someone recovers their node within epoch `R` with a snapshot that's from epoch `< R-1` , consensus will not start in amnesia recovery mode in epoch R as the boot counter will have already been incremented as node is trying to catch up from earlier epochs. This is problematic as it defies the whole point of the automatic amnesia recovery. Instead on this PR we track consensus participation activity from earlier epochs/run and only then we increment the boot counter. Otherwise we keep the boot counter to `0` so we effectively enforce amnesia recovery until we get to an epoch where the node is able to participate. In this case participate means "able to have performed at least one commit". ## Test plan CI/PT --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- consensus/core/src/commit_consumer.rs | 2 +- .../consensus_manager/mysticeti_manager.rs | 36 +++++++++++++++---- .../src/unit_tests/mysticeti_manager_tests.rs | 20 +++++++++-- 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/consensus/core/src/commit_consumer.rs b/consensus/core/src/commit_consumer.rs index adee1c716def1..4e40e6da4a376 100644 --- a/consensus/core/src/commit_consumer.rs +++ b/consensus/core/src/commit_consumer.rs @@ -72,7 +72,7 @@ impl CommitConsumerMonitor { } } - pub(crate) fn highest_handled_commit(&self) -> CommitIndex { + pub fn highest_handled_commit(&self) -> CommitIndex { *self.highest_handled_commit.borrow() } diff --git a/crates/sui-core/src/consensus_manager/mysticeti_manager.rs b/crates/sui-core/src/consensus_manager/mysticeti_manager.rs index cfee957039a0d..9b71d11b7f473 100644 --- a/crates/sui-core/src/consensus_manager/mysticeti_manager.rs +++ b/crates/sui-core/src/consensus_manager/mysticeti_manager.rs @@ -5,7 +5,7 @@ use std::{path::PathBuf, sync::Arc, time::Duration}; use arc_swap::ArcSwapOption; use async_trait::async_trait; use consensus_config::{Committee, NetworkKeyPair, Parameters, ProtocolKeyPair}; -use consensus_core::{CommitConsumer, CommitIndex, ConsensusAuthority}; +use consensus_core::{CommitConsumer, CommitConsumerMonitor, CommitIndex, ConsensusAuthority}; use fastcrypto::ed25519; use mysten_metrics::{RegistryID, RegistryService}; use prometheus::Registry; @@ -48,6 +48,7 @@ pub struct MysticetiManager { client: Arc, // TODO: switch to parking_lot::Mutex. consensus_handler: Mutex>, + consumer_monitor: ArcSwapOption, } impl MysticetiManager { @@ -72,6 +73,7 @@ impl MysticetiManager { client, consensus_handler: Mutex::new(None), boot_counter: Mutex::new(0), + consumer_monitor: ArcSwapOption::empty(), } } @@ -151,7 +153,30 @@ impl ConsensusManagerTrait for MysticetiManager { CommitConsumer::new(consensus_handler.last_processed_subdag_index() as CommitIndex); let monitor = commit_consumer.monitor(); - let boot_counter = *self.boot_counter.lock().await; + // If there is a previous consumer monitor, it indicates that the consensus engine has been restarted, due to an epoch change. However, that on its + // own doesn't tell us much whether it participated on an active epoch or an old one. We need to check if it has handled any commits to determine this. + // If indeed any commits did happen, then we assume that node did participate on previous run. + let participated_on_previous_run = + if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) { + previous_monitor.highest_handled_commit() > 0 + } else { + false + }; + + // Increment the boot counter only if the consensus successfully participated in the previous run. + // This is typical during normal epoch changes, where the node restarts as expected, and the boot counter is incremented to prevent amnesia recovery on the next start. + // If the node is recovering from a restore process and catching up across multiple epochs, it won't handle any commits until it reaches the last active epoch. + // In this scenario, we do not increment the boot counter, as we need amnesia recovery to run. + let mut boot_counter = self.boot_counter.lock().await; + if participated_on_previous_run { + *boot_counter += 1; + } else { + info!( + "Node has not participated in previous run. Boot counter will not increment {}", + *boot_counter + ); + } + let authority = ConsensusAuthority::start( network_type, own_index, @@ -163,15 +188,11 @@ impl ConsensusManagerTrait for MysticetiManager { Arc::new(tx_validator.clone()), commit_consumer, registry.clone(), - boot_counter, + *boot_counter, ) .await; let client = authority.transaction_client(); - // Now increment the boot counter - let mut boot_counter = self.boot_counter.lock().await; - *boot_counter += 1; - let registry_id = self.registry_service.add(registry.clone()); let registered_authority = Arc::new((authority, registry_id)); @@ -193,6 +214,7 @@ impl ConsensusManagerTrait for MysticetiManager { transaction_receiver, monitor, ); + let mut consensus_handler = self.consensus_handler.lock().await; *consensus_handler = Some(handler); diff --git a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs index 00e8a77436624..f14a3eb560a0d 100644 --- a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs +++ b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs @@ -107,16 +107,30 @@ async fn test_mysticeti_manager() { // THEN assert!(manager.is_running().await); + let boot_counter = *manager.boot_counter.lock().await; + if i == 1 || i == 2 { + assert_eq!(boot_counter, 0); + } else { + assert_eq!(boot_counter, 1); + } + // Now try to shut it down sleep(Duration::from_secs(1)).await; + // Simulate a commit by bumping the handled commit index so we can ensure that boot counter increments only after the first run. + // Practically we want to simulate a case where consensus engine restarts when no commits have happened before for first run. + if i > 1 { + let monitor = manager + .consumer_monitor + .load_full() + .expect("A consumer monitor should have been initialised"); + monitor.set_highest_handled_commit(100); + } + // WHEN manager.shutdown().await; // THEN assert!(!manager.is_running().await); - - let boot_counter = *manager.boot_counter.lock().await; - assert_eq!(boot_counter, i); } }