diff --git a/consensus/core/src/commit_consumer.rs b/consensus/core/src/commit_consumer.rs index adee1c716def1..4e40e6da4a376 100644 --- a/consensus/core/src/commit_consumer.rs +++ b/consensus/core/src/commit_consumer.rs @@ -72,7 +72,7 @@ impl CommitConsumerMonitor { } } - pub(crate) fn highest_handled_commit(&self) -> CommitIndex { + pub fn highest_handled_commit(&self) -> CommitIndex { *self.highest_handled_commit.borrow() } diff --git a/crates/sui-core/src/consensus_manager/mysticeti_manager.rs b/crates/sui-core/src/consensus_manager/mysticeti_manager.rs index cfee957039a0d..9b71d11b7f473 100644 --- a/crates/sui-core/src/consensus_manager/mysticeti_manager.rs +++ b/crates/sui-core/src/consensus_manager/mysticeti_manager.rs @@ -5,7 +5,7 @@ use std::{path::PathBuf, sync::Arc, time::Duration}; use arc_swap::ArcSwapOption; use async_trait::async_trait; use consensus_config::{Committee, NetworkKeyPair, Parameters, ProtocolKeyPair}; -use consensus_core::{CommitConsumer, CommitIndex, ConsensusAuthority}; +use consensus_core::{CommitConsumer, CommitConsumerMonitor, CommitIndex, ConsensusAuthority}; use fastcrypto::ed25519; use mysten_metrics::{RegistryID, RegistryService}; use prometheus::Registry; @@ -48,6 +48,7 @@ pub struct MysticetiManager { client: Arc, // TODO: switch to parking_lot::Mutex. consensus_handler: Mutex>, + consumer_monitor: ArcSwapOption, } impl MysticetiManager { @@ -72,6 +73,7 @@ impl MysticetiManager { client, consensus_handler: Mutex::new(None), boot_counter: Mutex::new(0), + consumer_monitor: ArcSwapOption::empty(), } } @@ -151,7 +153,30 @@ impl ConsensusManagerTrait for MysticetiManager { CommitConsumer::new(consensus_handler.last_processed_subdag_index() as CommitIndex); let monitor = commit_consumer.monitor(); - let boot_counter = *self.boot_counter.lock().await; + // If there is a previous consumer monitor, it indicates that the consensus engine has been restarted, due to an epoch change. However, that on its + // own doesn't tell us much whether it participated on an active epoch or an old one. We need to check if it has handled any commits to determine this. + // If indeed any commits did happen, then we assume that node did participate on previous run. + let participated_on_previous_run = + if let Some(previous_monitor) = self.consumer_monitor.swap(Some(monitor.clone())) { + previous_monitor.highest_handled_commit() > 0 + } else { + false + }; + + // Increment the boot counter only if the consensus successfully participated in the previous run. + // This is typical during normal epoch changes, where the node restarts as expected, and the boot counter is incremented to prevent amnesia recovery on the next start. + // If the node is recovering from a restore process and catching up across multiple epochs, it won't handle any commits until it reaches the last active epoch. + // In this scenario, we do not increment the boot counter, as we need amnesia recovery to run. + let mut boot_counter = self.boot_counter.lock().await; + if participated_on_previous_run { + *boot_counter += 1; + } else { + info!( + "Node has not participated in previous run. Boot counter will not increment {}", + *boot_counter + ); + } + let authority = ConsensusAuthority::start( network_type, own_index, @@ -163,15 +188,11 @@ impl ConsensusManagerTrait for MysticetiManager { Arc::new(tx_validator.clone()), commit_consumer, registry.clone(), - boot_counter, + *boot_counter, ) .await; let client = authority.transaction_client(); - // Now increment the boot counter - let mut boot_counter = self.boot_counter.lock().await; - *boot_counter += 1; - let registry_id = self.registry_service.add(registry.clone()); let registered_authority = Arc::new((authority, registry_id)); @@ -193,6 +214,7 @@ impl ConsensusManagerTrait for MysticetiManager { transaction_receiver, monitor, ); + let mut consensus_handler = self.consensus_handler.lock().await; *consensus_handler = Some(handler); diff --git a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs index 00e8a77436624..f14a3eb560a0d 100644 --- a/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs +++ b/crates/sui-core/src/unit_tests/mysticeti_manager_tests.rs @@ -107,16 +107,30 @@ async fn test_mysticeti_manager() { // THEN assert!(manager.is_running().await); + let boot_counter = *manager.boot_counter.lock().await; + if i == 1 || i == 2 { + assert_eq!(boot_counter, 0); + } else { + assert_eq!(boot_counter, 1); + } + // Now try to shut it down sleep(Duration::from_secs(1)).await; + // Simulate a commit by bumping the handled commit index so we can ensure that boot counter increments only after the first run. + // Practically we want to simulate a case where consensus engine restarts when no commits have happened before for first run. + if i > 1 { + let monitor = manager + .consumer_monitor + .load_full() + .expect("A consumer monitor should have been initialised"); + monitor.set_highest_handled_commit(100); + } + // WHEN manager.shutdown().await; // THEN assert!(!manager.is_running().await); - - let boot_counter = *manager.boot_counter.lock().await; - assert_eq!(boot_counter, i); } }