From 34416bfce4f949c7ba07e08a47535f15f59dbf4a Mon Sep 17 00:00:00 2001 From: Anastasios Kichidis Date: Tue, 15 Oct 2024 13:26:52 +0100 Subject: [PATCH] [Consensus] DagState to evict blocks based on GC round (#19465) ## Description Currently we evict the cached ref entries in DagState whenever we `flush`. At this point we evict the entries for each authority by dropping all the blocks which are `<= evict_round`, where `evict_round = authority_latest_commit_round - CACHED_ROUNDS` . The `CACHED_ROUNDS` here allow us to keep around for a little longer committed blocks. Of course all the blocks that are `> evict_round` are kept. This can work fine so far where we don't use GC , as we expect eventually to include blocks from other peers as weak links - no matter how far back they are - and that will move the `authority_latest_commit_round` and trigger the eviction of their blocks from our memory. Now with GC we don't have those guarantees. It is possible to get to a scenario where even a group of slow nodes that are constantly behind `gc_round`, they keep proposing but their blocks never get committed. Although their blocks should not end up in others DAGs , they will remain in their own and fill up their memory. Overall, the current approach will provide weaker guarantees. This PR is changing the eviction strategy so it's driven by the `gc_round`. Doing though the eviction purely on the `gc_round` will change a lot the semantics of the `DagState` as one of the intentions was to keep recent cached data from each authority. That would also be particularly visible for authorities for which we do not have frequent block proposals, as we could end up always evicting all their blocks if they are behind the `gc_round`. Then this would not allow us to do certain operations we used to do before with cached data(ex get latest cached block per authority). For that reason this PR is changing a bit the semantics of the `CACHED_ROUNDS` and from now on it will be the minimum/desired amount of rounds we want to keep in cache for each of authority. The eviction algorithm will still attempt to clean up records that are `<= gc_round`, but it will also make sure that `CACHED_ROUNDS` worth of data are kept around. Especially for more edge case situation where a node has not produced blocks `> gc_round`, we guarantee to keep `CACHED_ROUNDS` even when all of them are `<= gc_round`, but we'll eventually evict anything before - practically like a moving window. ## Test plan CI/PT --- ## Release notes Check each box that your changes affect. If none of the boxes relate to your changes, release notes aren't required. For each box you select, include information after the relevant heading that describes the impact of your changes that a user might notice and any actions they must take to implement updates. - [ ] Protocol: - [ ] Nodes (Validators and Full nodes): - [ ] Indexer: - [ ] JSON-RPC: - [ ] GraphQL: - [ ] CLI: - [ ] Rust SDK: - [ ] REST API: --- consensus/core/src/base_committer.rs | 26 +- consensus/core/src/dag_state.rs | 509 +++++++++++++++++++++---- consensus/core/src/leader_schedule.rs | 85 +---- consensus/core/src/leader_scoring.rs | 90 +---- consensus/core/src/linearizer.rs | 109 ++++-- consensus/core/src/test_dag_builder.rs | 157 +++++--- 6 files changed, 643 insertions(+), 333 deletions(-) diff --git a/consensus/core/src/base_committer.rs b/consensus/core/src/base_committer.rs index 41a0b5cbd6616..7c390f9d47f2a 100644 --- a/consensus/core/src/base_committer.rs +++ b/consensus/core/src/base_committer.rs @@ -227,17 +227,31 @@ impl BaseCommitter { leader_block: &VerifiedBlock, all_votes: &mut HashMap, ) -> bool { + let (gc_enabled, gc_round) = { + let dag_state = self.dag_state.read(); + (dag_state.gc_enabled(), dag_state.gc_round()) + }; + let mut votes_stake_aggregator = StakeAggregator::::new(); for reference in potential_certificate.ancestors() { let is_vote = if let Some(is_vote) = all_votes.get(reference) { *is_vote } else { - let potential_vote = self - .dag_state - .read() - .get_block(reference) - .unwrap_or_else(|| panic!("Block not found in storage: {:?}", reference)); - let is_vote = self.is_vote(&potential_vote, leader_block); + let potential_vote = self.dag_state.read().get_block(reference); + + let is_vote = if gc_enabled { + if let Some(potential_vote) = potential_vote { + self.is_vote(&potential_vote, leader_block) + } else { + assert!(reference.round <= gc_round, "Block not found in storage: {:?} , and is not below gc_round: {gc_round}", reference); + false + } + } else { + let potential_vote = potential_vote + .unwrap_or_else(|| panic!("Block not found in storage: {:?}", reference)); + self.is_vote(&potential_vote, leader_block) + }; + all_votes.insert(*reference, is_vote); is_vote }; diff --git a/consensus/core/src/dag_state.rs b/consensus/core/src/dag_state.rs index cfe7ba080dbcb..83aada9cb05a7 100644 --- a/consensus/core/src/dag_state.rs +++ b/consensus/core/src/dag_state.rs @@ -7,11 +7,12 @@ use std::{ ops::Bound::{Excluded, Included, Unbounded}, panic, sync::Arc, + vec, }; use consensus_config::AuthorityIndex; use itertools::Itertools as _; -use tracing::{debug, error}; +use tracing::{debug, error, info}; use crate::{ block::{ @@ -44,12 +45,22 @@ pub(crate) struct DagState { // Contains recent blocks within CACHED_ROUNDS from the last committed round per authority. // Note: all uncommitted blocks are kept in memory. + // + // When GC is enabled, this map has a different semantic. It holds all the recent data for each authority making sure that it always have available + // CACHED_ROUNDS worth of data. The entries are evicted based on the latest GC round, however the eviction process will respect the CACHED_ROUNDS. + // For each authority, blocks are only evicted when their round is less than or equal to both `gc_round`, and `highest authority round - cached rounds`. + // This ensures that the GC requirements are respected (we never clean up any block above `gc_round`), and there are enough blocks cached. recent_blocks: BTreeMap, // Indexes recent block refs by their authorities. // Vec position corresponds to the authority index. recent_refs_by_authority: Vec>, + // Keeps track of the highest round that has been evicted for each authority. Any blocks that are of round <= evict_round + // should be considered evicted, and if any exist we should not consider the causauly complete in the order they appear. + // The `evicted_rounds` size should be the same as the committee size. + evicted_rounds: Vec, + // Highest round of blocks accepted. highest_accepted_round: Round, @@ -169,20 +180,57 @@ impl DagState { unscored_committed_subdags, store, cached_rounds, + evicted_rounds: vec![0; num_authorities], }; for (i, round) in last_committed_rounds.into_iter().enumerate() { let authority_index = state.context.committee.to_authority_index(i).unwrap(); - let blocks = state - .store - .scan_blocks_by_author( - authority_index, - Self::eviction_round(round, cached_rounds) + 1, - ) - .unwrap(); - for block in blocks { - state.update_block_metadata(&block); + let (blocks, eviction_round) = if state.gc_enabled() { + // Find the latest block for the authority to calculate the eviction round. Then we want to scan and load the blocks from the eviction round and onwards only. + // As reminder, the eviction round is taking into account the gc_round. + let last_block = state + .store + .scan_last_blocks_by_author(authority_index, 1, None) + .expect("Database error"); + let last_block_round = last_block + .last() + .map(|b| b.round()) + .unwrap_or(GENESIS_ROUND); + + let eviction_round = Self::gc_eviction_round( + last_block_round, + state.gc_round(), + state.cached_rounds, + ); + let blocks = state + .store + .scan_blocks_by_author(authority_index, eviction_round + 1) + .expect("Database error"); + (blocks, eviction_round) + } else { + let eviction_round = Self::eviction_round(round, cached_rounds); + let blocks = state + .store + .scan_blocks_by_author(authority_index, eviction_round + 1) + .expect("Database error"); + (blocks, eviction_round) + }; + + state.evicted_rounds[authority_index] = eviction_round; + + // Update the block metadata for the authority. + for block in &blocks { + state.update_block_metadata(block); } + + info!( + "Recovered blocks {}: {:?}", + authority_index, + blocks + .iter() + .map(|b| b.reference()) + .collect::>() + ); } state @@ -477,7 +525,7 @@ impl DagState { .to_authority_index(authority_index) .unwrap(); - let last_evicted_round = self.authority_eviction_round(authority_index); + let last_evicted_round = self.evicted_rounds[authority_index]; if end_round.saturating_sub(1) <= last_evicted_round { panic!("Attempted to request for blocks of rounds < {end_round}, when the last evicted round is {last_evicted_round} for authority {authority_index}", ); } @@ -513,11 +561,9 @@ impl DagState { return true; } - if slot.round <= self.authority_eviction_round(slot.authority) { - panic!( - "Attempted to check for slot {slot} that is <= the last evicted round {}", - self.authority_eviction_round(slot.authority) - ); + let eviction_round = self.evicted_rounds[slot.authority]; + if slot.round <= eviction_round { + panic!("{}", format!("Attempted to check for slot {slot} that is <= the last{}evicted round {eviction_round}", if self.gc_enabled() { " gc " } else { " " } )); } let mut result = self.recent_refs_by_authority[slot.authority].range(( @@ -780,30 +826,29 @@ impl DagState { .inc(); // Clean up old cached data. After flushing, all cached blocks are guaranteed to be persisted. - let mut total_recent_refs = 0; - for (authority_refs, last_committed_round) in self - .recent_refs_by_authority - .iter_mut() - .zip(self.last_committed_rounds.iter()) - { - while let Some(block_ref) = authority_refs.first() { - if block_ref.round - <= Self::eviction_round(*last_committed_round, self.cached_rounds) - { + for (authority_index, _) in self.context.committee.authorities() { + let eviction_round = self.calculate_authority_eviction_round(authority_index); + while let Some(block_ref) = self.recent_refs_by_authority[authority_index].first() { + if block_ref.round <= eviction_round { self.recent_blocks.remove(block_ref); - authority_refs.pop_first(); + self.recent_refs_by_authority[authority_index].pop_first(); } else { break; } } - total_recent_refs += authority_refs.len(); + self.evicted_rounds[authority_index] = eviction_round; } let metrics = &self.context.metrics.node_metrics; metrics .dag_state_recent_blocks .set(self.recent_blocks.len() as i64); - metrics.dag_state_recent_refs.set(total_recent_refs as i64); + metrics.dag_state_recent_refs.set( + self.recent_refs_by_authority + .iter() + .map(BTreeSet::len) + .sum::() as i64, + ); } /// Detects and returns the blocks of the round that forms the last quorum. The method will return @@ -890,12 +935,21 @@ impl DagState { self.genesis.values().cloned().collect() } - /// The last round that got evicted after a cache clean up operation. After this round we are + /// The last round that should get evicted after a cache clean up operation. After this round we are /// guaranteed to have all the produced blocks from that authority. For any round that is /// <= `last_evicted_round` we don't have such guarantees as out of order blocks might exist. - fn authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round { - let commit_round = self.last_committed_rounds[authority_index]; - Self::eviction_round(commit_round, self.cached_rounds) + fn calculate_authority_eviction_round(&self, authority_index: AuthorityIndex) -> Round { + if self.gc_enabled() { + let last_round = self.recent_refs_by_authority[authority_index] + .last() + .map(|block_ref| block_ref.round) + .unwrap_or(GENESIS_ROUND); + + Self::gc_eviction_round(last_round, self.gc_round(), self.cached_rounds) + } else { + let commit_round = self.last_committed_rounds[authority_index]; + Self::eviction_round(commit_round, self.cached_rounds) + } } /// Calculates the last eviction round based on the provided `commit_round`. Any blocks with @@ -904,6 +958,12 @@ impl DagState { commit_round.saturating_sub(cached_rounds) } + /// Calculates the eviction round for the given authority. The goal is to keep at least `cached_rounds` + /// of the latest blocks in the cache (if enough data is available), while evicting blocks with rounds <= `gc_round` when possible. + fn gc_eviction_round(last_round: Round, gc_round: Round, cached_rounds: u32) -> Round { + gc_round.min(last_round.saturating_sub(cached_rounds)) + } + #[cfg(test)] pub(crate) fn set_last_commit(&mut self, commit: TrustedCommit) { self.last_commit = Some(commit); @@ -915,12 +975,14 @@ mod test { use std::vec; use parking_lot::RwLock; + use rstest::rstest; use super::*; use crate::{ block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock}, storage::{mem_store::MemStore, WriteBatch}, test_dag_builder::DagBuilder, + test_dag_parser::parse_dag, }; #[tokio::test] @@ -1350,6 +1412,79 @@ mod test { dag_state.contains_cached_block_at_slot(Slot::new(8, AuthorityIndex::new_for_test(0))); } + #[tokio::test] + #[should_panic( + expected = "Attempted to check for slot B3 that is <= the last gc evicted round 3" + )] + async fn test_contains_cached_block_at_slot_panics_when_ask_out_of_range_gc_enabled() { + /// Keep 2 rounds from the highest committed round. This is considered universal and minimum necessary blocks to hold + /// for the correct node operation. + const GC_DEPTH: u32 = 2; + /// Keep at least 3 rounds in cache for each authority. + const CACHED_ROUNDS: Round = 3; + + let (mut context, _) = Context::new_for_test(4); + context + .protocol_config + .set_consensus_gc_depth_for_testing(GC_DEPTH); + context.parameters.dag_state_cached_rounds = CACHED_ROUNDS; + + let context = Arc::new(context); + let store = Arc::new(MemStore::new()); + let mut dag_state = DagState::new(context.clone(), store.clone()); + + // Create for rounds 1..=6. Skip creating blocks for authority 0 for rounds 4 - 6. + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder.layers(1..=3).build(); + dag_builder + .layers(4..=6) + .authorities(vec![AuthorityIndex::new_for_test(0)]) + .skip_block() + .build(); + + // Accept all blocks + dag_builder + .all_blocks() + .into_iter() + .for_each(|block| dag_state.accept_block(block)); + + // Now add a commit for leader round 5 to trigger an eviction + dag_state.add_commit(TrustedCommit::new_for_test( + 1 as CommitIndex, + CommitDigest::MIN, + 0, + dag_builder.leader_block(5).unwrap().reference(), + vec![], + )); + + dag_state.flush(); + + // Ensure that gc round has been updated + assert_eq!(dag_state.gc_round(), 3, "GC round should be 3"); + + // Now what we expect to happen is for: + // * Nodes 1 - 3 should have in cache blocks from gc_round (3) and onwards. + // * Node 0 should have in cache blocks from it's latest round, 3, up to round 1, which is the number of cached_rounds. + for authority_index in 1..=3 { + for round in 4..=6 { + assert!(dag_state.contains_cached_block_at_slot(Slot::new( + round, + AuthorityIndex::new_for_test(authority_index) + ))); + } + } + + for round in 1..=3 { + assert!(dag_state + .contains_cached_block_at_slot(Slot::new(round, AuthorityIndex::new_for_test(0)))); + } + + // When trying to request for authority 1 at block slot 3 it should panic, as anything + // that is <= 3 should be evicted + let _ = + dag_state.contains_cached_block_at_slot(Slot::new(3, AuthorityIndex::new_for_test(1))); + } + #[tokio::test] async fn test_get_blocks_in_cache_or_store() { let (context, _) = Context::new_for_test(4); @@ -1409,14 +1544,22 @@ mod test { } // TODO: Remove when DistributedVoteScoring is enabled. + #[rstest] #[tokio::test] - async fn test_flush_and_recovery_with_unscored_subdag() { + async fn test_flush_and_recovery_with_unscored_subdag(#[values(0, 5)] gc_depth: u32) { telemetry_subscribers::init_for_testing(); let num_authorities: u32 = 4; let (mut context, _) = Context::new_for_test(num_authorities as usize); context .protocol_config .set_consensus_distributed_vote_scoring_strategy_for_testing(false); + + if gc_depth > 0 { + context + .protocol_config + .set_consensus_gc_depth_for_testing(gc_depth); + } + let context = Arc::new(context); let store = Arc::new(MemStore::new()); let mut dag_state = DagState::new(context.clone(), store.clone()); @@ -1426,24 +1569,8 @@ mod test { let mut dag_builder = DagBuilder::new(context.clone()); dag_builder.layers(1..=num_rounds).build(); let mut commits = vec![]; - let leaders = dag_builder - .leader_blocks(1..=num_rounds) - .into_iter() - .flatten() - .collect::>(); - let mut last_committed_rounds = vec![0; 4]; - for (idx, leader) in leaders.into_iter().enumerate() { - let commit_index = idx as u32 + 1; - let (subdag, commit) = dag_builder.get_sub_dag_and_commit( - leader.clone(), - last_committed_rounds.clone(), - commit_index, - ); - for block in subdag.blocks.iter() { - last_committed_rounds[block.author().value()] = - max(block.round(), last_committed_rounds[block.author().value()]); - } + for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) { commits.push(commit); } @@ -1478,7 +1605,10 @@ mod test { // Last commit index should be 10. assert_eq!(dag_state.last_commit_index(), 10); - assert_eq!(dag_state.last_committed_rounds(), last_committed_rounds); + assert_eq!( + dag_state.last_committed_rounds(), + dag_builder.last_committed_rounds.clone() + ); // Destroy the dag state. drop(dag_state); @@ -1539,24 +1669,7 @@ mod test { let mut dag_builder = DagBuilder::new(context.clone()); dag_builder.layers(1..=num_rounds).build(); let mut commits = vec![]; - let leaders = dag_builder - .leader_blocks(1..=num_rounds) - .into_iter() - .flatten() - .collect::>(); - - let mut last_committed_rounds = vec![0; 4]; - for (idx, leader) in leaders.into_iter().enumerate() { - let commit_index = idx as u32 + 1; - let (subdag, commit) = dag_builder.get_sub_dag_and_commit( - leader.clone(), - last_committed_rounds.clone(), - commit_index, - ); - for block in subdag.blocks.iter() { - last_committed_rounds[block.author().value()] = - max(block.round(), last_committed_rounds[block.author().value()]); - } + for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) { commits.push(commit); } @@ -1591,7 +1704,10 @@ mod test { // Last commit index should be 10. assert_eq!(dag_state.last_commit_index(), 10); - assert_eq!(dag_state.last_committed_rounds(), last_committed_rounds); + assert_eq!( + dag_state.last_committed_rounds(), + dag_builder.last_committed_rounds.clone() + ); // Destroy the dag state. drop(dag_state); @@ -1638,6 +1754,144 @@ mod test { assert_eq!(dag_state.scoring_subdags_count(), 5); } + #[tokio::test] + async fn test_flush_and_recovery_gc_enabled() { + telemetry_subscribers::init_for_testing(); + + const GC_DEPTH: u32 = 3; + const CACHED_ROUNDS: u32 = 4; + + let num_authorities: u32 = 4; + let (mut context, _) = Context::new_for_test(num_authorities as usize); + context.parameters.dag_state_cached_rounds = CACHED_ROUNDS; + context + .protocol_config + .set_consensus_gc_depth_for_testing(GC_DEPTH); + + let context = Arc::new(context); + + let store = Arc::new(MemStore::new()); + let mut dag_state = DagState::new(context.clone(), store.clone()); + + let num_rounds: u32 = 10; + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder.layers(1..=5).build(); + dag_builder + .layers(6..=8) + .authorities(vec![AuthorityIndex::new_for_test(0)]) + .skip_block() + .build(); + dag_builder.layers(9..=num_rounds).build(); + + let mut commits = vec![]; + for (_subdag, commit) in dag_builder.get_sub_dag_and_commits(1..=num_rounds) { + commits.push(commit); + } + + // Add the blocks from first 8 rounds and first 7 commits to the dag state + // It's 7 commits because we missing the commit of round 8 where authority 0 is the leader, but produced no block + let temp_commits = commits.split_off(7); + dag_state.accept_blocks(dag_builder.blocks(1..=8)); + for commit in commits.clone() { + dag_state.add_commit(commit); + } + + // Flush the dag state + dag_state.flush(); + + // Add the rest of the blocks and commits to the dag state + dag_state.accept_blocks(dag_builder.blocks(9..=num_rounds)); + for commit in temp_commits.clone() { + dag_state.add_commit(commit); + } + + // All blocks should be found in DagState. + let all_blocks = dag_builder.blocks(1..=num_rounds); + let block_refs = all_blocks + .iter() + .map(|block| block.reference()) + .collect::>(); + let result = dag_state + .get_blocks(&block_refs) + .into_iter() + .map(|b| b.unwrap()) + .collect::>(); + assert_eq!(result, all_blocks); + + // Last commit index should be 9 + assert_eq!(dag_state.last_commit_index(), 9); + assert_eq!( + dag_state.last_committed_rounds(), + dag_builder.last_committed_rounds.clone() + ); + + // Destroy the dag state. + drop(dag_state); + + // Recover the state from the store + let dag_state = DagState::new(context.clone(), store.clone()); + + // Blocks of first 5 rounds should be found in DagState. + let blocks = dag_builder.blocks(1..=5); + let block_refs = blocks + .iter() + .map(|block| block.reference()) + .collect::>(); + let result = dag_state + .get_blocks(&block_refs) + .into_iter() + .map(|b| b.unwrap()) + .collect::>(); + assert_eq!(result, blocks); + + // Blocks above round 9 should not be in DagState, because they are not flushed. + let missing_blocks = dag_builder.blocks(9..=num_rounds); + let block_refs = missing_blocks + .iter() + .map(|block| block.reference()) + .collect::>(); + let retrieved_blocks = dag_state + .get_blocks(&block_refs) + .into_iter() + .flatten() + .collect::>(); + assert!(retrieved_blocks.is_empty()); + + // Last commit index should be 7. + assert_eq!(dag_state.last_commit_index(), 7); + + // This is the last_commmit_rounds of the first 7 commits that were flushed + let expected_last_committed_rounds = vec![5, 6, 6, 7]; + assert_eq!( + dag_state.last_committed_rounds(), + expected_last_committed_rounds + ); + // Unscored subdags will be recoverd based on the flushed commits and no commit info + assert_eq!(dag_state.scoring_subdags_count(), 7); + + // Ensure that cached blocks exist only for specific rounds per authority + for (authority_index, _) in context.committee.authorities() { + let blocks = dag_state.get_cached_blocks(authority_index, 1); + + // Ensure that eviction rounds have been properly recovered + // DagState should hold cached blocks for authority 0 for rounds [2..=5] as no higher blocks exist and due to CACHED_ROUNDS = 4 + // we want at max to hold blocks for 4 rounds in cache. + if authority_index == AuthorityIndex::new_for_test(0) { + assert_eq!(blocks.len(), 4); + assert_eq!(dag_state.evicted_rounds[authority_index.value()], 1); + assert!(blocks + .into_iter() + .all(|block| block.round() >= 2 && block.round() <= 5)); + } else { + assert_eq!(blocks.len(), 4); + assert_eq!(dag_state.evicted_rounds[authority_index.value()], 4); + assert!(blocks + .into_iter() + .all(|block| block.round() >= 5 && block.round() <= 8)); + } + } + } + #[tokio::test] async fn test_get_cached_blocks() { let (mut context, _) = Context::new_for_test(4); @@ -1693,13 +1947,20 @@ mod test { assert_eq!(cached_blocks[0].round(), 12); } + #[rstest] #[tokio::test] - async fn test_get_cached_last_block_per_authority() { + async fn test_get_cached_last_block_per_authority(#[values(0, 1)] gc_depth: u32) { // GIVEN const CACHED_ROUNDS: Round = 2; let (mut context, _) = Context::new_for_test(4); context.parameters.dag_state_cached_rounds = CACHED_ROUNDS; + if gc_depth > 0 { + context + .protocol_config + .set_consensus_gc_depth_for_testing(gc_depth); + } + let context = Arc::new(context); let store = Arc::new(MemStore::new()); let mut dag_state = DagState::new(context.clone(), store.clone()); @@ -1708,24 +1969,35 @@ mod test { // Create one block (round 1) for authority 1 // Create two blocks (rounds 1,2) for authority 2 // Create three blocks (rounds 1,2,3) for authority 3 - let mut all_blocks = Vec::new(); - for author in 1..=3 { - for round in 1..=author { - let block = VerifiedBlock::new_for_test(TestBlock::new(round, author).build()); - all_blocks.push(block.clone()); - dag_state.accept_block(block); - } + let dag_str = "DAG { + Round 0 : { 4 }, + Round 1 : { + B -> [*], + C -> [*], + D -> [*], + }, + Round 2 : { + C -> [*], + D -> [*], + }, + Round 3 : { + D -> [*], + }, + }"; + + let (_, dag_builder) = parse_dag(dag_str).expect("Invalid dag"); + + // Accept all blocks + for block in dag_builder.all_blocks() { + dag_state.accept_block(block); } dag_state.add_commit(TrustedCommit::new_for_test( 1 as CommitIndex, CommitDigest::MIN, context.clock.timestamp_utc_ms(), - all_blocks.last().unwrap().reference(), - all_blocks - .into_iter() - .map(|block| block.reference()) - .collect::>(), + dag_builder.leader_block(3).unwrap().reference(), + vec![], )); // WHEN search for the latest blocks @@ -1740,6 +2012,9 @@ mod test { // WHEN we flush the DagState - after adding a commit with all the blocks, we expect this to trigger // a clean up in the internal cache. That will keep the all the blocks with rounds >= authority_commit_round - CACHED_ROUND. + // + // When GC is enabled then we'll keep all the blocks that are > gc_round (2) and for those who don't have blocks > gc_round, we'll keep + // all their highest round blocks for CACHED_ROUNDS. dag_state.flush(); // AND we request before round 3 @@ -1800,6 +2075,72 @@ mod test { dag_state.get_last_cached_block_per_authority(end_round); } + #[tokio::test] + #[should_panic( + expected = "Attempted to request for blocks of rounds < 2, when the last evicted round is 1 for authority C" + )] + async fn test_get_cached_last_block_per_authority_requesting_out_of_round_range_gc_enabled() { + // GIVEN + const CACHED_ROUNDS: Round = 1; + const GC_DEPTH: u32 = 1; + let (mut context, _) = Context::new_for_test(4); + context.parameters.dag_state_cached_rounds = CACHED_ROUNDS; + context + .protocol_config + .set_consensus_gc_depth_for_testing(GC_DEPTH); + + let context = Arc::new(context); + let store = Arc::new(MemStore::new()); + let mut dag_state = DagState::new(context.clone(), store.clone()); + + // Create no blocks for authority 0 + // Create one block (round 1) for authority 1 + // Create two blocks (rounds 1,2) for authority 2 + // Create three blocks (rounds 1,2,3) for authority 3 + let mut dag_builder = DagBuilder::new(context.clone()); + dag_builder + .layers(1..=1) + .authorities(vec![AuthorityIndex::new_for_test(0)]) + .skip_block() + .build(); + dag_builder + .layers(2..=2) + .authorities(vec![ + AuthorityIndex::new_for_test(0), + AuthorityIndex::new_for_test(1), + ]) + .skip_block() + .build(); + dag_builder + .layers(3..=3) + .authorities(vec![ + AuthorityIndex::new_for_test(0), + AuthorityIndex::new_for_test(1), + AuthorityIndex::new_for_test(2), + ]) + .skip_block() + .build(); + + // Accept all blocks + for block in dag_builder.all_blocks() { + dag_state.accept_block(block); + } + + dag_state.add_commit(TrustedCommit::new_for_test( + 1 as CommitIndex, + CommitDigest::MIN, + 0, + dag_builder.leader_block(3).unwrap().reference(), + vec![], + )); + + // Flush the store so we update the evict rounds + dag_state.flush(); + + // THEN the method should panic, as some authorities have already evicted rounds <= round 2 + dag_state.get_last_cached_block_per_authority(2); + } + #[tokio::test] async fn test_last_quorum() { // GIVEN diff --git a/consensus/core/src/leader_schedule.rs b/consensus/core/src/leader_schedule.rs index 56c9813a0d836..21995bc977c1b 100644 --- a/consensus/core/src/leader_schedule.rs +++ b/consensus/core/src/leader_schedule.rs @@ -508,11 +508,10 @@ impl Debug for LeaderSwapTable { #[cfg(test)] mod tests { - use std::cmp::max; use super::*; use crate::{ - block::{BlockAPI as _, BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock}, + block::{BlockDigest, BlockRef, BlockTimestampMs, TestBlock, VerifiedBlock}, commit::{CommitDigest, CommitInfo, CommitRef, CommittedSubDag, TrustedCommit}, storage::{mem_store::MemStore, Store, WriteBatch}, test_dag_builder::DagBuilder, @@ -581,27 +580,12 @@ mod tests { dag_builder.layers(1..=11).build(); let mut subdags = vec![]; let mut expected_commits = vec![]; - let leaders = dag_builder - .leader_blocks(1..=11) - .into_iter() - .flatten() - .collect::>(); let mut blocks_to_write = vec![]; - let mut last_committed_rounds = vec![0; 4]; - for (idx, leader) in leaders.into_iter().enumerate() { - let commit_index = idx as u32 + 1; - let (sub_dag, commit) = dag_builder.get_sub_dag_and_commit( - leader.clone(), - last_committed_rounds.clone(), - commit_index, - ); + for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=11) { for block in sub_dag.blocks.iter() { blocks_to_write.push(block.clone()); - last_committed_rounds[block.author().value()] = - max(block.round(), last_committed_rounds[block.author().value()]); } - expected_commits.push(commit); subdags.push(sub_dag); } @@ -633,7 +617,7 @@ mod tests { // Check that DagState recovery from stored CommitInfo worked correctly assert_eq!( - last_committed_rounds, + dag_builder.last_committed_rounds.clone(), dag_state.read().last_committed_rounds() ); assert_eq!(1, dag_state.read().scoring_subdags_count()); @@ -705,28 +689,14 @@ mod tests { let mut expected_scored_subdags = vec![]; let mut expected_commits = vec![]; - let leaders = dag_builder - .leader_blocks(1..=2) - .into_iter() - .flatten() - .collect::>(); let mut blocks_to_write = vec![]; - let mut last_committed_rounds = vec![0; 4]; - for (idx, leader) in leaders.into_iter().enumerate() { - let commit_index = idx as u32 + 1; - let (subdag, commit) = dag_builder.get_sub_dag_and_commit( - leader.clone(), - last_committed_rounds.clone(), - commit_index, - ); - for block in subdag.blocks.iter() { + for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=2) { + for block in sub_dag.blocks.iter() { blocks_to_write.push(block.clone()); - last_committed_rounds[block.author().value()] = - max(block.round(), last_committed_rounds[block.author().value()]); } expected_commits.push(commit); - expected_scored_subdags.push(subdag); + expected_scored_subdags.push(sub_dag); } // The CommitInfo for the first 2 commits are written to store. 10 commits @@ -746,7 +716,7 @@ mod tests { // Check that DagState recovery from stored CommitInfo worked correctly assert_eq!( - last_committed_rounds, + dag_builder.last_committed_rounds.clone(), dag_state.read().last_committed_rounds() ); assert_eq!( @@ -1106,27 +1076,12 @@ mod tests { dag_builder.layers(1..=11).build(); let mut subdags = vec![]; let mut expected_commits = vec![]; - let leaders = dag_builder - .leader_blocks(1..=11) - .into_iter() - .flatten() - .collect::>(); let mut blocks_to_write = vec![]; - let mut last_committed_rounds = vec![0; 4]; - for (idx, leader) in leaders.into_iter().enumerate() { - let commit_index = idx as u32 + 1; - let (sub_dag, commit) = dag_builder.get_sub_dag_and_commit( - leader.clone(), - last_committed_rounds.clone(), - commit_index, - ); + for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=11) { for block in sub_dag.blocks.iter() { blocks_to_write.push(block.clone()); - last_committed_rounds[block.author().value()] = - max(block.round(), last_committed_rounds[block.author().value()]); } - expected_commits.push(commit); subdags.push(sub_dag); } @@ -1158,7 +1113,7 @@ mod tests { // Check that DagState recovery from stored CommitInfo worked correctly assert_eq!( - last_committed_rounds, + dag_builder.last_committed_rounds.clone(), dag_state.read().last_committed_rounds() ); let actual_unscored_subdags = dag_state.read().unscored_committed_subdags(); @@ -1236,28 +1191,14 @@ mod tests { let mut expected_unscored_subdags = vec![]; let mut expected_commits = vec![]; - let leaders = dag_builder - .leader_blocks(1..=2) - .into_iter() - .flatten() - .collect::>(); let mut blocks_to_write = vec![]; - let mut last_committed_rounds = vec![0; 4]; - for (idx, leader) in leaders.into_iter().enumerate() { - let commit_index = idx as u32 + 1; - let (subdag, commit) = dag_builder.get_sub_dag_and_commit( - leader.clone(), - last_committed_rounds.clone(), - commit_index, - ); - for block in subdag.blocks.iter() { + for (sub_dag, commit) in dag_builder.get_sub_dag_and_commits(1..=2) { + for block in sub_dag.blocks.iter() { blocks_to_write.push(block.clone()); - last_committed_rounds[block.author().value()] = - max(block.round(), last_committed_rounds[block.author().value()]); } expected_commits.push(commit); - expected_unscored_subdags.push(subdag); + expected_unscored_subdags.push(sub_dag); } // The CommitInfo for the first 2 commits are written to store. 10 commits @@ -1277,7 +1218,7 @@ mod tests { // Check that DagState recovery from stored CommitInfo worked correctly assert_eq!( - last_committed_rounds, + dag_builder.last_committed_rounds.clone(), dag_state.read().last_committed_rounds() ); let actual_unscored_subdags = dag_state.read().unscored_committed_subdags(); diff --git a/consensus/core/src/leader_scoring.rs b/consensus/core/src/leader_scoring.rs index 89803a4a273f3..7136daf7c5e30 100644 --- a/consensus/core/src/leader_scoring.rs +++ b/consensus/core/src/leader_scoring.rs @@ -475,8 +475,6 @@ impl UnscoredSubdag { #[cfg(test)] mod tests { - use std::cmp::max; - use super::*; use crate::{test_dag_builder::DagBuilder, CommitDigest, CommitRef}; @@ -551,26 +549,10 @@ mod tests { .skip_block() .build(); - let leaders = dag_builder - .leader_blocks(1..=4) - .into_iter() - .flatten() - .collect::>(); - let mut scoring_subdag = ScoringSubdag::new(context.clone()); - let mut last_committed_rounds = vec![0; 4]; - for (idx, leader) in leaders.into_iter().enumerate() { - let commit_index = idx as u32 + 1; - let (subdag, _commit) = dag_builder.get_sub_dag_and_commit( - leader, - last_committed_rounds.clone(), - commit_index, - ); - for block in subdag.blocks.iter() { - last_committed_rounds[block.author().value()] = - max(block.round(), last_committed_rounds[block.author().value()]); - } - scoring_subdag.add_subdags(vec![subdag]); + + for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) { + scoring_subdag.add_subdags(vec![sub_dag]); } let scores = scoring_subdag.calculate_scores(); @@ -597,26 +579,10 @@ mod tests { .skip_block() .build(); - let leaders = dag_builder - .leader_blocks(1..=4) - .into_iter() - .flatten() - .collect::>(); - let mut scoring_subdag = ScoringSubdag::new(context.clone()); - let mut last_committed_rounds = vec![0; 4]; - for (idx, leader) in leaders.into_iter().enumerate() { - let commit_index = idx as u32 + 1; - let (subdag, _commit) = dag_builder.get_sub_dag_and_commit( - leader, - last_committed_rounds.clone(), - commit_index, - ); - for block in subdag.blocks.iter() { - last_committed_rounds[block.author().value()] = - max(block.round(), last_committed_rounds[block.author().value()]); - } - scoring_subdag.add_subdags(vec![subdag]); + + for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) { + scoring_subdag.add_subdags(vec![sub_dag]); } let scores_per_authority = scoring_subdag.score_certified_votes(); @@ -644,27 +610,11 @@ mod tests { .skip_block() .build(); - let leaders = dag_builder - .leader_blocks(1..=4) - .into_iter() - .flatten() - .collect::>(); - let mut unscored_subdags = vec![]; - let mut last_committed_rounds = vec![0; 4]; - for (idx, leader) in leaders.into_iter().enumerate() { - let commit_index = idx as u32 + 1; - let (subdag, _commit) = dag_builder.get_sub_dag_and_commit( - leader, - last_committed_rounds.clone(), - commit_index, - ); - for block in subdag.blocks.iter() { - last_committed_rounds[block.author().value()] = - max(block.round(), last_committed_rounds[block.author().value()]); - } - unscored_subdags.push(subdag); + for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) { + unscored_subdags.push(sub_dag); } + let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags); let scores = calculator.calculate(); assert_eq!(scores.scores_per_authority, vec![3, 2, 2, 2]); @@ -727,27 +677,9 @@ mod tests { .skip_block() .build(); - let leaders = dag_builder - .leader_blocks(1..=4) - .into_iter() - .flatten() - .collect::>(); - let mut unscored_subdags = vec![]; - let mut last_committed_rounds = vec![0; 4]; - for (idx, leader) in leaders.into_iter().enumerate() { - let commit_index = idx as u32 + 1; - let (subdag, _commit) = dag_builder.get_sub_dag_and_commit( - leader, - last_committed_rounds.clone(), - commit_index, - ); - tracing::info!("{subdag:?}"); - for block in subdag.blocks.iter() { - last_committed_rounds[block.author().value()] = - max(block.round(), last_committed_rounds[block.author().value()]); - } - unscored_subdags.push(subdag); + for (sub_dag, _commit) in dag_builder.get_sub_dag_and_commits(1..=4) { + unscored_subdags.push(sub_dag); } let mut calculator = ReputationScoreCalculator::new(context.clone(), &unscored_subdags); diff --git a/consensus/core/src/linearizer.rs b/consensus/core/src/linearizer.rs index f362d7ef2f7fc..dba030d05271e 100644 --- a/consensus/core/src/linearizer.rs +++ b/consensus/core/src/linearizer.rs @@ -7,13 +7,39 @@ use consensus_config::AuthorityIndex; use parking_lot::RwLock; use crate::{ - block::{BlockAPI, VerifiedBlock}, + block::{BlockAPI, BlockRef, VerifiedBlock}, commit::{sort_sub_dag_blocks, Commit, CommittedSubDag, TrustedCommit}, dag_state::DagState, leader_schedule::LeaderSchedule, - Round, + Round, TransactionIndex, }; +/// The `StorageAPI` trait provides an interface for the block store and has been +/// mostly introduced for allowing to inject the test store in `DagBuilder`. +pub(crate) trait BlockStoreAPI { + fn get_blocks(&self, refs: &[BlockRef]) -> Vec>; + + fn gc_round(&self) -> Round; + + fn gc_enabled(&self) -> bool; +} + +impl BlockStoreAPI + for parking_lot::lock_api::RwLockReadGuard<'_, parking_lot::RawRwLock, DagState> +{ + fn get_blocks(&self, refs: &[BlockRef]) -> Vec> { + DagState::get_blocks(self, refs) + } + + fn gc_round(&self) -> Round { + DagState::gc_round(self) + } + + fn gc_enabled(&self) -> bool { + DagState::gc_enabled(self) + } +} + /// Expand a committed sequence of leader into a sequence of sub-dags. #[derive(Clone)] pub(crate) struct Linearizer { @@ -46,19 +72,56 @@ impl Linearizer { let last_commit_digest = dag_state.last_commit_digest(); let last_commit_timestamp_ms = dag_state.last_commit_timestamp_ms(); let last_committed_rounds = dag_state.last_committed_rounds(); + let timestamp_ms = leader_block.timestamp_ms().max(last_commit_timestamp_ms); + + // Now linearize the sub-dag starting from the leader block + let (to_commit, rejected_transactions) = + Self::linearize_sub_dag(leader_block.clone(), last_committed_rounds, dag_state); + + // Create the Commit. + let commit = Commit::new( + last_commit_index + 1, + last_commit_digest, + timestamp_ms, + leader_block.reference(), + to_commit + .iter() + .map(|block| block.reference()) + .collect::>(), + ); + let serialized = commit + .serialize() + .unwrap_or_else(|e| panic!("Failed to serialize commit: {}", e)); + let commit = TrustedCommit::new_trusted(commit, serialized); + + // Create the corresponding committed sub dag + let sub_dag = CommittedSubDag::new( + leader_block.reference(), + to_commit, + rejected_transactions, + timestamp_ms, + commit.reference(), + reputation_scores_desc, + ); + + (sub_dag, commit) + } + + pub(crate) fn linearize_sub_dag( + leader_block: VerifiedBlock, + last_committed_rounds: Vec, + dag_state: impl BlockStoreAPI, + ) -> (Vec, Vec>) { let gc_enabled = dag_state.gc_enabled(); // The GC round here is calculated based on the last committed round of the leader block. The algorithm will attempt to // commit blocks up to this GC round. Once this commit has been processed and written to DagState, then gc round will update // and on the processing of the next commit we'll have it already updated, so no need to do any gc_round recalculations here. // We just use whatever is currently in DagState. let gc_round: Round = dag_state.gc_round(); - - let mut to_commit = Vec::new(); - let mut committed = HashSet::new(); - - let timestamp_ms = leader_block.timestamp_ms().max(last_commit_timestamp_ms); let leader_block_ref = leader_block.reference(); let mut buffer = vec![leader_block]; + let mut committed = HashSet::new(); + let mut to_commit = Vec::new(); assert!(committed.insert(leader_block_ref)); while let Some(x) = buffer.pop() { @@ -96,12 +159,10 @@ impl Linearizer { } } - drop(dag_state); - // The above code should have not yielded any blocks that are <= gc_round, but just to make sure that we'll never // commit anything that should be garbage collected we attempt to prune here as well. if gc_enabled { - assert!(to_commit.iter().all(|block| block.round() > gc_round), "No blocks <= {gc_round} should be committed. Commit index {}, leader round {}, blocks {to_commit:?}.", last_commit_index, leader_block_ref); + assert!(to_commit.iter().all(|block| block.round() > gc_round), "No blocks <= {gc_round} should be committed. Leader round {}, blocks {to_commit:?}.", leader_block_ref); } // Sort the blocks of the sub-dag blocks @@ -111,33 +172,7 @@ impl Linearizer { // Get rejected transactions. let rejected_transactions = vec![vec![]; to_commit.len()]; - // Create the Commit. - let commit = Commit::new( - last_commit_index + 1, - last_commit_digest, - timestamp_ms, - leader_block_ref, - to_commit - .iter() - .map(|block| block.reference()) - .collect::>(), - ); - let serialized = commit - .serialize() - .unwrap_or_else(|e| panic!("Failed to serialize commit: {}", e)); - let commit = TrustedCommit::new_trusted(commit, serialized); - - // Create the corresponding committed sub dag - let sub_dag = CommittedSubDag::new( - leader_block_ref, - to_commit, - rejected_transactions, - timestamp_ms, - commit.reference(), - reputation_scores_desc, - ); - - (sub_dag, commit) + (to_commit, rejected_transactions) } // This function should be called whenever a new commit is observed. This will diff --git a/consensus/core/src/test_dag_builder.rs b/consensus/core/src/test_dag_builder.rs index 347321947c879..f6816ab1befdf 100644 --- a/consensus/core/src/test_dag_builder.rs +++ b/consensus/core/src/test_dag_builder.rs @@ -16,10 +16,11 @@ use crate::{ genesis_blocks, BlockAPI, BlockDigest, BlockRef, BlockTimestampMs, Round, Slot, TestBlock, VerifiedBlock, }, - commit::{sort_sub_dag_blocks, CommitDigest, TrustedCommit, DEFAULT_WAVE_LENGTH}, + commit::{CommitDigest, TrustedCommit, DEFAULT_WAVE_LENGTH}, context::Context, dag_state::DagState, leader_schedule::{LeaderSchedule, LeaderSwapTable}, + linearizer::{BlockStoreAPI, Linearizer}, CommittedSubDag, }; @@ -83,6 +84,9 @@ pub(crate) struct DagBuilder { // All blocks created by dag builder. Will be used to pretty print or to be // retrieved for testing/persiting to dag state. pub(crate) blocks: BTreeMap, + // All the committed sub dags created by the dag builder. + pub(crate) committed_sub_dags: Vec<(CommittedSubDag, TrustedCommit)>, + pub(crate) last_committed_rounds: Vec, wave_length: Round, number_of_leaders: u32, @@ -100,6 +104,7 @@ impl DagBuilder { .collect(); let last_ancestors = genesis.keys().cloned().collect(); Self { + last_committed_rounds: vec![0; context.committee.size()], context, leader_schedule, wave_length: DEFAULT_WAVE_LENGTH, @@ -108,6 +113,7 @@ impl DagBuilder { genesis, last_ancestors, blocks: BTreeMap::new(), + committed_sub_dags: vec![], } } @@ -123,67 +129,108 @@ impl DagBuilder { .collect::>() } - // TODO: reuse logic from Linearizer. - pub(crate) fn get_sub_dag_and_commit( - &self, - leader_block: VerifiedBlock, - last_committed_rounds: Vec, - commit_index: u32, - ) -> (CommittedSubDag, TrustedCommit) { - let mut to_commit = Vec::new(); - let mut committed = HashSet::new(); - - let timestamp_ms = leader_block.timestamp_ms(); - let leader_block_ref = leader_block.reference(); - let mut buffer = vec![leader_block]; - assert!(committed.insert(leader_block_ref)); - while let Some(x) = buffer.pop() { - to_commit.push(x.clone()); - - let ancestors = self.get_blocks( - &x.ancestors() - .iter() - .copied() - .filter(|ancestor| { - // We skip the block if we already committed it or we reached a - // round that we already committed. - !committed.contains(ancestor) - && last_committed_rounds[ancestor.author] < ancestor.round - }) - .collect::>(), - ); + pub(crate) fn all_blocks(&self) -> Vec { + assert!( + !self.blocks.is_empty(), + "No blocks have been created, please make sure that you have called build method" + ); + self.blocks.values().cloned().collect() + } - for ancestor in ancestors { - buffer.push(ancestor.clone()); - assert!(committed.insert(ancestor.reference())); + pub(crate) fn get_sub_dag_and_commits( + &mut self, + leader_rounds: RangeInclusive, + ) -> Vec<(CommittedSubDag, TrustedCommit)> { + let (last_leader_round, mut last_commit_index, mut last_timestamp_ms) = + if let Some((sub_dag, _)) = self.committed_sub_dags.last() { + ( + sub_dag.leader.round, + sub_dag.commit_ref.index, + sub_dag.timestamp_ms, + ) + } else { + (0, 0, 0) + }; + + // Create any remaining committed sub dags + for leader_block in self + .leader_blocks(last_leader_round + 1..=*leader_rounds.end()) + .into_iter() + .flatten() + { + let leader_block_ref = leader_block.reference(); + last_commit_index += 1; + last_timestamp_ms = leader_block.timestamp_ms().max(last_timestamp_ms); + + struct FooStorage { + gc_round: Round, + context: Arc, + blocks: BTreeMap, } - } + impl BlockStoreAPI for FooStorage { + fn get_blocks(&self, refs: &[BlockRef]) -> Vec> { + refs.iter() + .map(|block_ref| self.blocks.get(block_ref).cloned()) + .collect() + } - sort_sub_dag_blocks(&mut to_commit); + fn gc_round(&self) -> Round { + self.gc_round + } - let rejected_transactions = vec![vec![]; to_commit.len()]; + fn gc_enabled(&self) -> bool { + self.context.protocol_config.gc_depth() > 0 + } + } + let storage = FooStorage { + context: self.context.clone(), + blocks: self.blocks.clone(), + gc_round: leader_block + .round() + .saturating_sub(1) + .saturating_sub(self.context.protocol_config.gc_depth()), + }; - let commit = TrustedCommit::new_for_test( - commit_index, - CommitDigest::MIN, - timestamp_ms, - leader_block_ref, - to_commit - .iter() - .map(|block| block.reference()) - .collect::>(), - ); + let (to_commit, rejected_transactions) = Linearizer::linearize_sub_dag( + leader_block, + self.last_committed_rounds.clone(), + storage, + ); - let sub_dag = CommittedSubDag::new( - leader_block_ref, - to_commit, - rejected_transactions, - timestamp_ms, - commit.reference(), - vec![], - ); + // Update the last committed rounds + for block in &to_commit { + self.last_committed_rounds[block.author()] = + self.last_committed_rounds[block.author()].max(block.round()); + } - (sub_dag, commit) + let commit = TrustedCommit::new_for_test( + last_commit_index, + CommitDigest::MIN, + last_timestamp_ms, + leader_block_ref, + to_commit + .iter() + .map(|block| block.reference()) + .collect::>(), + ); + + let sub_dag = CommittedSubDag::new( + leader_block_ref, + to_commit, + rejected_transactions, + last_timestamp_ms, + commit.reference(), + vec![], + ); + + self.committed_sub_dags.push((sub_dag, commit)); + } + + self.committed_sub_dags + .clone() + .into_iter() + .filter(|(sub_dag, _)| leader_rounds.contains(&sub_dag.leader.round)) + .collect() } pub(crate) fn leader_blocks(