diff --git a/consensus/config/src/committee.rs b/consensus/config/src/committee.rs index f7dfe2ce6a688..6a29101f5f191 100644 --- a/consensus/config/src/committee.rs +++ b/consensus/config/src/committee.rs @@ -21,7 +21,7 @@ pub type Stake = u64; /// Committee is the set of authorities that participate in the consensus protocol for this epoch. /// Its configuration is stored and computed on chain. -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Committee { /// The epoch number of this committee epoch: Epoch, @@ -128,7 +128,7 @@ impl Committee { /// /// NOTE: this is intentionally un-cloneable, to encourage only copying relevant fields. /// AuthorityIndex should be used to reference an authority instead. -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub struct Authority { /// Voting power of the authority in the committee. pub stake: Stake, diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index 66f7fd2bf64db..cd48c5c0483cf 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -103,7 +103,10 @@ where commit_consumer: CommitConsumer, registry: Registry, ) -> Self { - info!("Starting authority with index {}", own_index); + info!( + "Starting authority {}\n{:#?}\n{:#?}\n{:?}", + own_index, committee, parameters, protocol_config.version + ); let context = Arc::new(Context::new( own_index, committee, @@ -345,8 +348,8 @@ mod tests { use std::sync::Arc; use async_trait::async_trait; - use consensus_config::{local_committee_and_keys, NetworkKeyPair, Parameters, ProtocolKeyPair}; - use fastcrypto::traits::ToFromBytes; + use consensus_config::{local_committee_and_keys, Parameters}; + use fastcrypto::traits::KeyPair; use parking_lot::Mutex; use prometheus::Registry; use sui_protocol_config::ProtocolConfig; @@ -423,7 +426,7 @@ mod tests { } #[tokio::test] - async fn start_and_stop() { + async fn 
test_authority_start_and_stop() { let (committee, keypairs) = local_committee_and_keys(0, vec![1]); let registry = Registry::new(); @@ -434,9 +437,9 @@ mod tests { }; let txn_verifier = NoopTransactionVerifier {}; - let (own_index, _) = committee.authorities().last().unwrap(); - let protocol_keypair = ProtocolKeyPair::from_bytes(keypairs[0].1.as_bytes()).unwrap(); - let network_keypair = NetworkKeyPair::from_bytes(keypairs[0].0.as_bytes()).unwrap(); + let own_index = committee.to_authority_index(0).unwrap(); + let protocol_keypair = keypairs[own_index].1.copy(); + let network_keypair = keypairs[own_index].0.copy(); let (sender, _receiver) = unbounded_channel(); let commit_consumer = CommitConsumer::new( @@ -512,4 +515,84 @@ mod tests { assert_eq!(blocks.len(), 1); assert_eq!(blocks[0], input_block); } + + // TODO: build AuthorityFixture. + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn test_authority_committee() { + let (committee, keypairs) = local_committee_and_keys(0, vec![1, 1, 1, 1]); + let mut output_receivers = vec![]; + let mut authorities = vec![]; + for (index, _authority_info) in committee.authorities() { + let registry = Registry::new(); + + let temp_dir = TempDir::new().unwrap(); + let parameters = Parameters { + db_path: Some(temp_dir.into_path()), + ..Default::default() + }; + let txn_verifier = NoopTransactionVerifier {}; + + let protocol_keypair = keypairs[index].1.copy(); + let network_keypair = keypairs[index].0.copy(); + + let (sender, receiver) = unbounded_channel(); + let commit_consumer = CommitConsumer::new( + sender, 0, // last_processed_index + ); + output_receivers.push(receiver); + + let authority = ConsensusAuthority::start( + index, + committee.clone(), + parameters, + ProtocolConfig::get_for_max_version_UNSAFE(), + protocol_keypair, + network_keypair, + Arc::new(txn_verifier), + commit_consumer, + registry, + ) + .await; + authorities.push(authority); + } + + const NUM_TRANSACTIONS: u8 = 15; + let mut 
submitted_transactions = BTreeSet::<Vec<u8>>::new(); + for i in 0..NUM_TRANSACTIONS { + let txn = vec![i; 16]; + submitted_transactions.insert(txn.clone()); + authorities[i as usize % authorities.len()] + .transaction_client() + .submit(txn) + .await + .unwrap(); + } + + for mut receiver in output_receivers { + let mut expected_transactions = submitted_transactions.clone(); + loop { + let committed_subdag = + tokio::time::timeout(Duration::from_secs(1), receiver.recv()) + .await + .unwrap() + .unwrap(); + for b in committed_subdag.blocks { + for txn in b.transactions().iter().map(|t| t.data().to_vec()) { + assert!( + expected_transactions.remove(&txn), + "Transaction not submitted or already seen: {:?}", + txn + ); + } + } + if expected_transactions.is_empty() { + break; + } + } + } + + for authority in authorities { + authority.stop().await; + } + } } diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 88ce0cca92e2e..eeb501091337c 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -126,6 +126,9 @@ impl Core { // Accept all blocks but make sure that only the last quorum round blocks and onwards are kept. self.add_accepted_blocks(all_blocks, Some(0)); + + // TODO: run commit and propose logic, or just use add_blocks() instead of add_accepted_blocks(). + self } @@ -140,7 +143,7 @@ impl Core { .try_accept_blocks(blocks) .unwrap_or_else(|err| panic!("Fatal error while accepting blocks: {err}")); - // Now process them, basically move the threshold clock and add them to pending list + // Now add accepted blocks to the threshold clock and pending ancestors list. self.add_accepted_blocks(accepted_blocks, None); // TODO: Add optimization for added blocks that do not achieve quorum for a round. @@ -241,10 +244,10 @@ impl Core { let now = timestamp_utc_ms(); let ancestors = self.ancestors_to_propose(clock_round, now); - //2. consume the next transactions to be included. - let payload = self.transaction_consumer.next(); + // 2. 
Consume the next transactions to be included. + let transactions = self.transaction_consumer.next(); - //3. create the block and insert to storage. + // 3. Create the block and insert to storage. // TODO: take a decision on whether we want to flush to disk at this point the DagState. let block = Block::V1(BlockV1::new( self.context.committee.epoch(), @@ -252,7 +255,7 @@ impl Core { self.context.own_index, now, ancestors, - payload, + transactions, )); let signed_block = SignedBlock::new(block, &self.block_signer).expect("Block signing failed."); @@ -286,8 +289,6 @@ impl Core { let _ = self.signals.new_block_ready(verified_block.reference()); // TODO: propagate shutdown or ensure this will never return error? - self.try_commit(); - return Some(verified_block); } @@ -545,6 +546,11 @@ mod test { store.clone(), ); + // Check no commits have been persisted to dag_state or store. + let last_commit = store.read_last_commit().unwrap(); + assert!(last_commit.is_none()); + assert_eq!(dag_state.read().last_commit_index(), 0); + // Now spin up core let (signals, signal_receivers) = CoreSignals::new(); let mut core = Core::new( @@ -562,11 +568,6 @@ mod test { let mut new_round = signal_receivers.new_round_receiver(); assert_eq!(*new_round.borrow_and_update(), 5); - // Check no commits have been persisted to dag_state & store - let last_commit = store.read_last_commit().unwrap(); - assert!(last_commit.is_none()); - assert_eq!(dag_state.read().last_commit_index(), 0); - // When trying to propose now we should propose block for round 5 let proposed_block = core .try_new_block(true) @@ -580,11 +581,13 @@ mod test { assert_eq!(ancestor.round, 4); } - // Check commits have been persisted to dag state & store + // Run commit rule. + core.try_commit(); let last_commit = store .read_last_commit() .unwrap() .expect("last commit should be set"); + // There were no commits prior to the core starting up but there was completed // rounds up to and including round 4. 
So we should commit leaders in round 1 & 2 // as soon as the new block for round 5 is proposed. @@ -606,7 +609,7 @@ mod test { let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone()); let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone(), None); - // Create test blocks for all authorities except our's (index = 0) . + // Create test blocks for all authorities except our's (index = 0). let (_, mut last_round_blocks) = Block::genesis(context.clone()); let mut all_blocks = last_round_blocks.clone(); for round in 1..=4 { @@ -646,6 +649,11 @@ mod test { store.clone(), ); + // Check no commits have been persisted to dag_state & store + let last_commit = store.read_last_commit().unwrap(); + assert!(last_commit.is_none()); + assert_eq!(dag_state.read().last_commit_index(), 0); + // Now spin up core let (signals, signal_receivers) = CoreSignals::new(); let mut core = Core::new( @@ -663,11 +671,6 @@ mod test { let mut new_round = signal_receivers.new_round_receiver(); assert_eq!(*new_round.borrow_and_update(), 4); - // Check no commits have been persisted to dag_state & store - let last_commit = store.read_last_commit().unwrap(); - assert!(last_commit.is_none()); - assert_eq!(dag_state.read().last_commit_index(), 0); - // When trying to propose now we should propose block for round 4 let proposed_block = core .try_new_block(true) @@ -684,11 +687,13 @@ mod test { } } - // Check commits have been persisted to dag state & store + // Run commit rule. + core.try_commit(); let last_commit = store .read_last_commit() .unwrap() .expect("last commit should be set"); + // There were no commits prior to the core starting up but there was completed // rounds up to round 4. So we should commit leaders in round 1 & 2 as soon // as the new block for round 4 is proposed. 
diff --git a/consensus/core/src/synchronizer.rs b/consensus/core/src/synchronizer.rs index d586898c36bba..c8e0fda979161 100644 --- a/consensus/core/src/synchronizer.rs +++ b/consensus/core/src/synchronizer.rs @@ -687,7 +687,7 @@ mod tests { } } - #[tokio::test] + #[tokio::test(flavor = "current_thread", start_paused = true)] async fn synchronizer_periodic_task_fetch_blocks() { // GIVEN let (context, _) = Context::new_for_test(4);