Skip to content

Commit

Permalink
[Consensus 2.0] Calculate committed sub dag digest (#17765)
Browse files Browse the repository at this point in the history
## Description 

Currently the default digest is being used on the mysticeti committed
subdag making the fork detection in checkpoints weaker as included in
the `ConsensusCommitInfo`. This PR is calculating the committed sub dag
digest and enabling it behind a feature flag. It has to be noted that
the reputation scores are not included in the sub dag digest as during
crash recovery we don't restore the actual scores for every committed
sub dag, but only restoring the last ones and sent as part of the last
committed sub dag.

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
akichidis authored May 23, 2024
1 parent 821fda2 commit 2dc5a07
Show file tree
Hide file tree
Showing 13 changed files with 181 additions and 154 deletions.
46 changes: 25 additions & 21 deletions consensus/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ impl CommitDigest {
/// Lexicographic min & max digest.
pub const MIN: Self = Self([u8::MIN; consensus_config::DIGEST_LENGTH]);
pub const MAX: Self = Self([u8::MAX; consensus_config::DIGEST_LENGTH]);

pub fn into_inner(self) -> [u8; consensus_config::DIGEST_LENGTH] {
self.0
}
}

impl Hash for CommitDigest {
Expand Down Expand Up @@ -253,8 +257,8 @@ impl fmt::Debug for CommitDigest {
/// Uniquely identifies a commit with its index and digest.
#[derive(Clone, Copy, Serialize, Deserialize, Default, PartialEq, Eq, PartialOrd, Ord)]
pub struct CommitRef {
pub(crate) index: CommitIndex,
pub(crate) digest: CommitDigest,
pub index: CommitIndex,
pub digest: CommitDigest,
}

impl CommitRef {
Expand Down Expand Up @@ -292,10 +296,10 @@ pub struct CommittedSubDag {
pub blocks: Vec<VerifiedBlock>,
/// The timestamp of the commit, obtained from the timestamp of the leader block.
pub timestamp_ms: BlockTimestampMs,
/// Index of the commit.
/// The reference of the commit.
/// First commit after genesis has a index of 1, then every next commit has a
/// index incremented by 1.
pub commit_index: CommitIndex,
pub commit_ref: CommitRef,
/// Optional scores that are provided as part of the consensus output to Sui
/// that can then be used by Sui for future submission to consensus.
pub reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
Expand All @@ -307,38 +311,38 @@ impl CommittedSubDag {
leader: BlockRef,
blocks: Vec<VerifiedBlock>,
timestamp_ms: BlockTimestampMs,
commit_index: CommitIndex,
commit_ref: CommitRef,
) -> Self {
Self {
leader,
blocks,
timestamp_ms,
commit_index,
commit_ref,
reputation_scores_desc: vec![],
}
}

pub(crate) fn update_scores(&mut self, reputation_scores_desc: Vec<(AuthorityIndex, u64)>) {
self.reputation_scores_desc = reputation_scores_desc;
}
}

/// Sort the blocks of the sub-dag by round number then authority index. Any
/// deterministic & stable algorithm works.
pub(crate) fn sort(&mut self) {
self.blocks.sort_by(|a, b| {
a.round()
.cmp(&b.round())
.then_with(|| a.author().cmp(&b.author()))
});
}
// Sort the blocks of the sub-dag blocks by round number then authority index. Any
// deterministic & stable algorithm works.
pub(crate) fn sort_sub_dag_blocks(blocks: &mut [VerifiedBlock]) {
blocks.sort_by(|a, b| {
a.round()
.cmp(&b.round())
.then_with(|| a.author().cmp(&b.author()))
})
}

impl Display for CommittedSubDag {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(
f,
"CommittedSubDag(leader={}, index={}, blocks=[",
self.leader, self.commit_index
"CommittedSubDag(leader={}, ref={}, blocks=[",
self.leader, self.commit_ref
)?;
for (idx, block) in self.blocks.iter().enumerate() {
if idx > 0 {
Expand All @@ -352,7 +356,7 @@ impl Display for CommittedSubDag {

impl fmt::Debug for CommittedSubDag {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}@{} ([", self.leader, self.commit_index)?;
write!(f, "{}@{} ([", self.leader, self.commit_ref)?;
for block in &self.blocks {
write!(f, "{}, ", block.reference())?;
}
Expand Down Expand Up @@ -391,7 +395,7 @@ pub fn load_committed_subdag_from_store(
leader_block_ref,
blocks,
commit.timestamp_ms(),
commit.index(),
commit.reference(),
)
}

Expand Down Expand Up @@ -664,14 +668,14 @@ mod tests {
leader_ref,
blocks.clone(),
);
let subdag = load_committed_subdag_from_store(store.as_ref(), commit);
let subdag = load_committed_subdag_from_store(store.as_ref(), commit.clone());
assert_eq!(subdag.leader, leader_ref);
assert_eq!(subdag.timestamp_ms, leader_block.timestamp_ms());
assert_eq!(
subdag.blocks.len(),
(num_authorities * wave_length) as usize + 1
);
assert_eq!(subdag.commit_index, commit_index);
assert_eq!(subdag.commit_ref, commit.reference());
}

#[tokio::test]
Expand Down
19 changes: 11 additions & 8 deletions consensus/core/src/commit_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl CommitObserver {
}
tracing::debug!(
"Sending to execution commit {} leader {}",
committed_sub_dag.commit_index,
committed_sub_dag.commit_ref,
committed_sub_dag.leader
);
sent_sub_dags.push(committed_sub_dag);
Expand Down Expand Up @@ -283,15 +283,15 @@ mod tests {
expected_stored_refs.push(block.reference());
assert!(block.round() <= leaders[idx].round());
}
assert_eq!(subdag.commit_index, idx as CommitIndex + 1);
assert_eq!(subdag.commit_ref.index, idx as CommitIndex + 1);
}

// Check commits sent over consensus output channel is accurate
let mut processed_subdag_index = 0;
while let Ok(subdag) = receiver.try_recv() {
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_index as usize;
processed_subdag_index = subdag.commit_ref.index as usize;
if processed_subdag_index == leaders.len() {
break;
}
Expand All @@ -302,7 +302,10 @@ mod tests {

// Check commits have been persisted to storage
let last_commit = mem_store.read_last_commit().unwrap().unwrap();
assert_eq!(last_commit.index(), commits.last().unwrap().commit_index);
assert_eq!(
last_commit.index(),
commits.last().unwrap().commit_ref.index
);
let all_stored_commits = mem_store
.scan_commits((0..CommitIndex::MAX).into())
.unwrap();
Expand Down Expand Up @@ -377,7 +380,7 @@ mod tests {
tracing::info!("Processed {subdag}");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_index as usize;
processed_subdag_index = subdag.commit_ref.index as usize;
if processed_subdag_index == expected_last_processed_index {
break;
}
Expand Down Expand Up @@ -413,7 +416,7 @@ mod tests {
tracing::info!("{subdag} was sent but not processed by consumer");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_index as usize;
processed_subdag_index = subdag.commit_ref.index as usize;
if processed_subdag_index == expected_last_sent_index {
break;
}
Expand Down Expand Up @@ -449,7 +452,7 @@ mod tests {
tracing::info!("Processed {subdag} on resubmission");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_index as usize;
processed_subdag_index = subdag.commit_ref.index as usize;
if processed_subdag_index == expected_last_sent_index {
break;
}
Expand Down Expand Up @@ -517,7 +520,7 @@ mod tests {
tracing::info!("Processed {subdag}");
assert_eq!(subdag, commits[processed_subdag_index]);
assert_eq!(subdag.reputation_scores_desc, vec![]);
processed_subdag_index = subdag.commit_index as usize;
processed_subdag_index = subdag.commit_ref.index as usize;
if processed_subdag_index == expected_last_processed_index {
break;
}
Expand Down
18 changes: 5 additions & 13 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1359,23 +1359,15 @@ mod test {
let mut last_committed_rounds = vec![0; 4];
for (idx, leader) in leaders.into_iter().enumerate() {
let commit_index = idx as u32 + 1;
let subdag =
dag_builder.get_subdag(leader.clone(), last_committed_rounds.clone(), commit_index);
let (subdag, commit) = dag_builder.get_sub_dag_and_commit(
leader.clone(),
last_committed_rounds.clone(),
commit_index,
);
for block in subdag.blocks.iter() {
last_committed_rounds[block.author().value()] =
max(block.round(), last_committed_rounds[block.author().value()]);
}
let commit = TrustedCommit::new_for_test(
commit_index,
CommitDigest::MIN,
leader.timestamp_ms(),
leader.reference(),
subdag
.blocks
.iter()
.map(|block| block.reference())
.collect::<Vec<_>>(),
);
commits.push(commit);
}

Expand Down
57 changes: 19 additions & 38 deletions consensus/core/src/leader_schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl LeaderSchedule {

reputation_scores.update_metrics(self.context.clone());

let last_commit_index = unscored_subdags.last().unwrap().commit_index;
let last_commit_index = unscored_subdags.last().unwrap().commit_ref.index;
self.update_leader_swap_table(LeaderSwapTable::new(
self.context.clone(),
last_commit_index,
Expand Down Expand Up @@ -650,35 +650,27 @@ mod tests {
let mut last_committed_rounds = vec![0; 4];
for (idx, leader) in leaders.into_iter().enumerate() {
let commit_index = idx as u32 + 1;
let mut subdag =
dag_builder.get_subdag(leader.clone(), last_committed_rounds.clone(), commit_index);
for block in subdag.blocks.iter() {
let (sub_dag, commit) = dag_builder.get_sub_dag_and_commit(
leader.clone(),
last_committed_rounds.clone(),
commit_index,
);
for block in sub_dag.blocks.iter() {
blocks_to_write.push(block.clone());
last_committed_rounds[block.author().value()] =
max(block.round(), last_committed_rounds[block.author().value()]);
}
let commit = TrustedCommit::new_for_test(
commit_index,
CommitDigest::MIN,
leader.timestamp_ms(),
leader.reference(),
subdag
.blocks
.iter()
.map(|block| block.reference())
.collect::<Vec<_>>(),
);

expected_commits.push(commit);
subdag.sort();
subdags.push(subdag);
subdags.push(sub_dag);
}

// The CommitInfo for the first 10 commits are written to store. This is the
// info that LeaderSchedule will be recovered from
let commit_range = (1..11).into();
let reputation_scores = ReputationScores::new(commit_range, vec![4, 1, 1, 3]);
let committed_rounds = vec![9, 9, 10, 9];
let commit_ref = CommitRef::new(10, CommitDigest::MIN);
let commit_ref = expected_commits[9].reference();
let commit_info = CommitInfo {
reputation_scores,
committed_rounds,
Expand All @@ -705,8 +697,7 @@ mod tests {
);
let actual_unscored_subdags = dag_state.read().unscored_committed_subdags();
assert_eq!(1, dag_state.read().unscored_committed_subdags_count());
let mut actual_subdag = actual_unscored_subdags[0].clone();
actual_subdag.sort();
let actual_subdag = actual_unscored_subdags[0].clone();
assert_eq!(*subdags.last().unwrap(), actual_subdag);

let leader_schedule = LeaderSchedule::from_store(context.clone(), dag_state.clone());
Expand Down Expand Up @@ -783,26 +774,17 @@ mod tests {
let mut last_committed_rounds = vec![0; 4];
for (idx, leader) in leaders.into_iter().enumerate() {
let commit_index = idx as u32 + 1;
let mut subdag =
dag_builder.get_subdag(leader.clone(), last_committed_rounds.clone(), commit_index);
let (subdag, commit) = dag_builder.get_sub_dag_and_commit(
leader.clone(),
last_committed_rounds.clone(),
commit_index,
);
for block in subdag.blocks.iter() {
blocks_to_write.push(block.clone());
last_committed_rounds[block.author().value()] =
max(block.round(), last_committed_rounds[block.author().value()]);
}
let commit = TrustedCommit::new_for_test(
commit_index,
CommitDigest::MIN,
leader.timestamp_ms(),
leader.reference(),
subdag
.blocks
.iter()
.map(|block| block.reference())
.collect::<Vec<_>>(),
);
expected_commits.push(commit);
subdag.sort();
expected_unscored_subdags.push(subdag);
}

Expand Down Expand Up @@ -832,8 +814,7 @@ mod tests {
dag_state.read().unscored_committed_subdags_count()
);
for (idx, expected_subdag) in expected_unscored_subdags.into_iter().enumerate() {
let mut actual_subdag = actual_unscored_subdags[idx].clone();
actual_subdag.sort();
let actual_subdag = actual_unscored_subdags[idx].clone();
assert_eq!(expected_subdag, actual_subdag);
}

Expand All @@ -859,7 +840,7 @@ mod tests {
BlockRef::new(1, AuthorityIndex::ZERO, BlockDigest::MIN),
vec![],
context.clock.timestamp_utc_ms(),
1,
CommitRef::new(1, CommitDigest::MIN),
)];
dag_state
.write()
Expand Down Expand Up @@ -956,7 +937,7 @@ mod tests {
leader_ref,
blocks,
context.clock.timestamp_utc_ms(),
commit_index,
last_commit.reference(),
)];

let mut dag_state_write = dag_state.write();
Expand Down
Loading

0 comments on commit 2dc5a07

Please sign in to comment.