Skip to content

Commit

Permalink
[Consensus 2.0] Small refactoring in scores for CommittedSubDag (#17768)
Browse files Browse the repository at this point in the history
## Description 

Initialise the consensus scores in committed sub dag during
construction. Also always pre-sort the blocks , that should be the
default anyways either during commit or recover.

## Test plan 

CI

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
akichidis authored May 28, 2024
1 parent 60aed80 commit 71c997e
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 45 deletions.
12 changes: 6 additions & 6 deletions consensus/core/src/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,19 +312,16 @@ impl CommittedSubDag {
blocks: Vec<VerifiedBlock>,
timestamp_ms: BlockTimestampMs,
commit_ref: CommitRef,
reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
) -> Self {
Self {
leader,
blocks,
timestamp_ms,
commit_ref,
reputation_scores_desc: vec![],
reputation_scores_desc,
}
}

pub(crate) fn update_scores(&mut self, reputation_scores_desc: Vec<(AuthorityIndex, u64)>) {
self.reputation_scores_desc = reputation_scores_desc;
}
}

// Sort the blocks of the sub-dag blocks by round number then authority index. Any
Expand Down Expand Up @@ -372,6 +369,7 @@ impl fmt::Debug for CommittedSubDag {
pub fn load_committed_subdag_from_store(
store: &dyn Store,
commit: TrustedCommit,
reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
) -> CommittedSubDag {
let mut leader_block_idx = None;
let commit_blocks = store
Expand All @@ -396,6 +394,7 @@ pub fn load_committed_subdag_from_store(
blocks,
commit.timestamp_ms(),
commit.reference(),
reputation_scores_desc,
)
}

Expand Down Expand Up @@ -668,14 +667,15 @@ mod tests {
leader_ref,
blocks.clone(),
);
let subdag = load_committed_subdag_from_store(store.as_ref(), commit.clone());
let subdag = load_committed_subdag_from_store(store.as_ref(), commit.clone(), vec![]);
assert_eq!(subdag.leader, leader_ref);
assert_eq!(subdag.timestamp_ms, leader_block.timestamp_ms());
assert_eq!(
subdag.blocks.len(),
(num_authorities * wave_length) as usize + 1
);
assert_eq!(subdag.commit_ref, commit.reference());
assert_eq!(subdag.reputation_scores_desc, vec![]);
}

#[tokio::test]
Expand Down
37 changes: 14 additions & 23 deletions consensus/core/src/commit_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl CommitObserver {
) -> Self {
let mut observer = Self {
context,
commit_interpreter: Linearizer::new(dag_state.clone()),
commit_interpreter: Linearizer::new(dag_state.clone(), leader_schedule.clone()),
sender: commit_consumer.sender,
store,
leader_schedule,
Expand All @@ -74,17 +74,7 @@ impl CommitObserver {

let committed_sub_dags = self.commit_interpreter.handle_commit(committed_leaders);
let mut sent_sub_dags = vec![];
let reputation_scores_desc = self
.leader_schedule
.leader_swap_table
.read()
.reputation_scores_desc
.clone();
for mut committed_sub_dag in committed_sub_dags.into_iter() {
// TODO: Only update scores after a leader schedule change
// On handle commit the current scores that were used to elect the
// leader of the subdag will be added to the subdag and sent to sui.
committed_sub_dag.update_scores(reputation_scores_desc.clone());
for committed_sub_dag in committed_sub_dags.into_iter() {
// Failures in sender.send() are assumed to be permanent
if let Err(err) = self.sender.send(committed_sub_dag.clone()) {
tracing::error!(
Expand Down Expand Up @@ -134,21 +124,22 @@ impl CommitObserver {
for (index, commit) in unsent_commits.into_iter().enumerate() {
// Commit index must be continuous.
assert_eq!(commit.index(), last_sent_commit_index + 1);
let mut committed_sub_dag =
load_committed_subdag_from_store(self.store.as_ref(), commit);

// On recovery leader schedule will be updated with the current scores
// and the scores will be passed along with the last commit sent to
// sui so that the current scores are available for submission.
if index == num_unsent_commits - 1 {
committed_sub_dag.update_scores(
self.leader_schedule
.leader_swap_table
.read()
.reputation_scores_desc
.clone(),
);
}
let reputation_scores = if index == num_unsent_commits - 1 {
self.leader_schedule
.leader_swap_table
.read()
.reputation_scores_desc
.clone()
} else {
vec![]
};

let committed_sub_dag =
load_committed_subdag_from_store(self.store.as_ref(), commit, reputation_scores);
self.sender.send(committed_sub_dag).unwrap_or_else(|e| {
panic!(
"Failed to send commit during recovery, probably due to shutdown: {:?}",
Expand Down
3 changes: 1 addition & 2 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -484,8 +484,7 @@ impl Core {
tracing::info!(
"Leader schedule change triggered at commit index {last_commit_index}"
);
self.leader_schedule
.update_leader_schedule(self.dag_state.clone());
self.leader_schedule.update_leader_schedule(&self.dag_state);
commits_until_update = self
.leader_schedule
.commits_until_leader_schedule_update(self.dag_state.clone());
Expand Down
7 changes: 5 additions & 2 deletions consensus/core/src/dag_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,11 @@ impl DagState {
.protocol_config
.mysticeti_leader_scoring_and_schedule()
{
let committed_subdag =
load_committed_subdag_from_store(store.as_ref(), commit.clone());
let committed_subdag = load_committed_subdag_from_store(
store.as_ref(),
commit.clone(),
vec![],
); // We don't need to recover reputation scores for unscored_committed_subdags
unscored_committed_subdags.push(committed_subdag);
}
});
Expand Down
14 changes: 12 additions & 2 deletions consensus/core/src/leader_schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,15 @@ impl LeaderSchedule {
.unwrap() as usize
}

pub(crate) fn update_leader_schedule(&self, dag_state: Arc<RwLock<DagState>>) {
/// Checks whether the dag state unscored sub dags list is empty. If yes then that means that
/// either (1) the system has just started and there is no unscored sub dag available (2) the
/// schedule has updated - new scores have been calculated. Both cases we consider as valid cases
/// where the schedule has been updated.
pub(crate) fn leader_schedule_updated(&self, dag_state: &RwLock<DagState>) -> bool {
dag_state.read().unscored_committed_subdags_count() == 0
}

pub(crate) fn update_leader_schedule(&self, dag_state: &RwLock<DagState>) {
let _s = self
.context
.metrics
Expand Down Expand Up @@ -841,6 +849,7 @@ mod tests {
vec![],
context.clock.timestamp_utc_ms(),
CommitRef::new(1, CommitDigest::MIN),
vec![],
)];
dag_state
.write()
Expand Down Expand Up @@ -938,6 +947,7 @@ mod tests {
blocks,
context.clock.timestamp_utc_ms(),
last_commit.reference(),
vec![],
)];

let mut dag_state_write = dag_state.write();
Expand All @@ -950,7 +960,7 @@ mod tests {
AuthorityIndex::new_for_test(0)
);

leader_schedule.update_leader_schedule(dag_state.clone());
leader_schedule.update_leader_schedule(&dag_state);

let leader_swap_table = leader_schedule.leader_swap_table.read();
assert_eq!(leader_swap_table.good_nodes.len(), 1);
Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/leader_scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,7 @@ mod tests {
blocks,
context.clock.timestamp_utc_ms(),
CommitRef::new(1, CommitDigest::MIN),
vec![],
)];
let scoring_strategy = VoteScoringStrategy {};
let mut calculator =
Expand Down
126 changes: 116 additions & 10 deletions consensus/core/src/linearizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@

use std::{collections::HashSet, sync::Arc};

use consensus_config::AuthorityIndex;
use parking_lot::RwLock;

use crate::commit::sort_sub_dag_blocks;
use crate::leader_schedule::LeaderSchedule;
use crate::{
block::{BlockAPI, VerifiedBlock},
commit::{Commit, CommittedSubDag, TrustedCommit},
Expand All @@ -17,18 +19,26 @@ use crate::{
pub(crate) struct Linearizer {
/// In memory block store representing the dag state
dag_state: Arc<RwLock<DagState>>,
leader_schedule: Arc<LeaderSchedule>,
}

impl Linearizer {
pub(crate) fn new(dag_state: Arc<RwLock<DagState>>) -> Self {
Self { dag_state }
pub(crate) fn new(
dag_state: Arc<RwLock<DagState>>,
leader_schedule: Arc<LeaderSchedule>,
) -> Self {
Self {
dag_state,
leader_schedule,
}
}

/// Collect the sub-dag and the corresponding commit from a specific leader excluding any duplicates or
/// blocks that have already been committed (within previous sub-dags).
fn collect_sub_dag_and_commit(
&mut self,
leader_block: VerifiedBlock,
reputation_scores_desc: Vec<(AuthorityIndex, u64)>,
) -> (CommittedSubDag, TrustedCommit) {
// Grab latest commit state from dag state
let dag_state = self.dag_state.read();
Expand Down Expand Up @@ -100,6 +110,7 @@ impl Linearizer {
to_commit,
timestamp_ms,
commit.reference(),
reputation_scores_desc,
);

(sub_dag, commit)
Expand All @@ -112,10 +123,31 @@ impl Linearizer {
&mut self,
committed_leaders: Vec<VerifiedBlock>,
) -> Vec<CommittedSubDag> {
if committed_leaders.is_empty() {
return vec![];
}

// We check whether the leader schedule has been updated. If yes, then we'll send the scores as
// part of the first sub dag.
let schedule_updated = self
.leader_schedule
.leader_schedule_updated(&self.dag_state);

let mut committed_sub_dags = vec![];
for leader_block in committed_leaders {
for (i, leader_block) in committed_leaders.into_iter().enumerate() {
let reputation_scores_desc = if schedule_updated && i == 0 {
self.leader_schedule
.leader_swap_table
.read()
.reputation_scores_desc
.clone()
} else {
vec![]
};

// Collect the sub-dag generated using each of these leaders and the corresponding commit.
let (sub_dag, commit) = self.collect_sub_dag_and_commit(leader_block);
let (sub_dag, commit) =
self.collect_sub_dag_and_commit(leader_block, reputation_scores_desc);

// Buffer commit in dag state for persistence later.
// This also updates the last committed rounds.
Expand All @@ -129,9 +161,8 @@ impl Linearizer {
// Commit metadata can be persisted more lazily because they are recoverable. Uncommitted
// blocks can wait to persist too.
// But for simplicity, all unpersisted blocks and commits are flushed to storage.
if !committed_sub_dags.is_empty() {
self.dag_state.write().flush();
}
self.dag_state.write().flush();

committed_sub_dags
}
}
Expand All @@ -157,7 +188,11 @@ mod tests {
context.clone(),
Arc::new(MemStore::new()),
)));
let mut linearizer = Linearizer::new(dag_state.clone());
let leader_schedule = Arc::new(LeaderSchedule::new(
context.clone(),
LeaderSwapTable::default(),
));
let mut linearizer = Linearizer::new(dag_state.clone(), leader_schedule);

// Populate fully connected test blocks for round 0 ~ 10, authorities 0 ~ 3.
let num_rounds: u32 = 10;
Expand Down Expand Up @@ -193,6 +228,74 @@ mod tests {
}
}

#[tokio::test]
async fn test_handle_commit_with_schedule_update() {
telemetry_subscribers::init_for_testing();
let num_authorities = 4;
let context = Arc::new(Context::new_for_test(num_authorities).0);
let dag_state = Arc::new(RwLock::new(DagState::new(
context.clone(),
Arc::new(MemStore::new()),
)));
const NUM_OF_COMMITS_PER_SCHEDULE: u64 = 10;
let leader_schedule = Arc::new(
LeaderSchedule::new(context.clone(), LeaderSwapTable::default())
.with_num_commits_per_schedule(NUM_OF_COMMITS_PER_SCHEDULE),
);
let mut linearizer = Linearizer::new(dag_state.clone(), leader_schedule.clone());

// Populate fully connected test blocks for round 0 ~ 20, authorities 0 ~ 3.
let num_rounds: u32 = 20;
let mut dag_builder = DagBuilder::new(context.clone());
dag_builder
.layers(1..=num_rounds)
.build()
.persist_layers(dag_state.clone());

// Take the first 10 leaders
let leaders = dag_builder
.leader_blocks(1..=10)
.into_iter()
.map(Option::unwrap)
.collect::<Vec<_>>();

// Create some commits
let commits = linearizer.handle_commit(leaders.clone());

// Write them in DagState
dag_state.write().add_unscored_committed_subdags(commits);

// Now update the leader schedule
leader_schedule.update_leader_schedule(&dag_state);

assert!(
leader_schedule.leader_schedule_updated(&dag_state),
"Leader schedule should have been updated"
);

// Try to commit now the rest of the 10 leaders
let leaders = dag_builder
.leader_blocks(11..=20)
.into_iter()
.map(Option::unwrap)
.collect::<Vec<_>>();

// Now on the commits only the first one should contain the updated scores, the other should be empty
let commits = linearizer.handle_commit(leaders.clone());
assert_eq!(commits.len(), 10);
let scores = vec![
(AuthorityIndex::new_for_test(2), 9),
(AuthorityIndex::new_for_test(1), 8),
(AuthorityIndex::new_for_test(0), 8),
(AuthorityIndex::new_for_test(3), 8),
];
assert_eq!(commits[0].reputation_scores_desc, scores);

for commit in commits.into_iter().skip(1) {
assert_eq!(commit.reputation_scores_desc, vec![]);
}
}

#[tokio::test]
async fn test_handle_already_committed() {
telemetry_subscribers::init_for_testing();
Expand All @@ -202,8 +305,11 @@ mod tests {
context.clone(),
Arc::new(MemStore::new()),
)));
let leader_schedule = LeaderSchedule::new(context.clone(), LeaderSwapTable::default());
let mut linearizer = Linearizer::new(dag_state.clone());
let leader_schedule = Arc::new(LeaderSchedule::new(
context.clone(),
LeaderSwapTable::default(),
));
let mut linearizer = Linearizer::new(dag_state.clone(), leader_schedule.clone());
let wave_length = DEFAULT_WAVE_LENGTH;

let leader_round_wave_1 = 3;
Expand Down
1 change: 1 addition & 0 deletions consensus/core/src/test_dag_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ impl DagBuilder {
to_commit,
timestamp_ms,
commit.reference(),
vec![],
);

(sub_dag, commit)
Expand Down

0 comments on commit 71c997e

Please sign in to comment.