diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs
index dfd1abed938b0..9fcd334059a1c 100644
--- a/consensus/core/src/core.rs
+++ b/consensus/core/src/core.rs
@@ -34,9 +34,6 @@ use crate::{
     },
 };
 
-// TODO: Move to protocol config once initial value is finalized.
-const NUM_LEADERS_PER_ROUND: usize = 1;
-
 // Maximum number of commit votes to include in a block.
 // TODO: Move to protocol config, and verify in BlockVerifier.
 const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;
@@ -92,12 +89,16 @@ impl Core {
         dag_state: Arc<RwLock<DagState>>,
     ) -> Self {
         let last_decided_leader = dag_state.read().last_commit_leader();
+        let number_of_leaders = context
+            .protocol_config
+            .mysticeti_num_leaders_per_round()
+            .unwrap_or(1);
         let committer = UniversalCommitterBuilder::new(
             context.clone(),
             leader_schedule.clone(),
             dag_state.clone(),
         )
-        .with_number_of_leaders(NUM_LEADERS_PER_ROUND)
+        .with_number_of_leaders(number_of_leaders)
         .with_pipeline(true)
         .build();
 
@@ -429,7 +430,7 @@ impl Core {
         self.last_proposed_block = verified_block.clone();
 
         // Now acknowledge the transactions for their inclusion to block
-        ack_transactions();
+        ack_transactions(verified_block.reference());
 
         info!("Created block {:?}", verified_block);
 
@@ -1542,6 +1543,133 @@ mod test {
         }
     }
 
+    #[tokio::test(flavor = "current_thread", start_paused = true)]
+    async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
+        parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
+    }
+
+    #[tokio::test(flavor = "current_thread", start_paused = true)]
+    async fn test_commit_on_leader_schedule_change_boundary_with_multileader() {
+        parameterized_test_commit_on_leader_schedule_change_boundary(None).await;
+    }
+
+    async fn parameterized_test_commit_on_leader_schedule_change_boundary(
+        num_leaders_per_round: Option<u32>,
+    ) {
+        telemetry_subscribers::init_for_testing();
+        let default_params = Parameters::default();
+
+        let (mut context, _) = Context::new_for_test(6);
+        context
+            .protocol_config
+            .set_mysticeti_num_leaders_per_round(num_leaders_per_round);
+        // create the cores and their signals for all the authorities
+        let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);
+
+        // Now iterate over a few rounds and ensure the corresponding signals are created while the network advances
+        let mut last_round_blocks = Vec::new();
+        for round in 1..=63 {
+            let mut this_round_blocks = Vec::new();
+
+            // Wait for min round delay to allow blocks to be proposed.
+            sleep(default_params.min_round_delay).await;
+
+            for (core, signal_receivers, block_receiver, _, _) in &mut cores {
+                // add the blocks from last round
+                // this will trigger a block creation for the round and a signal should be emitted
+                core.add_blocks(last_round_blocks.clone()).unwrap();
+
+                // A "new round" signal should be received given that all the blocks of the previous round have been processed
+                let new_round = receive(
+                    Duration::from_secs(1),
+                    signal_receivers.new_round_receiver(),
+                )
+                .await;
+                assert_eq!(new_round, round);
+
+                // Check that a new block has been proposed.
+                let block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
+                    .await
+                    .unwrap()
+                    .unwrap();
+                assert_eq!(block.round(), round);
+                assert_eq!(block.author(), core.context.own_index);
+
+                // append the new block to this round blocks
+                this_round_blocks.push(core.last_proposed_block().clone());
+
+                let block = core.last_proposed_block();
+
+                // ensure that the produced block is referring to the blocks of last_round
+                assert_eq!(block.ancestors().len(), core.context.committee.size());
+                for ancestor in block.ancestors() {
+                    if block.round() > 1 {
+                        // don't bother with round 1 block which just contains the genesis blocks.
+                        assert!(
+                            last_round_blocks
+                                .iter()
+                                .any(|block| block.reference() == *ancestor),
+                            "Reference from previous round should be added"
+                        );
+                    }
+                }
+            }
+
+            last_round_blocks = this_round_blocks;
+        }
+
+        for (core, _, _, _, store) in cores {
+            // Check commits have been persisted to store
+            let last_commit = store
+                .read_last_commit()
+                .unwrap()
+                .expect("last commit should be set");
+            // There are 61 leader rounds with rounds completed up to and including
+            // round 63. Round 63 blocks will only include their own blocks, so there
+            // should only be 60 commits.
+            // However, on a leader schedule change boundary it is possible for a
+            // new leader to get selected for the same round if the elected leader
+            // gets swapped, allowing for multiple leaders to be committed at a round.
+            // Meaning with multi-leader per round explicitly set to 1 we will have 60,
+            // otherwise 61.
+            // NOTE: We used 61 leader rounds to specifically trigger the scenario
+            // where the leader schedule boundary occurred AND we had a swap to a new
+            // leader for the same round.
+            let expected_commit_count = match num_leaders_per_round {
+                Some(1) => 60,
+                _ => 61,
+            };
+            assert_eq!(last_commit.index(), expected_commit_count);
+            let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
+            assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
+            assert_eq!(
+                core.leader_schedule
+                    .leader_swap_table
+                    .read()
+                    .bad_nodes
+                    .len(),
+                1
+            );
+            assert_eq!(
+                core.leader_schedule
+                    .leader_swap_table
+                    .read()
+                    .good_nodes
+                    .len(),
+                1
+            );
+            let expected_reputation_scores =
+                ReputationScores::new((51..=60).into(), vec![8, 8, 9, 8, 8, 8]);
+            assert_eq!(
+                core.leader_schedule
+                    .leader_swap_table
+                    .read()
+                    .reputation_scores,
+                expected_reputation_scores
+            );
+        }
+    }
+
     #[tokio::test]
     async fn test_core_signals() {
         telemetry_subscribers::init_for_testing();
diff --git a/consensus/core/src/transaction.rs b/consensus/core/src/transaction.rs
index 02fa036039b51..7347b1b403afd 100644
--- a/consensus/core/src/transaction.rs
+++ b/consensus/core/src/transaction.rs
@@ -10,8 +10,10 @@ use thiserror::Error;
 use tokio::sync::oneshot;
 use tracing::{error, warn};
 
-use crate::block::Transaction;
-use crate::context::Context;
+use crate::{
+    block::{BlockRef, Transaction},
+    context::Context,
+};
 
 /// The maximum number of transactions pending to the queue to be pulled for block proposal
 const MAX_PENDING_TRANSACTIONS: usize = 2_000;
@@ -27,7 +29,7 @@ pub(crate) struct TransactionsGuard {
     // A TransactionsGuard may be partially consumed by `TransactionConsumer`, in which case, this holds the remaining transactions.
     transactions: Vec<Transaction>,
 
-    included_in_block_ack: oneshot::Sender<()>,
+    included_in_block_ack: oneshot::Sender<BlockRef>,
 }
 
 /// The TransactionConsumer is responsible for fetching the next transactions to be included for the block proposals.
@@ -62,7 +64,7 @@ impl TransactionConsumer {
     // This returns one or more transactions to be included in the block and a callback to acknowledge the inclusion of those transactions.
     // Note that a TransactionsGuard may be partially consumed and the rest saved for the next pull, in which case its `included_in_block_ack`
     // will not be signalled in the callback.
-    pub(crate) fn next(&mut self) -> (Vec<Transaction>, Box<dyn FnOnce() + Send>) {
+    pub(crate) fn next(&mut self) -> (Vec<Transaction>, Box<dyn FnOnce(BlockRef) + Send>) {
         let mut transactions = Vec::new();
         let mut acks = Vec::new();
         let mut total_size: usize = 0;
@@ -119,9 +121,9 @@ impl TransactionConsumer {
 
         (
             transactions,
-            Box::new(move || {
+            Box::new(move |block_ref: BlockRef| {
                 for ack in acks {
-                    let _ = ack.send(());
+                    let _ = ack.send(block_ref);
                 }
             }),
         )
@@ -172,7 +174,8 @@ impl TransactionClient {
 
     /// Submits a list of transactions to be sequenced. The method returns when all the transactions have been successfully included
     /// to next proposed blocks.
-    pub async fn submit(&self, transactions: Vec<Vec<u8>>) -> Result<(), ClientError> {
+    pub async fn submit(&self, transactions: Vec<Vec<u8>>) -> Result<BlockRef, ClientError> {
+        // TODO: Support returning the block refs for transactions that span multiple blocks
         let included_in_block = self.submit_no_wait(transactions).await?;
         included_in_block
             .await
@@ -190,7 +193,7 @@ impl TransactionClient {
     pub(crate) async fn submit_no_wait(
         &self,
         transactions: Vec<Vec<u8>>,
-    ) -> Result<oneshot::Receiver<()>, ClientError> {
+    ) -> Result<oneshot::Receiver<BlockRef>, ClientError> {
         let (included_in_block_ack_send, included_in_block_ack_receive) = oneshot::channel();
         for transaction in &transactions {
             if transaction.len() as u64 > self.max_transaction_size {
@@ -246,15 +249,18 @@ impl TransactionVerifier for NoopTransactionVerifier {
 
 #[cfg(test)]
 mod tests {
-    use crate::context::Context;
-    use crate::transaction::{TransactionClient, TransactionConsumer};
-    use futures::stream::FuturesUnordered;
-    use futures::StreamExt;
-    use std::sync::Arc;
-    use std::time::Duration;
+    use std::{sync::Arc, time::Duration};
+
+    use futures::{stream::FuturesUnordered, StreamExt};
     use sui_protocol_config::ProtocolConfig;
     use tokio::time::timeout;
 
+    use crate::{
+        block::BlockRef,
+        context::Context,
+        transaction::{TransactionClient, TransactionConsumer},
+    };
+
     #[tokio::test(flavor = "current_thread", start_paused = true)]
     async fn basic_submit_and_consume() {
         let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
@@ -296,7 +302,7 @@ mod tests {
         );
 
         // Now acknowledge the inclusion of transactions
-        ack_transactions();
+        ack_transactions(BlockRef::MIN);
 
         // Now make sure that all the waiters have returned
         while let Some(result) = included_in_block_waiters.next().await {
@@ -428,7 +434,7 @@ mod tests {
         // now pull the transactions from the consumer.
         // we expect all transactions are fetched in order, not missing any, and not exceeding the size limit.
         let mut all_transactions = Vec::new();
-        let mut all_acks: Vec<Box<dyn FnOnce() + Send>> = Vec::new();
+        let mut all_acks: Vec<Box<dyn FnOnce(BlockRef) + Send>> = Vec::new();
 
         while !consumer.is_empty() {
            let (transactions, ack_transactions) = consumer.next();
@@ -457,7 +463,7 @@ mod tests {
 
         // now acknowledge the inclusion of all transactions.
         for ack in all_acks {
-            ack();
+            ack(BlockRef::MIN);
         }
 
         // expect all receivers to be resolved.
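Aside on the `transaction.rs` change above: the ack callback returned by `TransactionConsumer::next` now takes the proposed block's `BlockRef` and forwards it through the same `oneshot` channel that previously carried `()`, which is what lets `TransactionClient::submit` resolve to the including block. Below is a minimal self-contained sketch of just that channel pattern; the `BlockRef` struct here is a simplified hypothetical stand-in, not the real `consensus_core` type.

```rust
use tokio::sync::oneshot;

// Hypothetical, simplified stand-in for consensus_core's BlockRef.
#[derive(Clone, Copy, Debug)]
struct BlockRef {
    round: u32,
    author: u32,
}

#[tokio::main]
async fn main() {
    // Submitter side: keep the receiver to learn which block included the transactions.
    let (ack_tx, ack_rx) = oneshot::channel::<BlockRef>();

    // Consumer side: the ack callback now takes the proposed block's reference
    // instead of being a plain FnOnce().
    let ack: Box<dyn FnOnce(BlockRef) + Send> = Box::new(move |block_ref| {
        // The receiver may have been dropped; ignore the send error, as the diff does.
        let _ = ack_tx.send(block_ref);
    });

    // Simulate the core proposing a block and acknowledging inclusion.
    ack(BlockRef { round: 5, author: 2 });

    // The submitter's await now resolves to the including block's reference.
    let included_in = ack_rx.await.expect("ack should have been sent");
    println!("transactions included in {included_in:?}");
}
```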
diff --git a/consensus/core/src/universal_committer.rs b/consensus/core/src/universal_committer.rs
index dc393b31db96e..e78a0a8a915a0 100644
--- a/consensus/core/src/universal_committer.rs
+++ b/consensus/core/src/universal_committer.rs
@@ -43,11 +43,27 @@ impl UniversalCommitter {
         // Try to decide as many leaders as possible, starting with the highest round.
         let mut leaders = VecDeque::new();
+
+        let last_round = match self
+            .context
+            .protocol_config
+            .mysticeti_num_leaders_per_round()
+        {
+            Some(1) => {
+                // Ensure that we don't commit any leaders from the same round as last_decided
+                // until we have full support for multi-leader per round.
+                // This can happen when we are on a leader schedule boundary and the leader
+                // elected for the round changes with the new schedule.
+                last_decided.round + 1
+            }
+            _ => last_decided.round,
+        };
+
         // try to commit a leader up to the highest_accepted_round - 2. There is no
         // reason to try and iterate on higher rounds as in order to make a direct
         // decision for a leader at round R we need blocks from round R+2 to figure
         // out that enough certificates and support exist to commit a leader.
-        'outer: for round in (last_decided.round..=highest_accepted_round.saturating_sub(2)).rev() {
+        'outer: for round in (last_round..=highest_accepted_round.saturating_sub(2)).rev() {
             for committer in self.committers.iter().rev() {
                 // Skip committers that don't have a leader for this round.
                 let Some(slot) = committer.elect_leader(round) else {
diff --git a/crates/sui-core/src/consensus_handler.rs b/crates/sui-core/src/consensus_handler.rs
index 14c0bc293aaf7..2f3b131275ca1 100644
--- a/crates/sui-core/src/consensus_handler.rs
+++ b/crates/sui-core/src/consensus_handler.rs
@@ -233,17 +233,7 @@ impl ConsensusHandler {
 
         let round = consensus_output.leader_round();
 
-        assert!(round >= last_committed_round);
-        if last_committed_round == round {
-            // we can receive the same commit twice after restart
-            // It is critical that the writes done by this function are atomic - otherwise we can
-            // lose the later parts of a commit if we restart midway through processing it.
-            info!(
-                "Ignoring consensus output for round {} as it is already committed",
-                round
-            );
-            return;
-        }
+        assert!(round > last_committed_round);
 
         /* (serialized, transaction, output_cert) */
         let mut transactions = vec![];
@@ -1006,16 +996,6 @@ mod tests {
             last_consensus_stats_1.stats.get_num_user_transactions(0),
             num_transactions as u64
         );
-
-        // WHEN processing the same output multiple times
-        // THEN the consensus stats do not update
-        for _ in 0..2 {
-            consensus_handler
-                .handle_consensus_output(consensus_output.clone())
-                .await;
-            let last_consensus_stats_2 = consensus_handler.last_consensus_stats.clone();
-            assert_eq!(last_consensus_stats_1, last_consensus_stats_2);
-        }
     }
 
     #[test]
diff --git a/crates/sui-core/src/mysticeti_adapter.rs b/crates/sui-core/src/mysticeti_adapter.rs
index 8b75687f6c61b..eda10df0e6b3e 100644
--- a/crates/sui-core/src/mysticeti_adapter.rs
+++ b/crates/sui-core/src/mysticeti_adapter.rs
@@ -7,7 +7,7 @@ use arc_swap::{ArcSwapOption, Guard};
 use consensus_core::TransactionClient;
 use sui_types::{
     error::{SuiError, SuiResult},
-    messages_consensus::ConsensusTransaction,
+    messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
 };
 use tap::prelude::*;
 use tokio::time::{sleep, timeout};
@@ -15,7 +15,7 @@ use tracing::warn;
 
 use crate::{
     authority::authority_per_epoch_store::AuthorityPerEpochStore,
-    consensus_adapter::SubmitToConsensus,
+    consensus_adapter::SubmitToConsensus, consensus_handler::SequencedConsensusTransactionKey,
 };
 
 /// Basically a wrapper struct that reads from the LOCAL_MYSTICETI_CLIENT variable where the latest
@@ -85,7 +85,7 @@ impl SubmitToConsensus for LazyMysticetiClient {
             .iter()
             .map(|t| bcs::to_bytes(t).expect("Serializing consensus transaction cannot fail"))
             .collect::<Vec<_>>();
-        client
+        let block_ref = client
             .as_ref()
             .expect("Client should always be returned")
             .submit(transactions_bytes)
@@ -95,6 +95,21 @@ impl SubmitToConsensus for LazyMysticetiClient {
                 warn!("Submit transactions failed with: {:?}", r);
             })
             .map_err(|err| SuiError::FailedToSubmitToConsensus(err.to_string()))?;
+
+        let is_soft_bundle = transactions.len() > 1;
+
+        if !is_soft_bundle
+            && matches!(
+                transactions[0].kind,
+                ConsensusTransactionKind::EndOfPublish(_)
+                    | ConsensusTransactionKind::CapabilityNotification(_)
+                    | ConsensusTransactionKind::RandomnessDkgMessage(_, _)
+                    | ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
+            )
+        {
+            let transaction_key = SequencedConsensusTransactionKey::External(transactions[0].key());
+            tracing::info!("Transaction {transaction_key:?} was included in {block_ref}",)
+        };
         Ok(())
     }
 }
diff --git a/crates/sui-protocol-config/src/lib.rs b/crates/sui-protocol-config/src/lib.rs
index 6213fceffca12..276f4974ffdfc 100644
--- a/crates/sui-protocol-config/src/lib.rs
+++ b/crates/sui-protocol-config/src/lib.rs
@@ -1,13 +1,16 @@
 // Copyright (c) Mysten Labs, Inc.
 // SPDX-License-Identifier: Apache-2.0
 
+use std::{
+    cell::RefCell,
+    collections::BTreeSet,
+    sync::atomic::{AtomicBool, Ordering},
+};
+
 use clap::*;
 use move_vm_config::verifier::{MeterConfig, VerifierConfig};
 use serde::{Deserialize, Serialize};
 use serde_with::skip_serializing_none;
-use std::cell::RefCell;
-use std::collections::BTreeSet;
-use std::sync::atomic::{AtomicBool, Ordering};
 use sui_protocol_config_macros::{ProtocolConfigAccessors, ProtocolConfigFeatureFlagsGetters};
 use tracing::{info, warn};
 
@@ -142,6 +145,7 @@ const MAX_PROTOCOL_VERSION: u64 = 50;
 // New Move stdlib integer modules
 // Enable checkpoint batching in testnet.
 // Prepose consensus commit prologue in checkpoints.
+// Set number of leaders per round for Mysticeti commits.
 
 #[derive(Copy, Clone, Debug, Hash, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
 pub struct ProtocolVersion(u64);
@@ -493,6 +497,10 @@ struct FeatureFlags {
     // cancellation.
     #[serde(skip_serializing_if = "is_false")]
     prepend_prologue_tx_in_consensus_commit_in_checkpoints: bool,
+
+    // Set number of leaders per round for Mysticeti commits.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    mysticeti_num_leaders_per_round: Option<u32>,
 }
 
 fn is_false(b: &bool) -> bool {
@@ -1421,6 +1429,10 @@ impl ProtocolConfig {
     pub fn fresh_vm_on_framework_upgrade(&self) -> bool {
         self.feature_flags.fresh_vm_on_framework_upgrade
     }
+
+    pub fn mysticeti_num_leaders_per_round(&self) -> Option<u32> {
+        self.feature_flags.mysticeti_num_leaders_per_round
+    }
 }
 
 #[cfg(not(msim))]
@@ -2374,6 +2386,8 @@ impl ProtocolConfig {
                 cfg.feature_flags
                     .prepend_prologue_tx_in_consensus_commit_in_checkpoints = true;
             }
+
+            cfg.feature_flags.mysticeti_num_leaders_per_round = Some(1);
         }
         // Use this template when making changes:
         //
@@ -2544,9 +2558,14 @@ impl ProtocolConfig {
     pub fn set_mysticeti_leader_scoring_and_schedule(&mut self, val: bool) {
         self.feature_flags.mysticeti_leader_scoring_and_schedule = val;
     }
+
     pub fn set_min_checkpoint_interval_ms(&mut self, val: u64) {
         self.min_checkpoint_interval_ms = Some(val);
     }
+
+    pub fn set_mysticeti_num_leaders_per_round(&mut self, val: Option<u32>) {
+        self.feature_flags.mysticeti_num_leaders_per_round = val;
+    }
 }
 
 type OverrideFn = dyn Fn(ProtocolVersion, ProtocolConfig) -> ProtocolConfig + Send;
@@ -2639,9 +2658,10 @@ macro_rules! check_limit_by_meter {
 
 #[cfg(all(test, not(msim)))]
 mod test {
-    use super::*;
     use insta::assert_yaml_snapshot;
+
+    use super::*;
+
     #[test]
     fn snapshot_tests() {
         println!("\n============================================================================");
diff --git a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__Mainnet_version_50.snap b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__Mainnet_version_50.snap
index 2f72862b8f882..30adcce2961cf 100644
--- a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__Mainnet_version_50.snap
+++ b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__Mainnet_version_50.snap
@@ -49,6 +49,7 @@ feature_flags:
   resolve_abort_locations_to_package_id: true
   mysticeti_use_committed_subdag_digest: true
   fresh_vm_on_framework_upgrade: true
+  mysticeti_num_leaders_per_round: 1
 max_tx_size_bytes: 131072
 max_input_objects: 2048
 max_size_written_objects: 5000000
diff --git a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__Testnet_version_50.snap b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__Testnet_version_50.snap
index ac99742cbd126..1ff804ef9fbf6 100644
--- a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__Testnet_version_50.snap
+++ b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__Testnet_version_50.snap
@@ -53,6 +53,7 @@ feature_flags:
   resolve_abort_locations_to_package_id: true
   mysticeti_use_committed_subdag_digest: true
   fresh_vm_on_framework_upgrade: true
+  mysticeti_num_leaders_per_round: 1
 max_tx_size_bytes: 131072
 max_input_objects: 2048
 max_size_written_objects: 5000000
diff --git a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_50.snap b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_50.snap
index 912771cf0af17..08534483c8035 100644
--- a/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_50.snap
+++ b/crates/sui-protocol-config/src/snapshots/sui_protocol_config__test__version_50.snap
@@ -58,6 +58,7 @@ feature_flags:
   record_consensus_determined_version_assignments_in_prologue: true
   fresh_vm_on_framework_upgrade: true
   prepend_prologue_tx_in_consensus_commit_in_checkpoints: true
+  mysticeti_num_leaders_per_round: 1
 max_tx_size_bytes: 131072
 max_input_objects: 2048
 max_size_written_objects: 5000000
diff --git a/scripts/simtest/seed-search.py b/scripts/simtest/seed-search.py
index 09da077e80c3f..6466cfac8c7e6 100755
--- a/scripts/simtest/seed-search.py
+++ b/scripts/simtest/seed-search.py
@@ -12,6 +12,7 @@
 parser = argparse.ArgumentParser(description='Run the simulator with different seeds')
 parser.add_argument('binary', type=str, help='Name of simulator binary, or full path to binary')
 parser.add_argument('--test', type=str, help='Name of the test to run', required=True)
+parser.add_argument('--exact', action='store_true', help='Use exact matching for test name', default=False)
 parser.add_argument('--num-seeds', type=int, help='Number of seeds to run', default=200)
 parser.add_argument(
     '--seed-start',
@@ -41,7 +42,7 @@ def run_command(command, env_vars):
         else:
             print("-- seed passed %s" % env_vars["MSIM_TEST_SEED"])
 
-        return 0
+        return exit_code
     except subprocess.CalledProcessError as e:
         print(f"Command '{e.cmd}' failed with exit code {e.returncode} for seed: " + env_vars["MSIM_TEST_SEED"])
         return e.returncode
@@ -55,13 +56,18 @@ def main(commands):
             future = executor.submit(run_command, cmd, env_vars)
             future_to_command[future] = cmd
 
+        all_passed = True
         for future in concurrent.futures.as_completed(future_to_command):
             cmd = future_to_command[future]
             exit_code = future.result()
             if exit_code != 0:
+                all_passed = False
                 print(f"Command '{cmd}' failed with exit code {exit_code}")
                 sys.exit(1)
 
+    if all_passed:
+        print("\033[92mAll tests passed successfully!\033[0m")
+
 
 if __name__ == "__main__":
     repo_root = subprocess.check_output(["git", "rev-parse", "--show-toplevel"]).decode("utf-8").strip()
@@ -89,7 +95,7 @@ def main(commands):
 
     for i in range(1, args.num_seeds + 1):
         next_seed = args.seed_start + i
-        commands.append(("%s --exact %s" % (binary, args.test), {
+        commands.append(("%s %s %s" % (binary, '--exact' if args.exact else '', args.test), {
             "MSIM_TEST_SEED": "%d" % next_seed,
             "RUST_LOG": "off",
         }))
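For reference, the `universal_committer.rs` change above can be read as a small pure function over rounds: when multi-leader is disabled (`Some(1)`), the scan for undecided leaders starts one round past the last decided leader, so a leader-schedule swap cannot commit a second leader in an already decided round; otherwise the last decided round is revisited. A sketch under those assumptions, with a simplified hypothetical helper rather than the actual `UniversalCommitter` API:

```rust
use std::ops::RangeInclusive;

// Hypothetical helper mirroring the round-window logic in the committer's try_commit.
fn commit_round_range(
    num_leaders_per_round: Option<u32>,
    last_decided_round: u32,
    highest_accepted_round: u32,
) -> RangeInclusive<u32> {
    let start = match num_leaders_per_round {
        // Single-leader mode: skip last_decided_round entirely so a schedule
        // change cannot elect (and commit) a second leader for that round.
        Some(1) => last_decided_round + 1,
        // Multi-leader mode: revisit last_decided_round for additional leaders.
        _ => last_decided_round,
    };
    // A direct decision for round R needs blocks from round R + 2, so there is
    // no point scanning past highest_accepted_round - 2.
    start..=highest_accepted_round.saturating_sub(2)
}

fn main() {
    assert_eq!(commit_round_range(Some(1), 10, 20), 11..=18);
    assert_eq!(commit_round_range(None, 10, 20), 10..=18);
    println!("round-window checks passed");
}
```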