diff --git a/consensus/core/src/commit.rs b/consensus/core/src/commit.rs index fc947e2a45de3..95d0c051f24a8 100644 --- a/consensus/core/src/commit.rs +++ b/consensus/core/src/commit.rs @@ -488,20 +488,8 @@ pub(crate) struct CommitInfo { pub(crate) reputation_scores: ReputationScores, } -impl CommitInfo { - // Returns a new CommitInfo. - pub(crate) fn new(committed_rounds: Vec, reputation_scores: ReputationScores) -> Self { - CommitInfo { - committed_rounds, - reputation_scores, - } - } -} - /// CommitRange stores a range of CommitIndex. The range contains the start (inclusive) /// and end (exclusive) commit indices and can be ordered for use as the key of a table. -/// Note: If used as a key for a table it is useful to ensure the key ranges don't -/// intersect using the provided helper methods so that ordering becomes clear. #[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)] pub(crate) struct CommitRange(Range); diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index c6fba27234341..ba4586326446b 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -20,7 +20,7 @@ use crate::{ }, commit::{ load_committed_subdag_from_store, CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, - CommitRange, CommitInfo, CommitVote, CommittedSubDag, TrustedCommit, + CommitRef, CommitVote, CommittedSubDag, TrustedCommit, }, context::Context, leader_scoring::ReputationScores, @@ -77,7 +77,7 @@ pub(crate) struct DagState { // Buffer the reputation scores & last_committed_rounds to be flushed with the // next dag state flush. This is okay because we can recover reputation scores // & last_committed_rounds from the commits as needed. - commit_info_to_write: Vec<((CommitIndex, CommitDigest), CommitInfo)>, + commit_info_to_write: Vec<(CommitRef, CommitInfo)>, // Persistent storage for blocks, commits and other consensus data. store: Arc, @@ -108,16 +108,14 @@ impl DagState { .read_last_commit_info() .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e)); if let Some((commit_ref, commit_info)) = commit_info { - let mut last_committed_rounds = commit_info.committed_rounds; + let mut committed_rounds = commit_info.committed_rounds; let last_commit = last_commit .as_ref() .expect("There exists commit info, so the last commit should exist as well."); if last_commit.index() > commit_ref.index { - let commit_range = - CommitRange::new((commit_ref.index + 1)..last_commit.index() + 1); let committed_blocks = store - .scan_commits(((commit_index + 1)..last_commit.index() + 1).into()) + .scan_commits(((commit_ref.index + 1)..last_commit.index() + 1).into()) .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e)) .iter() .flat_map(|commit| { @@ -129,12 +127,12 @@ impl DagState { .collect::>(); for block in committed_blocks { - last_committed_rounds[block.author()] = - max(last_committed_rounds[block.author()], block.round()); + committed_rounds[block.author()] = + max(committed_rounds[block.author()], block.round()); } } - last_committed_rounds + committed_rounds } else { vec![0; num_authorities] } @@ -611,14 +609,14 @@ impl DagState { let commit_info = CommitInfo { reputation_scores, - last_committed_rounds: self.last_committed_rounds.clone(), + committed_rounds: self.last_committed_rounds.clone(), }; let last_commit = self .last_commit .as_ref() .expect("Last commit should already be set."); self.commit_info_to_write - .push(((last_commit.index(), last_commit.digest()), commit_info)); + .push((last_commit.reference(), commit_info)); } pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec { @@ -770,7 +768,8 @@ impl DagState { .store .read_last_commit_info() .unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e)); - if let Some(((_, _), commit_info)) = commit_info { + if let Some((commit_ref, commit_info)) = commit_info { + assert!(commit_ref.index <= self.last_commit.as_ref().unwrap().index()); Some(commit_info.reputation_scores) } else { None @@ -1499,6 +1498,7 @@ mod test { dag_state.add_commit(TrustedCommit::new_for_test( 1 as CommitIndex, CommitDigest::MIN, + context.clock.timestamp_utc_ms(), all_blocks.last().unwrap().reference(), all_blocks .into_iter() diff --git a/consensus/core/src/error.rs b/consensus/core/src/error.rs index 96b586eeeb3d1..bb42a92b176a1 100644 --- a/consensus/core/src/error.rs +++ b/consensus/core/src/error.rs @@ -8,7 +8,7 @@ use typed_store::TypedStoreError; use crate::{ block::{BlockRef, Round}, - commit::{Commit, CommitIndex, CommitRange}, + commit::{Commit, CommitIndex}, }; /// Errors that can occur when processing blocks, reading from storage, or encountering shutdown. diff --git a/consensus/core/src/leader_schedule.rs b/consensus/core/src/leader_schedule.rs index fae99aaedf382..357b28d68e2ce 100644 --- a/consensus/core/src/leader_schedule.rs +++ b/consensus/core/src/leader_schedule.rs @@ -420,10 +420,8 @@ impl Debug for LeaderSwapTable { mod tests { use super::*; use crate::{ - block::{ - timestamp_utc_ms, BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock, - }, - commit::{CommitDigest, CommitInfo, CommittedSubDag, TrustedCommit}, + block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock}, + commit::{CommitDigest, CommitInfo, CommitRef, CommittedSubDag, TrustedCommit}, storage::{mem_store::MemStore, Store, WriteBatch}, universal_committer::universal_committer_builder::UniversalCommitterBuilder, }; @@ -486,7 +484,7 @@ mod tests { let context = Arc::new(context); let store = Arc::new(MemStore::new()); - let leader_timestamp = timestamp_utc_ms(); + let leader_timestamp = context.clock.timestamp_utc_ms(); let blocks = vec![ VerifiedBlock::new_for_test( TestBlock::new(10, 2) @@ -504,6 +502,7 @@ mod tests { let last_commit = TrustedCommit::new_for_test( last_commit_index, CommitDigest::MIN, + context.clock.timestamp_utc_ms(), leader_ref, blocks .iter() @@ -515,17 +514,17 @@ mod tests { // info that LeaderSchedule will be recovered from let commit_range = (1..10).into(); let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]); - let last_committed_rounds = vec![9, 9, 10, 9]; - + let committed_rounds = vec![9, 9, 10, 9]; + let commit_ref = CommitRef::new(10, CommitDigest::MIN); let commit_info = CommitInfo { reputation_scores, - last_committed_rounds, + committed_rounds, }; store .write( WriteBatch::default() - .commit_info(vec![((10, CommitDigest::MIN), commit_info)]) + .commit_info(vec![(commit_ref, commit_info)]) .blocks(blocks) .commits(vec![last_commit]), ) @@ -534,7 +533,7 @@ mod tests { // CommitIndex '11' will be written to store. This should result in the cached // last_committed_rounds & unscored subdags in DagState to be updated with the // latest commit information on recovery. - let leader_timestamp = timestamp_utc_ms(); + let leader_timestamp = context.clock.timestamp_utc_ms(); let blocks = vec![ VerifiedBlock::new_for_test( TestBlock::new(11, 3) @@ -561,6 +560,7 @@ mod tests { let last_commit = TrustedCommit::new_for_test( last_commit_index, CommitDigest::MIN, + context.clock.timestamp_utc_ms(), leader_ref, blocks .iter() @@ -613,13 +613,13 @@ mod tests { let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default()); let dag_state = Arc::new(RwLock::new(DagState::new( - context, + context.clone(), Arc::new(MemStore::new()), ))); let unscored_subdags = vec![CommittedSubDag::new( BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN), vec![], - timestamp_utc_ms(), + context.clock.timestamp_utc_ms(), 1, )]; dag_state @@ -705,6 +705,7 @@ mod tests { let last_commit = TrustedCommit::new_for_test( commit_index, CommitDigest::MIN, + context.clock.timestamp_utc_ms(), leader_ref, blocks .iter() @@ -715,7 +716,7 @@ mod tests { let unscored_subdags = vec![CommittedSubDag::new( leader_ref, blocks, - timestamp_utc_ms(), + context.clock.timestamp_utc_ms(), commit_index, )]; diff --git a/consensus/core/src/storage/mem_store.rs b/consensus/core/src/storage/mem_store.rs index f56751c3cf646..79320d89824c6 100644 --- a/consensus/core/src/storage/mem_store.rs +++ b/consensus/core/src/storage/mem_store.rs @@ -13,10 +13,10 @@ use super::{Store, WriteBatch}; use crate::{ block::{BlockAPI as _, BlockDigest, BlockRef, Round, Slot, VerifiedBlock}, commit::{ - CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRange, - TrustedCommit, CommitRef + CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRange, CommitRef, + TrustedCommit, }, - error::{ConsensusError, ConsensusResult}, + error::ConsensusResult, }; /// In-memory storage for testing. @@ -75,12 +75,10 @@ impl Store for MemStore { .insert((commit.index(), commit.digest()), commit); } - // CommitInfo can be unavailable in tests, or when we decide to skip writing it. - if let Some((commit_ref, last_commit_info)) = write_batch.last_commit_info { + for (commit_ref, commit_info) in write_batch.commit_info { inner .commit_info - .insert((commit_index, commit_digest), commit_info); - .insert((commit_ref.index, commit_ref.digest), last_commit_info); + .insert((commit_ref.index, commit_ref.digest), commit_info); } Ok(()) @@ -206,15 +204,11 @@ impl Store for MemStore { Ok(votes) } - fn read_last_commit_info( - &self, - ) -> ConsensusResult> { fn read_last_commit_info(&self) -> ConsensusResult> { let inner = self.inner.read(); Ok(inner .commit_info .last_key_value() - .map(|(k, v)| (*k, v.clone()))) .map(|(k, v)| (CommitRef::new(k.0, k.1), v.clone()))) } } diff --git a/consensus/core/src/storage/mod.rs b/consensus/core/src/storage/mod.rs index ed5ac799edda0..4fcc35cda7ce4 100644 --- a/consensus/core/src/storage/mod.rs +++ b/consensus/core/src/storage/mod.rs @@ -13,6 +13,7 @@ use crate::{ block::{BlockRef, Round, Slot, VerifiedBlock}, commit::{CommitInfo, CommitRange, CommitRef, TrustedCommit}, error::ConsensusResult, + CommitIndex, }; /// A common interface for consensus storage. @@ -64,19 +65,19 @@ pub(crate) trait Store: Send + Sync { pub(crate) struct WriteBatch { pub(crate) blocks: Vec, pub(crate) commits: Vec, - pub(crate) last_commit_info: Option<(CommitRef, CommitInfo)>, + pub(crate) commit_info: Vec<(CommitRef, CommitInfo)>, } impl WriteBatch { pub(crate) fn new( blocks: Vec, commits: Vec, - last_commit_info: Option<(CommitRef, CommitInfo)>, + commit_info: Vec<(CommitRef, CommitInfo)>, ) -> Self { WriteBatch { blocks, commits, - last_commit_info, + commit_info, } } @@ -95,11 +96,8 @@ impl WriteBatch { } #[cfg(test)] - pub(crate) fn last_commit_info( - mut self, - last_commit_info: Vec<(CommitRef, CommitInfo)>, - ) -> Self { - self.last_commit_info = last_commit_info; + pub(crate) fn commit_info(mut self, commit_info: Vec<(CommitRef, CommitInfo)>) -> Self { + self.commit_info = commit_info; self } } diff --git a/consensus/core/src/storage/rocksdb_store.rs b/consensus/core/src/storage/rocksdb_store.rs index dda6aed74b184..fb26f25e97942 100644 --- a/consensus/core/src/storage/rocksdb_store.rs +++ b/consensus/core/src/storage/rocksdb_store.rs @@ -136,12 +136,11 @@ impl Store for RocksDBStore { .map_err(ConsensusError::RocksDBFailure)?; } - // CommitInfo can be unavailable in tests, or when we decide to skip writing it. - if let Some((commit_ref, last_commit_info)) = write_batch.last_commit_info { + for (commit_ref, commit_info) in write_batch.commit_info { batch .insert_batch( &self.commit_info, - [((commit_ref.index, commit_ref.digest), last_commit_info)], + [((commit_ref.index, commit_ref.digest), commit_info)], ) .map_err(ConsensusError::RocksDBFailure)?; }