[Consensus] fix additional test failures for Sui with Mysticeti (#16636)
## Description 

- Fix test timeouts by adding a minimum delay between rounds, and reduce the workload in some tests. Otherwise, with the small p2p latencies in tests, consensus runs at a high round rate, uses significant CPU, and slows the tests down (see the sketch after this list).
- Fix `test_coin_deny_list_creation` by only allowing the consensus protocol to be overridden when the protocol version is >= 36. In the future, Mysticeti will need to be able to run at earlier protocol versions, because Narwhal may be removed completely. An alternative is to remove these tests for specific earlier protocol versions.
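
A minimal sketch of the new proposal gate (simplified from `Core::try_new_block` in `consensus/core/src/core.rs` below; the real code also requires the previous round's leaders to be among the ancestors unless the proposal is forced):

```rust
use std::time::Duration;

/// Simplified sketch, not the exact implementation: unless a proposal is forced by a
/// leader timeout, skip proposing until at least `min_round_delay` has elapsed since
/// the timestamp of the last proposed block.
fn min_delay_gate_allows_proposal(
    force: bool,
    now_ms: u64,
    last_proposed_ms: u64,
    min_round_delay: Duration,
) -> bool {
    force || Duration::from_millis(now_ms.saturating_sub(last_proposed_ms)) >= min_round_delay
}

fn main() {
    let min_round_delay = Duration::from_millis(50); // new default in consensus/config
    // Only 30ms since the last proposal: hold off instead of spinning out another round.
    assert!(!min_delay_gate_allows_proposal(false, 1_030, 1_000, min_round_delay));
    // 60ms elapsed: proposing is allowed again.
    assert!(min_delay_gate_allows_proposal(false, 1_060, 1_000, min_round_delay));
    // A leader timeout forces a proposal regardless of the delay.
    assert!(min_delay_gate_allows_proposal(true, 1_030, 1_000, min_round_delay));
}
```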

This recreates #16614 because that PR somehow stopped picking up new commits pushed to the branch.

## Test Plan 

CI
Local tests with `CONSENSUS=mysticeti`

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
mwtian authored Mar 13, 2024
1 parent 79b1af5 commit 473e4e2
Showing 7 changed files with 112 additions and 40 deletions.
18 changes: 16 additions & 2 deletions consensus/config/src/parameters.rs
@@ -10,18 +10,27 @@ use serde::{Deserialize, Serialize};
/// All fields should tolerate inconsistencies among authorities, without affecting safety of the
/// protocol. Otherwise, they need to be part of Sui protocol config or epoch state on-chain.
///
/// NOTE: default values should make sense, so most operators should not need to specify any field.
/// NOTE: fields with default values are specified in the serde default functions. Most operators
/// should not need to specify any field, except db_path.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Parameters {
/// Time to wait for parent round leader before sealing a block.
#[serde(default = "Parameters::default_leader_timeout")]
pub leader_timeout: Duration,

/// Minimum delay between rounds, to avoid generating too many rounds when latency is low.
/// This is especially necessary for tests running locally.
/// If setting a non-default value, it should be set low enough to avoid reducing
/// round rate and increasing latency in realistic and distributed configurations.
#[serde(default = "Parameters::default_min_round_delay")]
pub min_round_delay: Duration,

/// Maximum forward time drift (how far in future) allowed for received blocks.
#[serde(default = "Parameters::default_max_forward_time_drift")]
pub max_forward_time_drift: Duration,

/// The database path. The path should be provided in order for the node to be able to boot
/// The database path.
/// Required.
pub db_path: Option<PathBuf>,
}

@@ -30,6 +39,10 @@ impl Parameters {
Duration::from_millis(250)
}

pub fn default_min_round_delay() -> Duration {
Duration::from_millis(50)
}

pub fn default_max_forward_time_drift() -> Duration {
Duration::from_millis(500)
}
@@ -49,6 +62,7 @@ impl Default for Parameters {
fn default() -> Self {
Self {
leader_timeout: Parameters::default_leader_timeout(),
min_round_delay: Parameters::default_min_round_delay(),
max_forward_time_drift: Parameters::default_max_forward_time_drift(),
db_path: None,
}
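
Since all `Parameters` fields are public and have defaults (via the serde default functions and the `Default` impl above), a local test or node config can override just `min_round_delay` with struct-update syntax. A minimal sketch — the 10ms value and the helper function are illustrative, not a recommendation from this change:

```rust
use std::{path::PathBuf, time::Duration};

use consensus_config::Parameters;

/// Hypothetical helper: build parameters for a local run with a tighter round delay.
fn local_test_parameters(db_path: PathBuf) -> Parameters {
    Parameters {
        // Illustrative override; the shipped default is 50ms.
        min_round_delay: Duration::from_millis(10),
        // db_path is the only field operators are expected to set themselves.
        db_path: Some(db_path),
        // leader_timeout and max_forward_time_drift keep their defaults.
        ..Parameters::default()
    }
}
```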
@@ -5,6 +5,9 @@ expression: parameters
leader_timeout:
secs: 0
nanos: 250000000
min_round_delay:
secs: 0
nanos: 50000000
max_forward_time_drift:
secs: 0
nanos: 500000000
73 changes: 59 additions & 14 deletions consensus/core/src/core.rs
@@ -4,6 +4,7 @@
use std::{
collections::{BTreeMap, BTreeSet, HashSet},
sync::Arc,
time::Duration,
};

use consensus_config::{AuthorityIndex, ProtocolKeyPair};
@@ -221,12 +222,11 @@ impl Core {
Ok(None)
}

// Attempts to create a new block, persist and broadcast it to all peers.
fn try_propose(
&mut self,
ignore_leaders_check: bool,
) -> ConsensusResult<Option<VerifiedBlock>> {
if let Some(block) = self.try_new_block(ignore_leaders_check) {
// Attempts to create a new block, persist and propose it to all peers.
// When force is true, ignore if leader from the last round exists among ancestors and if
// the minimum round delay has passed.
fn try_propose(&mut self, force: bool) -> ConsensusResult<Option<VerifiedBlock>> {
if let Some(block) = self.try_new_block(force) {
// When there is only one authority in committee, it is unnecessary to broadcast
// the block which will fail anyway without subscribers to the signal.
if self.context.committee.size() > 1 {
@@ -241,16 +241,27 @@

/// Attempts to propose a new block for the next round. If a block has already proposed for latest
/// or earlier round, then no block is created and None is returned.
fn try_new_block(&mut self, ignore_leaders_check: bool) -> Option<VerifiedBlock> {
fn try_new_block(&mut self, force: bool) -> Option<VerifiedBlock> {
let _scope = monitored_scope("Core::try_new_block");

let clock_round = self.threshold_clock.get_round();
if clock_round <= self.last_proposed_round() {
return None;
}

let now = timestamp_utc_ms();

// Create a new block either because we want to "forcefully" propose a block due to a leader timeout,
// or because we are actually ready to produce the block (leader exists)
if !(ignore_leaders_check || self.last_quorum_leaders_exist()) {
return None;
// or because we are actually ready to produce the block (leader exists and min delay has passed).
if !force {
if !self.last_quorum_leaders_exist() {
return None;
}
if Duration::from_millis(now.saturating_sub(self.last_proposed_timestamp_ms()))
< self.context.parameters.min_round_delay
{
return None;
}
}

// TODO: produce the block for the clock_round. As the threshold clock can advance many rounds at once (ex
@@ -260,7 +271,6 @@
// Probably proposing for all the intermediate rounds might not make much sense.

// 1. Consume the ancestors to be included in proposal
let now = timestamp_utc_ms();
let ancestors = self.ancestors_to_propose(clock_round, now);

// 2. Consume the next transactions to be included.
@@ -415,6 +425,10 @@
self.committer.get_leaders(round)
}

fn last_proposed_timestamp_ms(&self) -> BlockTimestampMs {
self.last_proposed_block.timestamp_ms()
}

fn last_proposed_round(&self) -> Round {
self.last_proposed_block.round()
}
@@ -491,9 +505,12 @@ impl CoreSignalsReceivers {
mod test {
use std::{collections::BTreeSet, time::Duration};

use consensus_config::{local_committee_and_keys, Stake};
use consensus_config::{local_committee_and_keys, Parameters, Stake};
use sui_protocol_config::ProtocolConfig;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::{
sync::mpsc::{unbounded_channel, UnboundedReceiver},
time::sleep,
};

use super::*;
use crate::{
@@ -869,6 +886,9 @@ mod test {
// Adding one block now will trigger the creation of new block for round 1
let block_1 = VerifiedBlock::new_for_test(TestBlock::new(1, 1).build());
expected_ancestors.insert(block_1.reference());
// Wait for min round delay to allow blocks to be proposed.
sleep(context.parameters.min_round_delay).await;
// add blocks to trigger proposal.
_ = core.add_blocks(vec![block_1]);

assert_eq!(core.last_proposed_round(), 1);
@@ -879,6 +899,9 @@
// Adding another block now forms a quorum for round 1, so block at round 2 will proposed
let block_3 = VerifiedBlock::new_for_test(TestBlock::new(1, 2).build());
expected_ancestors.insert(block_3.reference());
// Wait for min round delay to allow blocks to be proposed.
sleep(context.parameters.min_round_delay).await;
// add blocks to trigger proposal.
_ = core.add_blocks(vec![block_3]);

assert_eq!(core.last_proposed_round(), 2);
@@ -900,6 +923,7 @@
#[tokio::test]
async fn test_core_try_new_block_leader_timeout() {
telemetry_subscribers::init_for_testing();

// Create the cores for all authorities
let mut all_cores = create_cores(vec![1, 1, 1, 1]);

@@ -917,6 +941,17 @@
for (core, _signal_receivers, _, _) in cores.iter_mut() {
core.add_blocks(last_round_blocks.clone()).unwrap();

// Only when round > 1 and using non-genesis parents.
if let Some(r) = last_round_blocks.first().map(|b| b.round()) {
assert_eq!(round - 1, r);
// Ensure no new block is proposed before the min round delay.
assert_eq!(core.last_proposed_round(), r);
// Force propose new block regardless of min round delay.
core.try_propose(true).unwrap().unwrap_or_else(|| {
panic!("Block should have been proposed for round {}", round)
});
}

assert_eq!(core.last_proposed_round(), round);

this_round_blocks.push(core.last_proposed_block.clone());
@@ -932,7 +967,8 @@
assert!(core.try_propose(false).unwrap().is_none());
}

// Now try to create the blocks for round 4 via the leader timeout method which should ignore any leader checks
// Now try to create the blocks for round 4 via the leader timeout method which should
// ignore any leader checks or min round delay.
for (core, _, _, _) in cores.iter_mut() {
assert!(core.force_new_block(4).unwrap().is_some());
assert_eq!(core.last_proposed_round(), 4);
@@ -954,6 +990,8 @@
#[tokio::test]
async fn test_core_signals() {
telemetry_subscribers::init_for_testing();
let default_params = Parameters::default();

// create the cores and their signals for all the authorities
let mut cores = create_cores(vec![1, 1, 1, 1]);

@@ -963,6 +1001,8 @@
let mut this_round_blocks = Vec::new();

for (core, signal_receivers, block_receiver, _) in &mut cores {
// Wait for min round delay to allow blocks to be proposed.
sleep(default_params.min_round_delay).await;
// add the blocks from last round
// this will trigger a block creation for the round and a signal should be emitted
core.add_blocks(last_round_blocks.clone()).unwrap();
@@ -1025,6 +1065,8 @@
#[tokio::test]
async fn test_core_compress_proposal_references() {
telemetry_subscribers::init_for_testing();
let default_params = Parameters::default();

// create the cores and their signals for all the authorities
let mut cores = create_cores(vec![1, 1, 1, 1]);

@@ -1061,6 +1103,9 @@
// be applied the we should expect all the previous blocks to be referenced from round 0..=10. However, since compression
// is applied only the last round's (10) blocks should be referenced + the authority's block of round 0.
let (core, _, _, _) = &mut cores[excluded_authority];
// Wait for min round delay to allow blocks to be proposed.
sleep(default_params.min_round_delay).await;
// add blocks to trigger proposal.
core.add_blocks(all_blocks).unwrap();

// Assert that a block has been created for round 11 and it references to blocks of round 10 for the other peers, and
43 changes: 24 additions & 19 deletions crates/sui-node/src/lib.rs
@@ -30,6 +30,7 @@ use sui_core::execution_cache::ExecutionCacheMetrics;
use sui_core::execution_cache::NotifyReadWrapper;
use sui_json_rpc_api::JsonRpcMetrics;
use sui_network::randomness;
use sui_protocol_config::ProtocolVersion;
use sui_types::base_types::ConciseableName;
use sui_types::crypto::RandomnessRound;
use sui_types::digests::ChainIdentifier;
@@ -1081,28 +1082,32 @@ impl SuiNode {
.as_mut()
.ok_or_else(|| anyhow!("Validator is missing consensus config"))?;

// TODO (mysticeti): Move this to a protocol config flag.
if let Ok(consensus_choice) = std::env::var("CONSENSUS") {
let consensus_protocol = match consensus_choice.as_str() {
"narwhal" => ConsensusProtocol::Narwhal,
"mysticeti" => ConsensusProtocol::Mysticeti,
"swap_each_epoch" => {
if epoch_store.epoch() % 2 == 0 {
ConsensusProtocol::Narwhal
} else {
ConsensusProtocol::Mysticeti
// Only allow overriding the consensus protocol, if the protocol version supports
// fields needed by Mysticeti.
if epoch_store.protocol_config().version >= ProtocolVersion::new(36) {
if let Ok(consensus_choice) = std::env::var("CONSENSUS") {
let consensus_protocol = match consensus_choice.as_str() {
"narwhal" => ConsensusProtocol::Narwhal,
"mysticeti" => ConsensusProtocol::Mysticeti,
"swap_each_epoch" => {
if epoch_store.epoch() % 2 == 0 {
ConsensusProtocol::Narwhal
} else {
ConsensusProtocol::Mysticeti
}
}
}
_ => {
let consensus = consensus_config.protocol.clone();
warn!("Consensus env var was set to an invalid choice, using default consensus protocol {consensus:?}");
consensus
}
};
info!("Constructing consensus protocol {consensus_protocol:?}...");
consensus_config.protocol = consensus_protocol;
_ => {
let consensus = consensus_config.protocol.clone();
warn!("Consensus env var was set to an invalid choice, using default consensus protocol {consensus:?}");
consensus
}
};
info!("Constructing consensus protocol {consensus_protocol:?}...");
consensus_config.protocol = consensus_protocol;
}
}

// TODO (mysticeti): Move protocol choice to a protocol config flag.
let (consensus_adapter, consensus_manager) = match consensus_config.protocol {
ConsensusProtocol::Narwhal => {
let consensus_adapter = Arc::new(Self::construct_consensus_adapter(
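
For reference, a sketch of how the `CONSENSUS` environment variable maps to a protocol choice, mirroring the match above (and only honored when the protocol version is >= 36); the printed strings are illustrative, not the node's actual log output — in practice the variable is set in the shell, e.g. `CONSENSUS=mysticeti` as in the Test Plan:

```rust
fn main() {
    // Illustration only. Accepted values per the match above are "narwhal", "mysticeti",
    // and "swap_each_epoch"; any other value logs a warning and keeps the protocol from
    // the node's consensus config.
    match std::env::var("CONSENSUS").as_deref() {
        Ok("narwhal") => println!("would run Narwhal"),
        Ok("mysticeti") => println!("would run Mysticeti"),
        Ok("swap_each_epoch") => println!("would alternate protocols each epoch"),
        Ok(other) => println!("unrecognized choice {other:?}, keeping configured default"),
        Err(_) => println!("CONSENSUS not set, keeping configured default"),
    }
}
```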
3 changes: 2 additions & 1 deletion crates/sui-rosetta/tests/end_to_end_tests.rs
@@ -441,7 +441,8 @@ async fn test_pay_sui_multiple_times() {

let (rosetta_client, _handle) = start_rosetta_test_server(client.clone()).await;

for _ in 1..100 {
for i in 1..20 {
println!("Iteration: {}", i);
let ops = serde_json::from_value(json!(
[{
"operation_identifier":{"index":0},
2 changes: 1 addition & 1 deletion crates/telemetry-subscribers/Cargo.toml
@@ -22,7 +22,7 @@ opentelemetry_api = { version = "0.20.0", optional = true }
opentelemetry-otlp = { version = "0.13.0", features = ["grpc-tonic"], optional = true }
tracing-opentelemetry = { version = "0.21.0", optional = true }
opentelemetry-proto = { version = "0.3", optional = true }
tokio.workspace = true
tokio = { workspace = true, features = ["full"] }
futures.workspace = true
clap.workspace = true
bytes.workspace = true
10 changes: 7 additions & 3 deletions crates/telemetry-subscribers/tests/reload.rs
@@ -33,9 +33,13 @@ fn reload() {
if entry.file_name().starts_with(log_file_prefix) {
let logs = fs::read_to_string(entry.path()).unwrap();

assert!(logs.contains("Should be able to see this"));
assert!(!logs.contains("This won't be captured"));
assert!(logs.contains("Now you can see this!"));
assert!(
logs.contains("Should be able to see this"),
"logs: {}",
logs
);
assert!(!logs.contains("This won't be captured"), "logs: {}", logs);
assert!(logs.contains("Now you can see this!"), "logs: {}", logs);

fs::remove_file(entry.path()).unwrap();
return;
