[consensus] Disable multi leader per round in universal committer (#18206)

## Description 

At the boundary of a leader schedule change it is possible to elect a
new leader for a round that was already committed, resulting in multiple
commits for the same round. This was unintended output from consensus, as
the parameters are set to commit only 1 leader per round. Sui also expects
only one commit per round, so the consensus handler ignores the second
commit for the same round. This caused an error in simtests where the
`EndOfPublish` transaction was part of the ignored commit and we could
not form a quorum to close the epoch without this node.

Changes
- Explicitly set the number of leaders per round to 1 for Mysticeti commits.
I wanted to keep multi-leader support here, but that would be a much more
intrusive change, and we will probably need a larger refactor to fully
support it anyway, so the cleaner mitigation is better for now (see the
sketch after this list).
- Pass back the block that the transaction was included in as part of the
tx acknowledgment, and include that block reference in the logging to help
with debugging future issues.
- Also a minor change to print whether all tests passed during simtest
seed-search.
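
A minimal sketch of the committer-side mitigation (illustration only, with simplified names and types; the real change lives in `universal_committer.rs` further down):

```rust
/// Sketch of the round-selection guard: with a single leader per round,
/// leader decisions start strictly after the last decided round, so a
/// leader-schedule change can never produce a second commit for a round
/// that was already committed.
fn first_round_to_decide(last_decided_round: u32, num_leaders_per_round: Option<usize>) -> u32 {
    match num_leaders_per_round {
        // Single-leader mode: never revisit the last decided round.
        Some(1) => last_decided_round + 1,
        // Multi-leader mode: the last decided round may be revisited to pick
        // up additional leaders elected under the new schedule.
        _ => last_decided_round,
    }
}
```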

## Test plan 

```
❯ scripts/simtest/seed-search.py simtest --test test_reconfig_with_committee_change_basic --num-seeds 1000
    Updating git repository `https://github.com/MystenLabs/mysten-sim.git`
   Compiling consensus-core v0.1.0 (/Users/arunkoshy/Documents/GitHub/sui/consensus/core)
   ...
   All tests passed successfully!
  Killing child processes
  [1]    391 killed     scripts/simtest/seed-search.py simtest --test  --num-seeds 1000
```

---

## Release notes

- [X] Protocol: Version 50 - Explicitly set the number of leaders per round
to 1 for Mysticeti commits
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:

---------

Co-authored-by: Mark Logan <[email protected]>
arun-koshy and mystenmark authored Jun 13, 2024
1 parent b82e65b commit 81d0217
Showing 10 changed files with 227 additions and 53 deletions.
138 changes: 133 additions & 5 deletions consensus/core/src/core.rs
@@ -34,9 +34,6 @@ use crate::{
},
};

// TODO: Move to protocol config once initial value is finalized.
const NUM_LEADERS_PER_ROUND: usize = 1;

// Maximum number of commit votes to include in a block.
// TODO: Move to protocol config, and verify in BlockVerifier.
const MAX_COMMIT_VOTES_PER_BLOCK: usize = 100;
@@ -92,12 +89,16 @@ impl Core {
dag_state: Arc<RwLock<DagState>>,
) -> Self {
let last_decided_leader = dag_state.read().last_commit_leader();
let number_of_leaders = context
.protocol_config
.mysticeti_num_leaders_per_round()
.unwrap_or(1);
let committer = UniversalCommitterBuilder::new(
context.clone(),
leader_schedule.clone(),
dag_state.clone(),
)
.with_number_of_leaders(NUM_LEADERS_PER_ROUND)
.with_number_of_leaders(number_of_leaders)
.with_pipeline(true)
.build();

@@ -429,7 +430,7 @@ impl Core {
self.last_proposed_block = verified_block.clone();

// Now acknowledge the transactions for their inclusion to block
ack_transactions();
ack_transactions(verified_block.reference());

info!("Created block {:?}", verified_block);

@@ -1542,6 +1543,133 @@ mod test {
}
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_commit_on_leader_schedule_change_boundary_without_multileader() {
parameterized_test_commit_on_leader_schedule_change_boundary(Some(1)).await;
}

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_commit_on_leader_schedule_change_boundary_with_multileader() {
parameterized_test_commit_on_leader_schedule_change_boundary(None).await;
}

async fn parameterized_test_commit_on_leader_schedule_change_boundary(
num_leaders_per_round: Option<usize>,
) {
telemetry_subscribers::init_for_testing();
let default_params = Parameters::default();

let (mut context, _) = Context::new_for_test(6);
context
.protocol_config
.set_mysticeti_num_leaders_per_round(num_leaders_per_round);
// create the cores and their signals for all the authorities
let mut cores = create_cores(context, vec![1, 1, 1, 1, 1, 1]);

// Now iterate over a few rounds and ensure the corresponding signals are created while network advances
let mut last_round_blocks = Vec::new();
for round in 1..=63 {
let mut this_round_blocks = Vec::new();

// Wait for min round delay to allow blocks to be proposed.
sleep(default_params.min_round_delay).await;

for (core, signal_receivers, block_receiver, _, _) in &mut cores {
// add the blocks from last round
// this will trigger a block creation for the round and a signal should be emitted
core.add_blocks(last_round_blocks.clone()).unwrap();

// A "new round" signal should be received given that all the blocks of previous round have been processed
let new_round = receive(
Duration::from_secs(1),
signal_receivers.new_round_receiver(),
)
.await;
assert_eq!(new_round, round);

// Check that a new block has been proposed.
let block = tokio::time::timeout(Duration::from_secs(1), block_receiver.recv())
.await
.unwrap()
.unwrap();
assert_eq!(block.round(), round);
assert_eq!(block.author(), core.context.own_index);

// append the new block to this round blocks
this_round_blocks.push(core.last_proposed_block().clone());

let block = core.last_proposed_block();

// ensure that produced block is referring to the blocks of last_round
assert_eq!(block.ancestors().len(), core.context.committee.size());
for ancestor in block.ancestors() {
if block.round() > 1 {
// don't bother with round 1 block which just contains the genesis blocks.
assert!(
last_round_blocks
.iter()
.any(|block| block.reference() == *ancestor),
"Reference from previous round should be added"
);
}
}
}

last_round_blocks = this_round_blocks;
}

for (core, _, _, _, store) in cores {
// Check commits have been persisted to store
let last_commit = store
.read_last_commit()
.unwrap()
.expect("last commit should be set");
// There are 61 leader rounds with rounds completed up to and including
// round 63. Round 63 blocks will only include their own blocks, so there
// should only be 60 commits.
// However, on a leader schedule change boundary it is possible for a
// new leader to be selected for the same round if the elected leader
// gets swapped, allowing multiple leaders to be committed at a round.
// This means that with multi leader per round explicitly set to 1 we
// will have 60 commits, otherwise 61.
// NOTE: We used 61 leader rounds to specifically trigger the scenario
// where the leader schedule boundary occurred AND we had a swap to a
// new leader for the same round.
let expected_commit_count = match num_leaders_per_round {
Some(1) => 60,
_ => 61,
};
assert_eq!(last_commit.index(), expected_commit_count);
let all_stored_commits = store.scan_commits((0..=CommitIndex::MAX).into()).unwrap();
assert_eq!(all_stored_commits.len(), expected_commit_count as usize);
assert_eq!(
core.leader_schedule
.leader_swap_table
.read()
.bad_nodes
.len(),
1
);
assert_eq!(
core.leader_schedule
.leader_swap_table
.read()
.good_nodes
.len(),
1
);
let expected_reputation_scores =
ReputationScores::new((51..=60).into(), vec![8, 8, 9, 8, 8, 8]);
assert_eq!(
core.leader_schedule
.leader_swap_table
.read()
.reputation_scores,
expected_reputation_scores
);
}
}

#[tokio::test]
async fn test_core_signals() {
telemetry_subscribers::init_for_testing();
40 changes: 23 additions & 17 deletions consensus/core/src/transaction.rs
@@ -10,8 +10,10 @@ use thiserror::Error;
use tokio::sync::oneshot;
use tracing::{error, warn};

use crate::block::Transaction;
use crate::context::Context;
use crate::{
block::{BlockRef, Transaction},
context::Context,
};

/// The maximum number of transactions pending to the queue to be pulled for block proposal
const MAX_PENDING_TRANSACTIONS: usize = 2_000;
@@ -27,7 +29,7 @@ pub(crate) struct TransactionsGuard {
// A TransactionsGuard may be partially consumed by `TransactionConsumer`, in which case, this holds the remaining transactions.
transactions: Vec<Transaction>,

included_in_block_ack: oneshot::Sender<()>,
included_in_block_ack: oneshot::Sender<BlockRef>,
}

/// The TransactionConsumer is responsible for fetching the next transactions to be included for the block proposals.
@@ -62,7 +64,7 @@ impl TransactionConsumer {
// This returns one or more transactions to be included in the block and a callback to acknowledge the inclusion of those transactions.
// Note that a TransactionsGuard may be partially consumed and the rest saved for the next pull, in which case its `included_in_block_ack`
// will not be signalled in the callback.
pub(crate) fn next(&mut self) -> (Vec<Transaction>, Box<dyn FnOnce()>) {
pub(crate) fn next(&mut self) -> (Vec<Transaction>, Box<dyn FnOnce(BlockRef)>) {
let mut transactions = Vec::new();
let mut acks = Vec::new();
let mut total_size: usize = 0;
@@ -119,9 +121,9 @@ impl TransactionConsumer {

(
transactions,
Box::new(move || {
Box::new(move |block_ref: BlockRef| {
for ack in acks {
let _ = ack.send(());
let _ = ack.send(block_ref);
}
}),
)
@@ -172,7 +174,8 @@ impl TransactionClient {

/// Submits a list of transactions to be sequenced. The method returns when all the transactions have been successfully
/// included in the next proposed blocks.
pub async fn submit(&self, transactions: Vec<Vec<u8>>) -> Result<(), ClientError> {
pub async fn submit(&self, transactions: Vec<Vec<u8>>) -> Result<BlockRef, ClientError> {
// TODO: Support returning the block refs for transactions that span multiple blocks
let included_in_block = self.submit_no_wait(transactions).await?;
included_in_block
.await
@@ -190,7 +193,7 @@
pub(crate) async fn submit_no_wait(
&self,
transactions: Vec<Vec<u8>>,
) -> Result<oneshot::Receiver<()>, ClientError> {
) -> Result<oneshot::Receiver<BlockRef>, ClientError> {
let (included_in_block_ack_send, included_in_block_ack_receive) = oneshot::channel();
for transaction in &transactions {
if transaction.len() as u64 > self.max_transaction_size {
@@ -246,15 +249,18 @@ impl TransactionVerifier for NoopTransactionVerifier {

#[cfg(test)]
mod tests {
use crate::context::Context;
use crate::transaction::{TransactionClient, TransactionConsumer};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::sync::Arc;
use std::time::Duration;
use std::{sync::Arc, time::Duration};

use futures::{stream::FuturesUnordered, StreamExt};
use sui_protocol_config::ProtocolConfig;
use tokio::time::timeout;

use crate::{
block::BlockRef,
context::Context,
transaction::{TransactionClient, TransactionConsumer},
};

#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn basic_submit_and_consume() {
let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| {
@@ -296,7 +302,7 @@ mod tests {
);

// Now acknowledge the inclusion of transactions
ack_transactions();
ack_transactions(BlockRef::MIN);

// Now make sure that all the waiters have returned
while let Some(result) = included_in_block_waiters.next().await {
@@ -428,7 +434,7 @@ mod tests {
// now pull the transactions from the consumer.
// we expect all transactions are fetched in order, not missing any, and not exceeding the size limit.
let mut all_transactions = Vec::new();
let mut all_acks: Vec<Box<dyn FnOnce()>> = Vec::new();
let mut all_acks: Vec<Box<dyn FnOnce(BlockRef)>> = Vec::new();
while !consumer.is_empty() {
let (transactions, ack_transactions) = consumer.next();

@@ -457,7 +463,7 @@

// now acknowledge the inclusion of all transactions.
for ack in all_acks {
ack();
ack(BlockRef::MIN);
}

// expect all receivers to be resolved.
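
For context, a hedged usage sketch of the updated submission API (not part of this commit; the import paths and logging call are assumptions based on the diff above):

```rust
use consensus_core::{BlockRef, ClientError, TransactionClient};

/// Sketch: `submit` now resolves to the `BlockRef` of the block that included
/// the transactions, which the caller can surface in logs for debugging.
/// Setup and error handling are simplified.
async fn submit_and_log(
    client: &TransactionClient,
    tx_bytes: Vec<u8>,
) -> Result<BlockRef, ClientError> {
    let block_ref = client.submit(vec![tx_bytes]).await?;
    tracing::info!("Transactions were included in block {block_ref}");
    Ok(block_ref)
}
```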
18 changes: 17 additions & 1 deletion consensus/core/src/universal_committer.rs
@@ -43,11 +43,27 @@ impl UniversalCommitter {

// Try to decide as many leaders as possible, starting with the highest round.
let mut leaders = VecDeque::new();

let last_round = match self
.context
.protocol_config
.mysticeti_num_leaders_per_round()
{
Some(1) => {
// Ensure that we don't commit any leaders from the same round as last_decided
// until we have full support for multi-leader per round.
// This can happen when we are on a leader schedule boundary and the leader
// elected for the round changes with the new schedule.
last_decided.round + 1
}
_ => last_decided.round,
};

// try to commit a leader up to the highest_accepted_round - 2. There is no
// reason to try and iterate on higher rounds as in order to make a direct
// decision for a leader at round R we need blocks from round R+2 to figure
// out that enough certificates and support exist to commit a leader.
'outer: for round in (last_decided.round..=highest_accepted_round.saturating_sub(2)).rev() {
'outer: for round in (last_round..=highest_accepted_round.saturating_sub(2)).rev() {
for committer in self.committers.iter().rev() {
// Skip committers that don't have a leader for this round.
let Some(slot) = committer.elect_leader(round) else {
22 changes: 1 addition & 21 deletions crates/sui-core/src/consensus_handler.rs
@@ -233,17 +233,7 @@ impl<C: CheckpointServiceNotify + Send + Sync> ConsensusHandler<C> {

let round = consensus_output.leader_round();

assert!(round >= last_committed_round);
if last_committed_round == round {
// we can receive the same commit twice after restart
// It is critical that the writes done by this function are atomic - otherwise we can
// lose the later parts of a commit if we restart midway through processing it.
info!(
"Ignoring consensus output for round {} as it is already committed",
round
);
return;
}
assert!(round > last_committed_round);

/* (serialized, transaction, output_cert) */
let mut transactions = vec![];
@@ -1006,16 +996,6 @@ mod tests {
last_consensus_stats_1.stats.get_num_user_transactions(0),
num_transactions as u64
);

// WHEN processing the same output multiple times
// THEN the consensus stats do not update
for _ in 0..2 {
consensus_handler
.handle_consensus_output(consensus_output.clone())
.await;
let last_consensus_stats_2 = consensus_handler.last_consensus_stats.clone();
assert_eq!(last_consensus_stats_1, last_consensus_stats_2);
}
}

#[test]
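
To make the tightened handler invariant above concrete, a minimal hedged sketch (not code from this commit): with at most one commit per leader round, commit rounds are strictly increasing, so a repeated round now indicates a bug rather than an expected duplicate.

```rust
/// Sketch of the strengthened assertion: the old "ignore an already
/// committed round" branch is removed, so any commit whose leader round is
/// not strictly greater than the last committed round is treated as a bug.
fn assert_new_commit_round(round: u64, last_committed_round: u64) {
    assert!(
        round > last_committed_round,
        "unexpected commit for round {round}; last committed round is {last_committed_round}"
    );
}
```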
21 changes: 18 additions & 3 deletions crates/sui-core/src/mysticeti_adapter.rs
@@ -7,15 +7,15 @@ use arc_swap::{ArcSwapOption, Guard};
use consensus_core::TransactionClient;
use sui_types::{
error::{SuiError, SuiResult},
messages_consensus::ConsensusTransaction,
messages_consensus::{ConsensusTransaction, ConsensusTransactionKind},
};
use tap::prelude::*;
use tokio::time::{sleep, timeout};
use tracing::warn;

use crate::{
authority::authority_per_epoch_store::AuthorityPerEpochStore,
consensus_adapter::SubmitToConsensus,
consensus_adapter::SubmitToConsensus, consensus_handler::SequencedConsensusTransactionKey,
};

/// Basically a wrapper struct that reads from the LOCAL_MYSTICETI_CLIENT variable where the latest
Expand Down Expand Up @@ -85,7 +85,7 @@ impl SubmitToConsensus for LazyMysticetiClient {
.iter()
.map(|t| bcs::to_bytes(t).expect("Serializing consensus transaction cannot fail"))
.collect::<Vec<_>>();
client
let block_ref = client
.as_ref()
.expect("Client should always be returned")
.submit(transactions_bytes)
@@ -95,6 +95,21 @@
warn!("Submit transactions failed with: {:?}", r);
})
.map_err(|err| SuiError::FailedToSubmitToConsensus(err.to_string()))?;

let is_soft_bundle = transactions.len() > 1;

if !is_soft_bundle
&& matches!(
transactions[0].kind,
ConsensusTransactionKind::EndOfPublish(_)
| ConsensusTransactionKind::CapabilityNotification(_)
| ConsensusTransactionKind::RandomnessDkgMessage(_, _)
| ConsensusTransactionKind::RandomnessDkgConfirmation(_, _)
)
{
let transaction_key = SequencedConsensusTransactionKey::External(transactions[0].key());
tracing::info!("Transaction {transaction_key:?} was included in {block_ref}",)
};
Ok(())
}
}
