diff --git a/consensus/config/src/parameters.rs b/consensus/config/src/parameters.rs
index d7c7098f319b0..03c2185468f52 100644
--- a/consensus/config/src/parameters.rs
+++ b/consensus/config/src/parameters.rs
@@ -10,18 +10,27 @@ use serde::{Deserialize, Serialize};
 /// All fields should tolerate inconsistencies among authorities, without affecting safety of the
 /// protocol. Otherwise, they need to be part of Sui protocol config or epoch state on-chain.
 ///
-/// NOTE: default values should make sense, so most operators should not need to specify any field.
+/// NOTE: default values for fields are specified in the serde default functions. Most operators
+/// should not need to specify any field, except db_path.
 #[derive(Clone, Debug, Deserialize, Serialize)]
 pub struct Parameters {
     /// Time to wait for parent round leader before sealing a block.
     #[serde(default = "Parameters::default_leader_timeout")]
     pub leader_timeout: Duration,
 
+    /// Minimum delay between rounds, to avoid generating too many rounds when latency is low.
+    /// This is especially necessary for tests running locally.
+    /// If setting a non-default value, it should be set low enough to avoid reducing
+    /// round rate and increasing latency in realistic and distributed configurations.
+    #[serde(default = "Parameters::default_min_round_delay")]
+    pub min_round_delay: Duration,
+
     /// Maximum forward time drift (how far in future) allowed for received blocks.
     #[serde(default = "Parameters::default_max_forward_time_drift")]
    pub max_forward_time_drift: Duration,
 
-    /// The database path. The path should be provided in order for the node to be able to boot
+    /// The database path.
+    /// Required.
     pub db_path: Option<PathBuf>,
 }
 
@@ -30,6 +39,10 @@ impl Parameters {
         Duration::from_millis(250)
     }
 
+    pub fn default_min_round_delay() -> Duration {
+        Duration::from_millis(50)
+    }
+
     pub fn default_max_forward_time_drift() -> Duration {
         Duration::from_millis(500)
     }
@@ -49,6 +62,7 @@ impl Default for Parameters {
     fn default() -> Self {
         Self {
             leader_timeout: Parameters::default_leader_timeout(),
+            min_round_delay: Parameters::default_min_round_delay(),
             max_forward_time_drift: Parameters::default_max_forward_time_drift(),
             db_path: None,
         }
diff --git a/consensus/config/tests/snapshots/parameters_test__parameters.snap b/consensus/config/tests/snapshots/parameters_test__parameters.snap
index efd2e2f911b79..c3649bc025cbc 100644
--- a/consensus/config/tests/snapshots/parameters_test__parameters.snap
+++ b/consensus/config/tests/snapshots/parameters_test__parameters.snap
@@ -5,6 +5,9 @@ expression: parameters
 leader_timeout:
   secs: 0
   nanos: 250000000
+min_round_delay:
+  secs: 0
+  nanos: 50000000
 max_forward_time_drift:
   secs: 0
   nanos: 500000000
diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs
index c58d283648b48..baffa464da913 100644
--- a/consensus/core/src/core.rs
+++ b/consensus/core/src/core.rs
@@ -4,6 +4,7 @@
 use std::{
     collections::{BTreeMap, BTreeSet, HashSet},
     sync::Arc,
+    time::Duration,
 };
 
 use consensus_config::{AuthorityIndex, ProtocolKeyPair};
@@ -221,12 +222,11 @@ impl Core {
         Ok(None)
     }
 
-    // Attempts to create a new block, persist and broadcast it to all peers.
-    fn try_propose(
-        &mut self,
-        ignore_leaders_check: bool,
-    ) -> ConsensusResult<Option<VerifiedBlock>> {
-        if let Some(block) = self.try_new_block(ignore_leaders_check) {
+    // Attempts to create a new block, persist and propose it to all peers.
+    // When force is true, ignore whether leaders from the last round exist among ancestors and
+    // whether the minimum round delay has passed.
+    fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
+        if let Some(block) = self.try_new_block(force) {
             // When there is only one authority in committee, it is unnecessary to broadcast
             // the block which will fail anyway without subscribers to the signal.
             if self.context.committee.size() > 1 {
@@ -241,16 +241,27 @@ impl Core {
 
     /// Attempts to propose a new block for the next round. If a block has already proposed for latest
     /// or earlier round, then no block is created and None is returned.
-    fn try_new_block(&mut self, ignore_leaders_check: bool) -> Option<VerifiedBlock> {
+    fn try_new_block(&mut self, force: bool) -> Option<VerifiedBlock> {
         let _scope = monitored_scope("Core::try_new_block");
+
         let clock_round = self.threshold_clock.get_round();
         if clock_round <= self.last_proposed_round() {
             return None;
         }
+
+        let now = timestamp_utc_ms();
+
         // Create a new block either because we want to "forcefully" propose a block due to a leader timeout,
-        // or because we are actually ready to produce the block (leader exists)
-        if !(ignore_leaders_check || self.last_quorum_leaders_exist()) {
-            return None;
+        // or because we are actually ready to produce the block (leader exists and min delay has passed).
+        if !force {
+            if !self.last_quorum_leaders_exist() {
+                return None;
+            }
+            if Duration::from_millis(now.saturating_sub(self.last_proposed_timestamp_ms()))
+                < self.context.parameters.min_round_delay
+            {
+                return None;
+            }
         }
 
         // TODO: produce the block for the clock_round. As the threshold clock can advance many rounds at once (ex
@@ -260,7 +271,6 @@
         // Probably proposing for all the intermediate rounds might not make much sense.
 
         // 1. Consume the ancestors to be included in proposal
-        let now = timestamp_utc_ms();
         let ancestors = self.ancestors_to_propose(clock_round, now);
 
         // 2. Consume the next transactions to be included.
@@ -415,6 +425,10 @@ impl Core {
         self.committer.get_leaders(round)
     }
 
+    fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
+        self.last_proposed_block.timestamp_ms()
+    }
+
     fn last_proposed_round(&self) -> Round {
         self.last_proposed_block.round()
     }
@@ -491,9 +505,12 @@ impl CoreSignalsReceivers {
 mod test {
     use std::{collections::BTreeSet, time::Duration};
 
-    use consensus_config::{local_committee_and_keys, Stake};
+    use consensus_config::{local_committee_and_keys, Parameters, Stake};
     use sui_protocol_config::ProtocolConfig;
-    use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
+    use tokio::{
+        sync::mpsc::{unbounded_channel, UnboundedReceiver},
+        time::sleep,
+    };
 
     use super::*;
     use crate::{
@@ -869,6 +886,9 @@ mod test {
         // Adding one block now will trigger the creation of new block for round 1
         let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
         expected_ancestors.insert(block_1.reference());
+        // Wait for min round delay to allow blocks to be proposed.
+        sleep(context.parameters.min_round_delay).await;
+        // add blocks to trigger proposal.
         _ = core.add_blocks(vec![block_1]);
         assert_eq!(core.last_proposed_round(), 1);
 
@@ -879,6 +899,9 @@
         // Adding another block now forms a quorum for round 1, so block at round 2 will proposed
         let block_3 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
         expected_ancestors.insert(block_3.reference());
+        // Wait for min round delay to allow blocks to be proposed.
+        sleep(context.parameters.min_round_delay).await;
+        // add blocks to trigger proposal.
         _ = core.add_blocks(vec![block_3]);
         assert_eq!(core.last_proposed_round(), 2);
 
@@ -900,6 +923,7 @@
     #[tokio::test]
     async fn test_core_try_new_block_leader_timeout() {
         telemetry_subscribers::init_for_testing();
+
         // Create the cores for all authorities
         let mut all_cores = create_cores(vec![1, 1, 1, 1]);
 
@@ -917,6 +941,17 @@
             for (core, _signal_receivers, _, _) in cores.iter_mut() {
                 core.add_blocks(last_round_blocks.clone()).unwrap();
 
+                // Only when round > 1 and using non-genesis parents.
+                if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
+                    assert_eq!(round - 1, r);
+                    // Ensure no new block is proposed before the min round delay.
+                    assert_eq!(core.last_proposed_round(), r);
+                    // Force propose new block regardless of min round delay.
+                    core.try_propose(true).unwrap().unwrap_or_else(|| {
+                        panic!("Block should have been proposed for round {}", round)
+                    });
+                }
+
                 assert_eq!(core.last_proposed_round(), round);
 
                 this_round_blocks.push(core.last_proposed_block.clone());
@@ -932,7 +967,8 @@
             assert!(core.try_propose(false).unwrap().is_none());
         }
 
-        // Now try to create the blocks for round 4 via the leader timeout method which should ignore any leader checks
+        // Now try to create the blocks for round 4 via the leader timeout method which should
+        // ignore any leader checks or min round delay.
        for (core, _, _, _) in cores.iter_mut() {
             assert!(core.force_new_block(4).unwrap().is_some());
             assert_eq!(core.last_proposed_round(), 4);
@@ -954,6 +990,8 @@
     #[tokio::test]
     async fn test_core_signals() {
         telemetry_subscribers::init_for_testing();
+        let default_params = Parameters::default();
+
         // create the cores and their signals for all the authorities
         let mut cores = create_cores(vec![1, 1, 1, 1]);
 
@@ -963,6 +1001,8 @@
             let mut this_round_blocks = Vec::new();
 
             for (core, signal_receivers, block_receiver, _) in &mut cores {
+                // Wait for min round delay to allow blocks to be proposed.
+                sleep(default_params.min_round_delay).await;
                 // add the blocks from last round
                 // this will trigger a block creation for the round and a signal should be emitted
                 core.add_blocks(last_round_blocks.clone()).unwrap();
@@ -1025,6 +1065,8 @@
     #[tokio::test]
     async fn test_core_compress_proposal_references() {
         telemetry_subscribers::init_for_testing();
+        let default_params = Parameters::default();
+
         // create the cores and their signals for all the authorities
         let mut cores = create_cores(vec![1, 1, 1, 1]);
 
@@ -1061,6 +1103,9 @@
         // be applied the we should expect all the previous blocks to be referenced from round 0..=10. However, since compression
         // is applied only the last round's (10) blocks should be referenced + the authority's block of round 0.
         let (core, _, _, _) = &mut cores[excluded_authority];
+        // Wait for min round delay to allow blocks to be proposed.
+        sleep(default_params.min_round_delay).await;
+        // add blocks to trigger proposal.
         core.add_blocks(all_blocks).unwrap();
 
         // Assert that a block has been created for round 11 and it references to blocks of round 10 for the other peers, and
diff --git a/crates/sui-node/src/lib.rs b/crates/sui-node/src/lib.rs
index 9ad31ef96db65..2632284439274 100644
--- a/crates/sui-node/src/lib.rs
+++ b/crates/sui-node/src/lib.rs
@@ -30,6 +30,7 @@ use sui_core::execution_cache::ExecutionCacheMetrics;
 use sui_core::execution_cache::NotifyReadWrapper;
 use sui_json_rpc_api::JsonRpcMetrics;
 use sui_network::randomness;
+use sui_protocol_config::ProtocolVersion;
 use sui_types::base_types::ConciseableName;
 use sui_types::crypto::RandomnessRound;
 use sui_types::digests::ChainIdentifier;
@@ -1081,28 +1082,32 @@ impl SuiNode {
             .as_mut()
             .ok_or_else(|| anyhow!("Validator is missing consensus config"))?;
 
-        // TODO (mysticeti): Move this to a protocol config flag.
-        if let Ok(consensus_choice) = std::env::var("CONSENSUS") {
-            let consensus_protocol = match consensus_choice.as_str() {
-                "narwhal" => ConsensusProtocol::Narwhal,
-                "mysticeti" => ConsensusProtocol::Mysticeti,
-                "swap_each_epoch" => {
-                    if epoch_store.epoch() % 2 == 0 {
-                        ConsensusProtocol::Narwhal
-                    } else {
-                        ConsensusProtocol::Mysticeti
+        // Only allow overriding the consensus protocol if the protocol version supports
+        // fields needed by Mysticeti.
+        if epoch_store.protocol_config().version >= ProtocolVersion::new(36) {
+            if let Ok(consensus_choice) = std::env::var("CONSENSUS") {
+                let consensus_protocol = match consensus_choice.as_str() {
+                    "narwhal" => ConsensusProtocol::Narwhal,
+                    "mysticeti" => ConsensusProtocol::Mysticeti,
+                    "swap_each_epoch" => {
+                        if epoch_store.epoch() % 2 == 0 {
+                            ConsensusProtocol::Narwhal
+                        } else {
+                            ConsensusProtocol::Mysticeti
+                        }
                     }
-                }
-                _ => {
-                    let consensus = consensus_config.protocol.clone();
-                    warn!("Consensus env var was set to an invalid choice, using default consensus protocol {consensus:?}");
-                    consensus
-                }
-            };
-            info!("Constructing consensus protocol {consensus_protocol:?}...");
-            consensus_config.protocol = consensus_protocol;
+                    _ => {
+                        let consensus = consensus_config.protocol.clone();
+                        warn!("Consensus env var was set to an invalid choice, using default consensus protocol {consensus:?}");
+                        consensus
+                    }
+                };
+                info!("Constructing consensus protocol {consensus_protocol:?}...");
+                consensus_config.protocol = consensus_protocol;
+            }
         }
+        // TODO (mysticeti): Move protocol choice to a protocol config flag.
 
         let (consensus_adapter, consensus_manager) = match consensus_config.protocol {
             ConsensusProtocol::Narwhal => {
                 let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
diff --git a/crates/sui-rosetta/tests/end_to_end_tests.rs b/crates/sui-rosetta/tests/end_to_end_tests.rs
index 948b73f2e0e49..f75dc853f6043 100644
--- a/crates/sui-rosetta/tests/end_to_end_tests.rs
+++ b/crates/sui-rosetta/tests/end_to_end_tests.rs
@@ -441,7 +441,8 @@ async fn test_pay_sui_multiple_times() {
 
     let (rosetta_client, _handle) = start_rosetta_test_server(client.clone()).await;
 
-    for _ in 1..100 {
+    for i in 1..20 {
+        println!("Iteration: {}", i);
         let ops = serde_json::from_value(json!(
             [{
                 "operation_identifier":{"index":0},
diff --git a/crates/telemetry-subscribers/Cargo.toml b/crates/telemetry-subscribers/Cargo.toml
index b25cda9808db5..0f6e9ba7305ff 100644
--- a/crates/telemetry-subscribers/Cargo.toml
+++ b/crates/telemetry-subscribers/Cargo.toml
@@ -22,7 +22,7 @@ opentelemetry_api = { version = "0.20.0", optional = true }
 opentelemetry-otlp = { version = "0.13.0", features = ["grpc-tonic"], optional = true }
 tracing-opentelemetry = { version = "0.21.0", optional = true }
 opentelemetry-proto = { version = "0.3", optional = true }
-tokio.workspace = true
+tokio = { workspace = true, features = ["full"] }
 futures.workspace = true
 clap.workspace = true
 bytes.workspace = true
diff --git a/crates/telemetry-subscribers/tests/reload.rs b/crates/telemetry-subscribers/tests/reload.rs
index 992f723616f14..726bf7f348c75 100644
--- a/crates/telemetry-subscribers/tests/reload.rs
+++ b/crates/telemetry-subscribers/tests/reload.rs
@@ -33,9 +33,13 @@ fn reload() {
         if entry.file_name().starts_with(log_file_prefix) {
             let logs = fs::read_to_string(entry.path()).unwrap();
 
-            assert!(logs.contains("Should be able to see this"));
-            assert!(!logs.contains("This won't be captured"));
-            assert!(logs.contains("Now you can see this!"));
+            assert!(
+                logs.contains("Should be able to see this"),
+                "logs: {}",
+                logs
+            );
+            assert!(!logs.contains("This won't be captured"), "logs: {}", logs);
+            assert!(logs.contains("Now you can see this!"), "logs: {}", logs);
 
             fs::remove_file(entry.path()).unwrap();
             return;