
Commit 799916b
[Consensus] add a small integration test with a committee of authorities (#16279)

## Description 

Add an integration test with a committee of authorities, including
transaction submission and verification.
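
In short, the new test stands up a four-authority committee, submits a batch of transactions round-robin through the authorities' transaction clients, and then drains each authority's commit stream until every submitted transaction has been observed. The bookkeeping behind that check is simple set arithmetic; here is a minimal, self-contained sketch of it (plain `std`, no consensus types — the real test below does the same thing against the blocks of each `committed_subdag`):

```rust
use std::collections::BTreeSet;

// Sketch of the verification bookkeeping used by the test: record every
// submitted transaction, then remove entries as they show up in the committed
// output. An empty set at the end means everything submitted was committed;
// a failed remove means something was committed that was never submitted
// (or was delivered twice).
fn main() {
    let submitted: BTreeSet<Vec<u8>> = (0u8..15).map(|i| vec![i; 16]).collect();
    let committed: Vec<Vec<u8>> = (0u8..15).map(|i| vec![i; 16]).collect();

    let mut expected = submitted.clone();
    for txn in committed {
        assert!(expected.remove(&txn), "unexpected or duplicate txn: {txn:?}");
    }
    assert!(expected.is_empty(), "not all submitted txns were committed");
}
```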

## Test Plan 

Unit test.

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
mwtian authored Feb 24, 2024
1 parent 840bde8 commit 799916b
Showing 4 changed files with 118 additions and 30 deletions.
4 changes: 2 additions & 2 deletions consensus/config/src/committee.rs
@@ -21,7 +21,7 @@ pub type Stake = u64;

/// Committee is the set of authorities that participate in the consensus protocol for this epoch.
/// Its configuration is stored and computed on chain.
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Committee {
/// The epoch number of this committee
epoch: Epoch,
@@ -128,7 +128,7 @@ impl Committee {
///
/// NOTE: this is intentionally un-cloneable, to encourage only copying relevant fields.
/// AuthorityIndex should be used to reference an authority instead.
#[derive(Clone, Serialize, Deserialize)]
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct Authority {
/// Voting power of the authority in the committee.
pub stake: Stake,
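
The only change here is adding `Debug` to the derives on `Committee` and `Authority`. That is what allows the new startup log in `authority_node.rs` (next file) to pretty-print the committee and its authorities with `{:#?}`. A minimal, self-contained illustration of the pattern — the struct below is a stand-in, not the real `Committee`:

```rust
// Deriving Debug enables `{:?}` and `{:#?}` formatting, which the new
// `info!` call in authority_node.rs relies on.
#[derive(Clone, Debug)]
struct Committee {
    epoch: u64,
    stakes: Vec<u64>,
}

fn main() {
    let committee = Committee { epoch: 0, stakes: vec![1, 1, 1, 1] };
    // `{:#?}` pretty-prints the value across multiple lines.
    println!("Starting authority\n{:#?}", committee);
}
```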
97 changes: 90 additions & 7 deletions consensus/core/src/authority_node.rs
@@ -103,7 +103,10 @@ where
commit_consumer: CommitConsumer,
registry: Registry,
) -> Self {
info!("Starting authority with index {}", own_index);
info!(
"Starting authority {}\n{:#?}\n{:#?}\n{:?}",
own_index, committee, parameters, protocol_config.version
);
let context = Arc::new(Context::new(
own_index,
committee,
@@ -345,8 +348,8 @@ mod tests {
use std::sync::Arc;

use async_trait::async_trait;
use consensus_config::{local_committee_and_keys, NetworkKeyPair, Parameters, ProtocolKeyPair};
use fastcrypto::traits::ToFromBytes;
use consensus_config::{local_committee_and_keys, Parameters};
use fastcrypto::traits::KeyPair;
use parking_lot::Mutex;
use prometheus::Registry;
use sui_protocol_config::ProtocolConfig;
@@ -423,7 +426,7 @@ mod tests {
}

#[tokio::test]
async fn start_and_stop() {
async fn test_authority_start_and_stop() {
let (committee, keypairs) = local_committee_and_keys(0, vec![1]);
let registry = Registry::new();

@@ -434,9 +437,9 @@
};
let txn_verifier = NoopTransactionVerifier {};

let (own_index, _) = committee.authorities().last().unwrap();
let protocol_keypair = ProtocolKeyPair::from_bytes(keypairs[0].1.as_bytes()).unwrap();
let network_keypair = NetworkKeyPair::from_bytes(keypairs[0].0.as_bytes()).unwrap();
let own_index = committee.to_authority_index(0).unwrap();
let protocol_keypair = keypairs[own_index].1.copy();
let network_keypair = keypairs[own_index].0.copy();

let (sender, _receiver) = unbounded_channel();
let commit_consumer = CommitConsumer::new(
@@ -512,4 +515,84 @@ mod tests {
assert_eq!(blocks.len(), 1);
assert_eq!(blocks[0], input_block);
}

// TODO: build AuthorityFixture.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn test_authority_committee() {
let (committee, keypairs) = local_committee_and_keys(0, vec![1, 1, 1, 1]);
let mut output_receivers = vec![];
let mut authorities = vec![];
for (index, _authority_info) in committee.authorities() {
let registry = Registry::new();

let temp_dir = TempDir::new().unwrap();
let parameters = Parameters {
db_path: Some(temp_dir.into_path()),
..Default::default()
};
let txn_verifier = NoopTransactionVerifier {};

let protocol_keypair = keypairs[index].1.copy();
let network_keypair = keypairs[index].0.copy();

let (sender, receiver) = unbounded_channel();
let commit_consumer = CommitConsumer::new(
sender, 0, // last_processed_index
);
output_receivers.push(receiver);

let authority = ConsensusAuthority::start(
index,
committee.clone(),
parameters,
ProtocolConfig::get_for_max_version_UNSAFE(),
protocol_keypair,
network_keypair,
Arc::new(txn_verifier),
commit_consumer,
registry,
)
.await;
authorities.push(authority);
}

const NUM_TRANSACTIONS: u8 = 15;
let mut submitted_transactions = BTreeSet::<Vec<u8>>::new();
for i in 0..NUM_TRANSACTIONS {
let txn = vec![i; 16];
submitted_transactions.insert(txn.clone());
authorities[i as usize % authorities.len()]
.transaction_client()
.submit(txn)
.await
.unwrap();
}

for mut receiver in output_receivers {
let mut expected_transactions = submitted_transactions.clone();
loop {
let committed_subdag =
tokio::time::timeout(Duration::from_secs(1), receiver.recv())
.await
.unwrap()
.unwrap();
for b in committed_subdag.blocks {
for txn in b.transactions().iter().map(|t| t.data().to_vec()) {
assert!(
expected_transactions.remove(&txn),
"Transaction not submitted or already seen: {:?}",
txn
);
}
}
if expected_transactions.is_empty() {
break;
}
}
}

for authority in authorities {
authority.stop().await;
}
}
}
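
Note that both the new `test_authority_committee` test above and the `synchronizer_periodic_task_fetch_blocks` test below run under `#[tokio::test(flavor = "current_thread", start_paused = true)]`. With the clock paused, Tokio auto-advances virtual time whenever every task is idle, so the one-second `tokio::time::timeout` waits in the commit-verification loop cost essentially no wall-clock time while still bounding the test. A small self-contained example of this behavior (assumes a Tokio dev-dependency with the `macros`, `rt`, `time`, and `test-util` features):

```rust
use std::time::Duration;

// With start_paused = true, the mock clock advances automatically whenever the
// runtime has no runnable tasks, so this test observes 60 seconds of virtual
// time but finishes almost instantly in real time.
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn paused_time_auto_advances() {
    let start = tokio::time::Instant::now();
    tokio::time::sleep(Duration::from_secs(60)).await;
    assert!(start.elapsed() >= Duration::from_secs(60));
}
```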
45 changes: 25 additions & 20 deletions consensus/core/src/core.rs
@@ -126,6 +126,9 @@ impl Core {

// Accept all blocks but make sure that only the last quorum round blocks and onwards are kept.
self.add_accepted_blocks(all_blocks, Some(0));

// TODO: run commit and propose logic, or just use add_blocks() instead of add_accepted_blocks().

self
}

@@ -140,7 +143,7 @@ impl Core {
.try_accept_blocks(blocks)
.unwrap_or_else(|err| panic!("Fatal error while accepting blocks: {err}"));

// Now process them, basically move the threshold clock and add them to pending list
// Now add accepted blocks to the threshold clock and pending ancestors list.
self.add_accepted_blocks(accepted_blocks, None);

// TODO: Add optimization for added blocks that do not achieve quorum for a round.
@@ -241,18 +244,18 @@ impl Core {
let now = timestamp_utc_ms();
let ancestors = self.ancestors_to_propose(clock_round, now);

//2. consume the next transactions to be included.
let payload = self.transaction_consumer.next();
// 2. Consume the next transactions to be included.
let transactions = self.transaction_consumer.next();

//3. create the block and insert to storage.
// 3. Create the block and insert to storage.
// TODO: take a decision on whether we want to flush to disk at this point the DagState.
let block = Block::V1(BlockV1::new(
self.context.committee.epoch(),
clock_round,
self.context.own_index,
now,
ancestors,
payload,
transactions,
));
let signed_block =
SignedBlock::new(block, &self.block_signer).expect("Block signing failed.");
@@ -286,8 +289,6 @@ impl Core {
let _ = self.signals.new_block_ready(verified_block.reference());
// TODO: propagate shutdown or ensure this will never return error?

self.try_commit();

return Some(verified_block);
}

@@ -545,6 +546,11 @@ mod test {
store.clone(),
);

// Check no commits have been persisted to dag_state or store.
let last_commit = store.read_last_commit().unwrap();
assert!(last_commit.is_none());
assert_eq!(dag_state.read().last_commit_index(), 0);

// Now spin up core
let (signals, signal_receivers) = CoreSignals::new();
let mut core = Core::new(
@@ -562,11 +568,6 @@
let mut new_round = signal_receivers.new_round_receiver();
assert_eq!(*new_round.borrow_and_update(), 5);

// Check no commits have been persisted to dag_state & store
let last_commit = store.read_last_commit().unwrap();
assert!(last_commit.is_none());
assert_eq!(dag_state.read().last_commit_index(), 0);

// When trying to propose now we should propose block for round 5
let proposed_block = core
.try_new_block(true)
@@ -580,11 +581,13 @@
assert_eq!(ancestor.round, 4);
}

// Check commits have been persisted to dag state & store
// Run commit rule.
core.try_commit();
let last_commit = store
.read_last_commit()
.unwrap()
.expect("last commit should be set");

// There were no commits prior to the core starting up but there was completed
// rounds up to and including round 4. So we should commit leaders in round 1 & 2
// as soon as the new block for round 5 is proposed.
@@ -606,7 +609,7 @@
let (_transaction_client, tx_receiver) = TransactionClient::new(context.clone());
let transaction_consumer = TransactionConsumer::new(tx_receiver, context.clone(), None);

// Create test blocks for all authorities except our's (index = 0) .
// Create test blocks for all authorities except our's (index = 0).
let (_, mut last_round_blocks) = Block::genesis(context.clone());
let mut all_blocks = last_round_blocks.clone();
for round in 1..=4 {
@@ -646,6 +649,11 @@ mod test {
store.clone(),
);

// Check no commits have been persisted to dag_state & store
let last_commit = store.read_last_commit().unwrap();
assert!(last_commit.is_none());
assert_eq!(dag_state.read().last_commit_index(), 0);

// Now spin up core
let (signals, signal_receivers) = CoreSignals::new();
let mut core = Core::new(
@@ -663,11 +671,6 @@
let mut new_round = signal_receivers.new_round_receiver();
assert_eq!(*new_round.borrow_and_update(), 4);

// Check no commits have been persisted to dag_state & store
let last_commit = store.read_last_commit().unwrap();
assert!(last_commit.is_none());
assert_eq!(dag_state.read().last_commit_index(), 0);

// When trying to propose now we should propose block for round 4
let proposed_block = core
.try_new_block(true)
@@ -684,11 +687,13 @@
}
}

// Check commits have been persisted to dag state & store
// Run commit rule.
core.try_commit();
let last_commit = store
.read_last_commit()
.unwrap()
.expect("last commit should be set");

// There were no commits prior to the core starting up but there was completed
// rounds up to round 4. So we should commit leaders in round 1 & 2 as soon
// as the new block for round 4 is proposed.
2 changes: 1 addition & 1 deletion consensus/core/src/synchronizer.rs
@@ -687,7 +687,7 @@ mod tests {
}
}

#[tokio::test]
#[tokio::test(flavor = "current_thread", start_paused = true)]
async fn synchronizer_periodic_task_fetch_blocks() {
// GIVEN
let (context, _) = Context::new_for_test(4);
