diff --git a/consensus/config/Cargo.toml b/consensus/config/Cargo.toml index 402d72834a10b..60e03b15f2a0d 100644 --- a/consensus/config/Cargo.toml +++ b/consensus/config/Cargo.toml @@ -12,7 +12,6 @@ mysten-network.workspace = true rand.workspace = true serde.workspace = true shared-crypto.workspace = true - workspace-hack.workspace = true [dev-dependencies] diff --git a/consensus/config/src/parameters.rs b/consensus/config/src/parameters.rs index 225ff708a49e9..f835b0a37c81c 100644 --- a/consensus/config/src/parameters.rs +++ b/consensus/config/src/parameters.rs @@ -1,6 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::path::PathBuf; use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -16,18 +17,32 @@ pub struct Parameters { /// Time to wait for parent round leader before sealing a block. #[serde(default = "Parameters::default_leader_timeout")] pub leader_timeout: Duration, + + /// The database path. The path should be provided in order for the node to be able to boot + pub db_path: Option, } impl Parameters { pub fn default_leader_timeout() -> Duration { Duration::from_millis(250) } + + pub fn db_path_str_unsafe(&self) -> String { + self.db_path + .clone() + .expect("DB path is not set") + .as_path() + .to_str() + .unwrap() + .to_string() + } } impl Default for Parameters { fn default() -> Self { Self { leader_timeout: Parameters::default_leader_timeout(), + db_path: None, } } } diff --git a/consensus/config/tests/snapshots/parameters_test__parameters.snap b/consensus/config/tests/snapshots/parameters_test__parameters.snap index 742cad5b26206..e1a32bb537bfa 100644 --- a/consensus/config/tests/snapshots/parameters_test__parameters.snap +++ b/consensus/config/tests/snapshots/parameters_test__parameters.snap @@ -5,4 +5,5 @@ expression: parameters leader_timeout: secs: 0 nanos: 250000000 +db_path: ~ diff --git a/consensus/core/src/authority_node.rs b/consensus/core/src/authority_node.rs index c6c672f20139a..f85153a7d7409 100644 --- a/consensus/core/src/authority_node.rs +++ b/consensus/core/src/authority_node.rs @@ -16,6 +16,7 @@ use crate::core::{Core, CoreSignals}; use crate::core_thread::CoreThreadDispatcher; use crate::leader_timeout::{LeaderTimeoutTask, LeaderTimeoutTaskHandle}; use crate::metrics::initialise_metrics; +use crate::storage::rocksdb_store::RocksDBStore; use crate::transactions_client::{TransactionsClient, TransactionsConsumer}; pub struct AuthorityNode { @@ -55,12 +56,14 @@ impl AuthorityNode { // Construct Core let (core_signals, signals_receivers) = CoreSignals::new(); let block_manager = BlockManager::new(); + let store = Arc::new(RocksDBStore::new(&context.parameters.db_path_str_unsafe())); let core = Core::new( context.clone(), tx_consumer, block_manager, core_signals, block_signer, + store, ); let (core_dispatcher, core_dispatcher_handle) = @@ -104,6 +107,7 @@ mod tests { use fastcrypto::traits::ToFromBytes; use prometheus::Registry; use sui_protocol_config::ProtocolConfig; + use tempfile::TempDir; use crate::authority_node::AuthorityNode; use crate::block_verifier::TestBlockVerifier; @@ -112,7 +116,11 @@ mod tests { async fn start_and_stop() { let (committee, keypairs) = local_committee_and_keys(0, vec![1]); let registry = Registry::new(); - let parameters = Parameters::default(); + let temp_dir = TempDir::new().unwrap(); + let parameters = Parameters { + db_path: Some(temp_dir.into_path()), + ..Default::default() + }; let block_verifier = TestBlockVerifier {}; let (own_index, _) = committee.authorities().last().unwrap(); diff --git a/consensus/core/src/block.rs b/consensus/core/src/block.rs index 80e3bd7ec558e..59736ff06c2e3 100644 --- a/consensus/core/src/block.rs +++ b/consensus/core/src/block.rs @@ -71,10 +71,10 @@ pub enum Block { } impl Block { - /// Generate the genesis blocks for the latest Block version. The tuple contains (my_genesis_block, others_genesis_blocks). + /// Generate the genesis blocks for the latest Block version. The tuple contains (my_genesis_block, all_genesis_blocks). /// The blocks are returned in authority index order. pub(crate) fn genesis(context: Arc) -> (VerifiedBlock, Vec) { - let (my_block, others_block): (Vec<_>, Vec<_>) = context + let blocks = context .committee .authorities() .map(|(authority_index, _)| { @@ -85,8 +85,15 @@ impl Block { VerifiedBlock::new_verified_unserialized(signed) .expect("Shouldn't fail when creating verified block for genesis") }) - .partition(|block| block.author() == context.own_index); - (my_block[0].clone(), others_block) + .collect::>(); + ( + blocks + .iter() + .find(|b| b.author() == context.own_index) + .cloned() + .expect("We should have found our own genesis block"), + blocks, + ) } } diff --git a/consensus/core/src/context.rs b/consensus/core/src/context.rs index bd49c3f784fcb..eaa521a2dcfe3 100644 --- a/consensus/core/src/context.rs +++ b/consensus/core/src/context.rs @@ -6,6 +6,9 @@ use std::sync::Arc; use consensus_config::{AuthorityIndex, Committee, Parameters}; use sui_protocol_config::ProtocolConfig; +#[cfg(test)] +use tempfile::TempDir; + use crate::metrics::Metrics; #[cfg(test)] @@ -56,11 +59,15 @@ impl Context { let (committee, keypairs) = consensus_config::local_committee_and_keys(0, vec![1; committee_size]); let metrics = test_metrics(); + let temp_dir = TempDir::new().unwrap(); let context = Context::new( AuthorityIndex::new_for_test(0), committee, - Parameters::default(), + Parameters { + db_path: Some(temp_dir.into_path()), + ..Default::default() + }, ProtocolConfig::get_for_max_version_UNSAFE(), metrics, ); diff --git a/consensus/core/src/core.rs b/consensus/core/src/core.rs index 26586097973a6..cad690bce4a5b 100644 --- a/consensus/core/src/core.rs +++ b/consensus/core/src/core.rs @@ -15,6 +15,7 @@ use crate::{ transactions_client::TransactionsConsumer, }; +use crate::storage::Store; use consensus_config::{AuthorityIndex, NetworkKeyPair}; use mysten_metrics::monitored_scope; use tokio::sync::watch; @@ -37,6 +38,8 @@ pub(crate) struct Core { signals: CoreSignals, /// The keypair to be used for block signing block_signer: NetworkKeyPair, + /// The node's storage + store: Arc, } #[allow(dead_code)] @@ -45,40 +48,51 @@ impl Core { context: Arc, transactions_consumer: TransactionsConsumer, block_manager: BlockManager, - mut signals: CoreSignals, + signals: CoreSignals, block_signer: NetworkKeyPair, + store: Arc, ) -> Self { - // TODO: restore the threshold clock round based on the last quorum data in storage when crash/recover - let mut threshold_clock = ThresholdClock::new(0, context.clone()); - - // TODO: restore based on DagState, for now we just init via the genesis - let (genesis_my, mut genesis_others) = Block::genesis(context.clone()); - genesis_others.push(genesis_my.clone()); - - // populate the threshold clock to properly advance the round & also the pending ancestors - let mut pending_ancestors: BTreeMap> = BTreeMap::new(); - for ancestor in genesis_others { - threshold_clock.add_block(ancestor.reference()); - pending_ancestors - .entry(ancestor.round()) - .or_default() - .push(ancestor) - } - - // emit a signal for the last threshold clock round, even if that's unnecessary it will ensure that the timeout - // logic will trigger to attempt a block creation. - signals.new_round(threshold_clock.get_round()); + let (my_genesis_block, all_genesis_blocks) = Block::genesis(context.clone()); Self { - context, - threshold_clock, - last_proposed_block: genesis_my, + context: context.clone(), + threshold_clock: ThresholdClock::new(0, context), + last_proposed_block: my_genesis_block, transactions_consumer, - pending_ancestors, + pending_ancestors: BTreeMap::new(), block_manager, signals, block_signer, + store, + } + .recover(all_genesis_blocks) + } + + fn recover(mut self, genesis_blocks: Vec) -> Self { + // We always need the genesis blocks as a starter point since we might not have advanced yet at all. + let mut all_blocks = genesis_blocks; + + // Now fetch the proposed blocks per authority for their last two rounds. + let context = self.context.clone(); + for (index, _authority) in context.committee.authorities() { + let blocks = self + .store + .scan_last_blocks_by_author(index, 2) + .expect("Storage error while recovering Core"); + all_blocks.extend(blocks); } + + // Recover the last proposed block + self.last_proposed_block = all_blocks + .iter() + .filter(|block| block.author() == context.own_index) + .max_by_key(|block| block.round()) + .cloned() + .expect("At least one block - even genesis - should be present"); + + // Accept all blocks but make sure that only the last quorum round blocks and onwards are kept. + self.add_accepted_blocks(all_blocks, Some(0)); + self } /// Processes the provided blocks and accepts them if possible when their causal history exists. @@ -89,6 +103,26 @@ impl Core { // Try to accept them via the block manager let accepted_blocks = self.block_manager.add_blocks(blocks); + // Now process them, basically move the threshold clock and add them to pending list + self.add_accepted_blocks(accepted_blocks, None); + + // Attempt to create a new block + let _ = self.try_new_block(false); + + // TODO: we don't deal for now with missed references, will address later. + vec![] + } + + /// Adds/processed all the newly `accepted_blocks`. We basically try to move the threshold clock and add them to the + /// pending ancestors list. The `pending_ancestors_retain_rounds` if defined then the method will retain on the pending ancestors + /// only the `pending_ancestors_retain_rounds` from the last formed quorum round. For example if set to zero (0), then + /// we'll strictly keep in the pending ancestors list the blocks of round >= last_quorum_round. If not defined, so None + /// is provided, then all the pending ancestors will be kep until the next block proposal. + fn add_accepted_blocks( + &mut self, + accepted_blocks: Vec, + pending_ancestors_retain_rounds: Option, + ) { // Advance the threshold clock. If advanced to a new round then send a signal that a new quorum has been received. if let Some(new_round) = self .threshold_clock @@ -105,11 +139,6 @@ impl Core { .threshold_clock_round .set(self.threshold_clock.get_round() as i64); - // TODO: we might need to consider the following: - // 1. Add some sort of protection from bulk catch ups - or intentional validator attack - that is flooding us with - // many blocks, so we don't spam the pending_ancestors list and OOM - // 2. Probably it doesn't make much sense to keep blocks around from too many rounds ago to reference as the data - // might not be relevant any more. for accepted_block in accepted_blocks { self.pending_ancestors .entry(accepted_block.round()) @@ -117,11 +146,17 @@ impl Core { .push(accepted_block); } - // Attempt to create a new block - let _ = self.try_new_block(false); - - // TODO: we don't deal for now with missed references, will address later. - vec![] + // TODO: we might need to consider the following: + // 1. Add some sort of protection from bulk catch ups - or intentional validator attack - that is flooding us with + // many blocks, so we don't spam the pending_ancestors list and OOM + // 2. Probably it doesn't make much sense to keep blocks around from too many rounds ago to reference as the data + // might not be relevant any more. + if let Some(retain_ancestor_rounds_from_quorum) = pending_ancestors_retain_rounds { + let last_quorum = self.threshold_clock.get_round().saturating_sub(1); + self.pending_ancestors.retain(|round, _| { + *round >= last_quorum.saturating_sub(retain_ancestor_rounds_from_quorum) + }); + } } /// Force creating a new block for the dictated round. This is used when a leader timeout occurs. @@ -355,8 +390,137 @@ mod test { use super::*; use crate::block::TestBlock; + use crate::storage::mem_store::MemStore; use crate::transactions_client::TransactionsClient; + /// Recover Core and continue proposing from the last round which forms a quorum. + #[tokio::test] + async fn test_core_recover_from_store_for_full_round() { + let (context, mut key_pairs) = Context::new_for_test(4); + let context = Arc::new(context); + let block_manager = BlockManager::new(); + let (_transactions_client, tx_receiver) = TransactionsClient::new(context.clone()); + let transactions_consumer = TransactionsConsumer::new(tx_receiver, context.clone(), None); + let store = Arc::new(MemStore::new()); + + // Create test blocks for all the authorities for 4 rounds and populate them in store + let (_, mut last_round_blocks) = Block::genesis(context.clone()); + let mut all_blocks = last_round_blocks.clone(); + for round in 1..=4 { + let mut this_round_blocks = Vec::new(); + for (index, _authority) in context.committee.authorities() { + let block = TestBlock::new(round, index.value() as u32) + .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect()) + .build(); + + this_round_blocks.push(VerifiedBlock::new_for_test(block)); + } + all_blocks.extend(this_round_blocks.clone()); + last_round_blocks = this_round_blocks; + } + + // write them in store + store.write(all_blocks, vec![]).expect("Storage error"); + + // Now spin up core + let (signals, signal_receivers) = CoreSignals::new(); + let mut core = Core::new( + context.clone(), + transactions_consumer, + block_manager, + signals, + key_pairs.remove(context.own_index.value()).0, + store, + ); + + // New round should be 5 + let mut new_round = signal_receivers.new_round_receiver(); + assert_eq!(*new_round.borrow_and_update(), 5); + + // When trying to propose now we should propose block for round 5 + let proposed_block = core + .try_new_block(true) + .expect("A block should have been created"); + assert_eq!(proposed_block.round(), 5); + let ancestors = proposed_block.ancestors(); + + // Only ancestors of round 4 should be included. + assert_eq!(ancestors.len(), 4); + for ancestor in ancestors { + assert_eq!(ancestor.round, 4); + } + } + + /// Recover Core and continue proposing when having a partial last round which doesn't form a quorum and we haven't + /// proposed for that round yet. + #[tokio::test] + async fn test_core_recover_from_store_for_partial_round() { + let (context, mut key_pairs) = Context::new_for_test(4); + let context = Arc::new(context); + let block_manager = BlockManager::new(); + let (_transactions_client, tx_receiver) = TransactionsClient::new(context.clone()); + let transactions_consumer = TransactionsConsumer::new(tx_receiver, context.clone(), None); + let store = Arc::new(MemStore::new()); + + // Create test blocks for all authorities except our's (index = 0) . + let (_, mut last_round_blocks) = Block::genesis(context.clone()); + let mut all_blocks = last_round_blocks.clone(); + for round in 1..=4 { + let mut this_round_blocks = Vec::new(); + + // For round 4 only produce f+1 blocks only skip our validator and that of position 1 from creating blocks. + let authorities_to_skip = if round == 4 { + context.committee.validity_threshold() as usize + } else { + // otherwise always skip creating a block for our authority + 1 + }; + + for (index, _authority) in context.committee.authorities().skip(authorities_to_skip) { + let block = TestBlock::new(round, index.value() as u32) + .set_ancestors(last_round_blocks.iter().map(|b| b.reference()).collect()) + .build(); + this_round_blocks.push(VerifiedBlock::new_for_test(block)); + } + all_blocks.extend(this_round_blocks.clone()); + last_round_blocks = this_round_blocks; + } + + // write them in store + store.write(all_blocks, vec![]).expect("Storage error"); + + // Now spin up core + let (signals, signal_receivers) = CoreSignals::new(); + let mut core = Core::new( + context.clone(), + transactions_consumer, + block_manager, + signals, + key_pairs.remove(context.own_index.value()).0, + store, + ); + + // New round should be 4 + let mut new_round = signal_receivers.new_round_receiver(); + assert_eq!(*new_round.borrow_and_update(), 4); + + // When trying to propose now we should propose block for round 4 + let proposed_block = core + .try_new_block(true) + .expect("A block should have been created"); + assert_eq!(proposed_block.round(), 4); + let ancestors = proposed_block.ancestors(); + + assert_eq!(ancestors.len(), 4); + for ancestor in ancestors { + if ancestor.author == context.own_index { + assert_eq!(ancestor.round, 0); + } else { + assert_eq!(ancestor.round, 3); + } + } + } + #[tokio::test] async fn test_core_propose_after_genesis() { let _guard = ProtocolConfig::apply_overrides_for_testing(|_, mut config| { @@ -371,6 +535,7 @@ mod test { let (transactions_client, tx_receiver) = TransactionsClient::new(context.clone()); let transactions_consumer = TransactionsConsumer::new(tx_receiver, context.clone(), None); let (signals, _signal_receivers) = CoreSignals::new(); + let store = Arc::new(MemStore::new()); let mut core = Core::new( context.clone(), @@ -378,6 +543,7 @@ mod test { block_manager, signals, key_pairs.remove(context.own_index.value()).0, + store, ); // Send some transactions @@ -420,11 +586,10 @@ mod test { ); // genesis blocks should be referenced - let (genesis_my, mut genesis_others) = Block::genesis(context); - genesis_others.push(genesis_my); + let (_genesis_my, all_genesis) = Block::genesis(context); for ancestor in block.ancestors() { - genesis_others + all_genesis .iter() .find(|block| block.reference() == *ancestor) .expect("Block should be found amongst genesis blocks"); @@ -443,6 +608,7 @@ mod test { let (_transactions_client, tx_receiver) = TransactionsClient::new(context.clone()); let transactions_consumer = TransactionsConsumer::new(tx_receiver, context.clone(), None); let (signals, _signal_receivers) = CoreSignals::new(); + let store = Arc::new(MemStore::new()); let mut core = Core::new( context.clone(), @@ -450,6 +616,7 @@ mod test { block_manager, signals, key_pairs.remove(context.own_index.value()).0, + store, ); let mut expected_ancestors = BTreeSet::new(); @@ -651,6 +818,7 @@ mod test { let transactions_consumer = TransactionsConsumer::new(tx_receiver, context.clone(), None); let (signals, signal_receivers) = CoreSignals::new(); + let store = Arc::new(MemStore::new()); let block_signer = signers.remove(index).0; @@ -660,6 +828,7 @@ mod test { block_manager, signals, block_signer, + store, ); cores.push((core, signal_receivers)); diff --git a/consensus/core/src/core_thread.rs b/consensus/core/src/core_thread.rs index f06a47dd192f5..ffcf6a6ac5925 100644 --- a/consensus/core/src/core_thread.rs +++ b/consensus/core/src/core_thread.rs @@ -169,6 +169,7 @@ mod test { use crate::block_manager::BlockManager; use crate::context::Context; use crate::core::CoreSignals; + use crate::storage::mem_store::MemStore; use crate::transactions_client::{TransactionsClient, TransactionsConsumer}; #[tokio::test] @@ -179,12 +180,15 @@ mod test { let (_transactions_client, tx_receiver) = TransactionsClient::new(context.clone()); let transactions_consumer = TransactionsConsumer::new(tx_receiver, context.clone(), None); let (signals, _signal_receivers) = CoreSignals::new(); + let store = Arc::new(MemStore::new()); + let core = Core::new( context.clone(), transactions_consumer, block_manager, signals, key_pairs.remove(context.own_index.value()).0, + store, ); let (core_dispatcher, handle) = CoreThreadDispatcher::start(core, context); diff --git a/consensus/core/src/leader_timeout.rs b/consensus/core/src/leader_timeout.rs index a07ff2ce25b0d..18f4a5e5ba6d6 100644 --- a/consensus/core/src/leader_timeout.rs +++ b/consensus/core/src/leader_timeout.rs @@ -147,7 +147,10 @@ mod tests { let (context, _signers) = Context::new_for_test(4); let dispatcher = MockCoreThreadDispatcher::default(); let leader_timeout = Duration::from_millis(500); - let parameters = Parameters { leader_timeout }; + let parameters = Parameters { + leader_timeout, + ..Default::default() + }; let context = Arc::new(context.with_parameters(parameters)); let now = Instant::now(); @@ -181,7 +184,10 @@ mod tests { let (context, _signers) = Context::new_for_test(4); let dispatcher = MockCoreThreadDispatcher::default(); let leader_timeout = Duration::from_millis(500); - let parameters = Parameters { leader_timeout }; + let parameters = Parameters { + leader_timeout, + ..Default::default() + }; let context = Arc::new(context.with_parameters(parameters)); let now = Instant::now(); diff --git a/consensus/core/src/storage/mem_store.rs b/consensus/core/src/storage/mem_store.rs index ac0af7d7b76ff..62810a0800d5b 100644 --- a/consensus/core/src/storage/mem_store.rs +++ b/consensus/core/src/storage/mem_store.rs @@ -1,6 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::collections::VecDeque; use std::{ collections::{BTreeMap, BTreeSet}, ops::Bound::{Included, Unbounded}, @@ -105,6 +106,35 @@ impl Store for MemStore { Ok(blocks) } + fn scan_last_blocks_by_author( + &self, + author: AuthorityIndex, + num_of_rounds: u64, + ) -> ConsensusResult> { + let mut refs = VecDeque::new(); + for &(author, round, digest) in self + .inner + .read() + .digests_by_authorities + .range(( + Included((author, Round::MIN, BlockDigest::MIN)), + Included((author, Round::MAX, BlockDigest::MAX)), + )) + .rev() + .take(num_of_rounds as usize) + { + refs.push_front(BlockRef::new(round, author, digest)); + } + let results = self.read_blocks(refs.as_slices().0)?; + let mut blocks = vec![]; + for (r, block) in refs.into_iter().zip(results.into_iter()) { + blocks.push( + block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)), + ); + } + Ok(blocks) + } + fn read_last_commit(&self) -> ConsensusResult> { let inner = self.inner.read(); Ok(inner diff --git a/consensus/core/src/storage/mod.rs b/consensus/core/src/storage/mod.rs index a774052b18859..58091d754c915 100644 --- a/consensus/core/src/storage/mod.rs +++ b/consensus/core/src/storage/mod.rs @@ -33,6 +33,14 @@ pub(crate) trait Store: Send + Sync { start_round: Round, ) -> ConsensusResult>; + /// Reads an author's blocks from the last produced round up to `num_of_rounds` before (assuming such many rounds exist) in + /// round ascending order. + fn scan_last_blocks_by_author( + &self, + author: AuthorityIndex, + num_of_rounds: u64, + ) -> ConsensusResult>; + /// Reads the last commit. fn read_last_commit(&self) -> ConsensusResult>; diff --git a/consensus/core/src/storage/rocksdb_store.rs b/consensus/core/src/storage/rocksdb_store.rs index f8728c0e1f0e6..3f2e9cfcb90cc 100644 --- a/consensus/core/src/storage/rocksdb_store.rs +++ b/consensus/core/src/storage/rocksdb_store.rs @@ -1,6 +1,7 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use std::collections::VecDeque; use std::{ ops::Bound::{Included, Unbounded}, time::Duration, @@ -156,6 +157,36 @@ impl Store for RocksDBStore { Ok(blocks) } + // The method returns the last `num_of_rounds` rounds blocks by author in round ascending order. + fn scan_last_blocks_by_author( + &self, + author: AuthorityIndex, + num_of_rounds: u64, + ) -> ConsensusResult> { + let mut refs = VecDeque::new(); + for kv in self + .digests_by_authorities + .safe_range_iter(( + Included((author, Round::MIN, BlockDigest::MIN)), + Included((author, Round::MAX, BlockDigest::MAX)), + )) + .skip_to_last() + .reverse() + .take(num_of_rounds as usize) + { + let ((author, round, digest), _) = kv?; + refs.push_front(BlockRef::new(round, author, digest)); + } + let results = self.read_blocks(refs.as_slices().0)?; + let mut blocks = vec![]; + for (r, block) in refs.into_iter().zip(results.into_iter()) { + blocks.push( + block.unwrap_or_else(|| panic!("Storage inconsistency: block {:?} not found!", r)), + ); + } + Ok(blocks) + } + fn read_last_commit(&self) -> ConsensusResult> { let Some(commit) = self.commits.safe_iter().skip_to_last().next() else { return Ok(None); diff --git a/consensus/core/src/storage/store_tests.rs b/consensus/core/src/storage/store_tests.rs index f4d5dd4346c47..0b6b8b6d24556 100644 --- a/consensus/core/src/storage/store_tests.rs +++ b/consensus/core/src/storage/store_tests.rs @@ -162,6 +162,22 @@ async fn scan_blocks( ] ); } + + { + let scanned_blocks = store + .scan_last_blocks_by_author(AuthorityIndex::new_for_test(1), 2) + .expect("Scan blocks should not fail"); + assert_eq!(scanned_blocks.len(), 2, "{:?}", scanned_blocks); + assert_eq!( + scanned_blocks, + vec![written_blocks[7].clone(), additional_blocks[2].clone()] + ); + + let scanned_blocks = store + .scan_last_blocks_by_author(AuthorityIndex::new_for_test(1), 0) + .expect("Scan blocks should not fail"); + assert_eq!(scanned_blocks.len(), 0); + } } #[rstest] diff --git a/consensus/core/src/test_dag.rs b/consensus/core/src/test_dag.rs index 13d74d2018240..ab0edfc77d3f1 100644 --- a/consensus/core/src/test_dag.rs +++ b/consensus/core/src/test_dag.rs @@ -32,8 +32,7 @@ pub(crate) fn build_dag( start } None => { - let (my_genesis_block, mut genesis) = Block::genesis(context.clone()); - genesis.insert(0, my_genesis_block); + let (_my_genesis_block, genesis) = Block::genesis(context.clone()); let references = genesis.iter().map(|x| x.reference()).collect::>(); dag_state.write().accept_blocks(genesis);