Skip to content

Commit

Permalink
Rebase cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
arun-koshy committed May 8, 2024
1 parent 79e03d2 commit b48710d
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 60 deletions.
12 changes: 0 additions & 12 deletions consensus/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,20 +488,8 @@ pub(crate) struct CommitInfo {
pub(crate) reputation_scores: ReputationScores,
}

impl CommitInfo {
// Returns a new CommitInfo.
pub(crate) fn new(committed_rounds: Vec<Round>, reputation_scores: ReputationScores) -> Self {
CommitInfo {
committed_rounds,
reputation_scores,
}
}
}

/// CommitRange stores a range of CommitIndex. The range contains the start (inclusive)
/// and end (exclusive) commit indices and can be ordered for use as the key of a table.
/// Note: If used as a key for a table it is useful to ensure the key ranges don't
/// intersect using the provided helper methods so that ordering becomes clear.
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub(crate) struct CommitRange(Range<CommitIndex>);

Expand Down
24 changes: 12 additions & 12 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
},
commit::{
load_committed_subdag_from_store, CommitAPI as _, CommitDigest, CommitIndex, CommitInfo,
CommitRange, CommitInfo, CommitVote, CommittedSubDag, TrustedCommit,
CommitRef, CommitVote, CommittedSubDag, TrustedCommit,
},
context::Context,
leader_scoring::ReputationScores,
Expand Down Expand Up @@ -77,7 +77,7 @@ pub(crate) struct DagState {
// Buffer the reputation scores & last_committed_rounds to be flushed with the
// next dag state flush. This is okay because we can recover reputation scores
// & last_committed_rounds from the commits as needed.
commit_info_to_write: Vec<((CommitIndex, CommitDigest), CommitInfo)>,
commit_info_to_write: Vec<(CommitRef, CommitInfo)>,

// Persistent storage for blocks, commits and other consensus data.
store: Arc<dyn Store>,
Expand Down Expand Up @@ -108,16 +108,14 @@ impl DagState {
.read_last_commit_info()
.unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
if let Some((commit_ref, commit_info)) = commit_info {
let mut last_committed_rounds = commit_info.committed_rounds;
let mut committed_rounds = commit_info.committed_rounds;
let last_commit = last_commit
.as_ref()
.expect("There exists commit info, so the last commit should exist as well.");

if last_commit.index() > commit_ref.index {
let commit_range =
CommitRange::new((commit_ref.index + 1)..last_commit.index() + 1);
let committed_blocks = store
.scan_commits(((commit_index + 1)..last_commit.index() + 1).into())
.scan_commits(((commit_ref.index + 1)..last_commit.index() + 1).into())
.unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e))
.iter()
.flat_map(|commit| {
Expand All @@ -129,12 +127,12 @@ impl DagState {
.collect::<Vec<_>>();

for block in committed_blocks {
last_committed_rounds[block.author()] =
max(last_committed_rounds[block.author()], block.round());
committed_rounds[block.author()] =
max(committed_rounds[block.author()], block.round());
}
}

last_committed_rounds
committed_rounds
} else {
vec![0; num_authorities]
}
Expand Down Expand Up @@ -611,14 +609,14 @@ impl DagState {

let commit_info = CommitInfo {
reputation_scores,
last_committed_rounds: self.last_committed_rounds.clone(),
committed_rounds: self.last_committed_rounds.clone(),
};
let last_commit = self
.last_commit
.as_ref()
.expect("Last commit should already be set.");
self.commit_info_to_write
.push(((last_commit.index(), last_commit.digest()), commit_info));
.push((last_commit.reference(), commit_info));
}

pub(crate) fn take_commit_votes(&mut self, limit: usize) -> Vec<CommitVote> {
Expand Down Expand Up @@ -770,7 +768,8 @@ impl DagState {
.store
.read_last_commit_info()
.unwrap_or_else(|e| panic!("Failed to read from storage: {:?}", e));
if let Some(((_, _), commit_info)) = commit_info {
if let Some((commit_ref, commit_info)) = commit_info {
assert!(commit_ref.index <= self.last_commit.as_ref().unwrap().index());
Some(commit_info.reputation_scores)
} else {
None
Expand Down Expand Up @@ -1499,6 +1498,7 @@ mod test {
dag_state.add_commit(TrustedCommit::new_for_test(
1 as CommitIndex,
CommitDigest::MIN,
context.clock.timestamp_utc_ms(),
all_blocks.last().unwrap().reference(),
all_blocks
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion consensus/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use typed_store::TypedStoreError;

use crate::{
block::{BlockRef, Round},
commit::{Commit, CommitIndex, CommitRange},
commit::{Commit, CommitIndex},
};

/// Errors that can occur when processing blocks, reading from storage, or encountering shutdown.
Expand Down
27 changes: 14 additions & 13 deletions consensus/core/src/leader_schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -420,10 +420,8 @@ impl Debug for LeaderSwapTable {
mod tests {
use super::*;
use crate::{
block::{
timestamp_utc_ms, BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock,
},
commit::{CommitDigest, CommitInfo, CommittedSubDag, TrustedCommit},
block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock},
commit::{CommitDigest, CommitInfo, CommitRef, CommittedSubDag, TrustedCommit},
storage::{mem_store::MemStore, Store, WriteBatch},
universal_committer::universal_committer_builder::UniversalCommitterBuilder,
};
Expand Down Expand Up @@ -486,7 +484,7 @@ mod tests {
let context = Arc::new(context);
let store = Arc::new(MemStore::new());

let leader_timestamp = timestamp_utc_ms();
let leader_timestamp = context.clock.timestamp_utc_ms();
let blocks = vec![
VerifiedBlock::new_for_test(
TestBlock::new(10, 2)
Expand All @@ -504,6 +502,7 @@ mod tests {
let last_commit = TrustedCommit::new_for_test(
last_commit_index,
CommitDigest::MIN,
context.clock.timestamp_utc_ms(),
leader_ref,
blocks
.iter()
Expand All @@ -515,17 +514,17 @@ mod tests {
// info that LeaderSchedule will be recovered from
let commit_range = (1..10).into();
let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]);
let last_committed_rounds = vec![9, 9, 10, 9];

let committed_rounds = vec![9, 9, 10, 9];
let commit_ref = CommitRef::new(10, CommitDigest::MIN);
let commit_info = CommitInfo {
reputation_scores,
last_committed_rounds,
committed_rounds,
};

store
.write(
WriteBatch::default()
.commit_info(vec![((10, CommitDigest::MIN), commit_info)])
.commit_info(vec![(commit_ref, commit_info)])
.blocks(blocks)
.commits(vec![last_commit]),
)
Expand All @@ -534,7 +533,7 @@ mod tests {
// CommitIndex '11' will be written to store. This should result in the cached
// last_committed_rounds & unscored subdags in DagState to be updated with the
// latest commit information on recovery.
let leader_timestamp = timestamp_utc_ms();
let leader_timestamp = context.clock.timestamp_utc_ms();
let blocks = vec![
VerifiedBlock::new_for_test(
TestBlock::new(11, 3)
Expand All @@ -561,6 +560,7 @@ mod tests {
let last_commit = TrustedCommit::new_for_test(
last_commit_index,
CommitDigest::MIN,
context.clock.timestamp_utc_ms(),
leader_ref,
blocks
.iter()
Expand Down Expand Up @@ -613,13 +613,13 @@ mod tests {
let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());

let dag_state = Arc::new(RwLock::new(DagState::new(
context,
context.clone(),
Arc::new(MemStore::new()),
)));
let unscored_subdags = vec![CommittedSubDag::new(
BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
vec![],
timestamp_utc_ms(),
context.clock.timestamp_utc_ms(),
1,
)];
dag_state
Expand Down Expand Up @@ -705,6 +705,7 @@ mod tests {
let last_commit = TrustedCommit::new_for_test(
commit_index,
CommitDigest::MIN,
context.clock.timestamp_utc_ms(),
leader_ref,
blocks
.iter()
Expand All @@ -715,7 +716,7 @@ mod tests {
let unscored_subdags = vec![CommittedSubDag::new(
leader_ref,
blocks,
timestamp_utc_ms(),
context.clock.timestamp_utc_ms(),
commit_index,
)];

Expand Down
16 changes: 5 additions & 11 deletions consensus/core/src/storage/mem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ use super::{Store, WriteBatch};
use crate::{
block::{BlockAPI as _, BlockDigest, BlockRef, Round, Slot, VerifiedBlock},
commit::{
CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRange,
TrustedCommit, CommitRef
CommitAPI as _, CommitDigest, CommitIndex, CommitInfo, CommitRange, CommitRef,
TrustedCommit,
},
error::{ConsensusError, ConsensusResult},
error::ConsensusResult,
};

/// In-memory storage for testing.
Expand Down Expand Up @@ -75,12 +75,10 @@ impl Store for MemStore {
.insert((commit.index(), commit.digest()), commit);
}

// CommitInfo can be unavailable in tests, or when we decide to skip writing it.
if let Some((commit_ref, last_commit_info)) = write_batch.last_commit_info {
for (commit_ref, commit_info) in write_batch.commit_info {
inner
.commit_info
.insert((commit_index, commit_digest), commit_info);
.insert((commit_ref.index, commit_ref.digest), last_commit_info);
.insert((commit_ref.index, commit_ref.digest), commit_info);
}

Ok(())
Expand Down Expand Up @@ -206,15 +204,11 @@ impl Store for MemStore {
Ok(votes)
}

fn read_last_commit_info(
&self,
) -> ConsensusResult<Option<((CommitIndex, CommitDigest), CommitInfo)>> {
fn read_last_commit_info(&self) -> ConsensusResult<Option<(CommitRef, CommitInfo)>> {
let inner = self.inner.read();
Ok(inner
.commit_info
.last_key_value()
.map(|(k, v)| (*k, v.clone())))
.map(|(k, v)| (CommitRef::new(k.0, k.1), v.clone())))
}
}
14 changes: 6 additions & 8 deletions consensus/core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
block::{BlockRef, Round, Slot, VerifiedBlock},
commit::{CommitInfo, CommitRange, CommitRef, TrustedCommit},
error::ConsensusResult,
CommitIndex,
};

/// A common interface for consensus storage.
Expand Down Expand Up @@ -64,19 +65,19 @@ pub(crate) trait Store: Send + Sync {
pub(crate) struct WriteBatch {
pub(crate) blocks: Vec<VerifiedBlock>,
pub(crate) commits: Vec<TrustedCommit>,
pub(crate) last_commit_info: Option<(CommitRef, CommitInfo)>,
pub(crate) commit_info: Vec<(CommitRef, CommitInfo)>,
}

impl WriteBatch {
pub(crate) fn new(
blocks: Vec<VerifiedBlock>,
commits: Vec<TrustedCommit>,
last_commit_info: Option<(CommitRef, CommitInfo)>,
commit_info: Vec<(CommitRef, CommitInfo)>,
) -> Self {
WriteBatch {
blocks,
commits,
last_commit_info,
commit_info,
}
}

Expand All @@ -95,11 +96,8 @@ impl WriteBatch {
}

#[cfg(test)]
pub(crate) fn last_commit_info(
mut self,
last_commit_info: Vec<(CommitRef, CommitInfo)>,
) -> Self {
self.last_commit_info = last_commit_info;
pub(crate) fn commit_info(mut self, commit_info: Vec<(CommitRef, CommitInfo)>) -> Self {
self.commit_info = commit_info;
self
}
}
5 changes: 2 additions & 3 deletions consensus/core/src/storage/rocksdb_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,11 @@ impl Store for RocksDBStore {
.map_err(ConsensusError::RocksDBFailure)?;
}

// CommitInfo can be unavailable in tests, or when we decide to skip writing it.
if let Some((commit_ref, last_commit_info)) = write_batch.last_commit_info {
for (commit_ref, commit_info) in write_batch.commit_info {
batch
.insert_batch(
&self.commit_info,
[((commit_ref.index, commit_ref.digest), last_commit_info)],
[((commit_ref.index, commit_ref.digest), commit_info)],
)
.map_err(ConsensusError::RocksDBFailure)?;
}
Expand Down

0 comments on commit b48710d

Please sign in to comment.