From dd57c365b39d36569306213769a933581e654bfb Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 14 Oct 2023 23:33:27 +0300 Subject: [PATCH] Unify concurrent and non-concurrent verification with ability to control concurrency --- substrate/client/consensus/common/Cargo.toml | 2 +- .../consensus/common/src/import_queue.rs | 21 +-- .../common/src/import_queue/basic_queue.rs | 145 +++++------------- 3 files changed, 47 insertions(+), 121 deletions(-) diff --git a/substrate/client/consensus/common/Cargo.toml b/substrate/client/consensus/common/Cargo.toml index c43da3299d63..4bffc3ca780f 100644 --- a/substrate/client/consensus/common/Cargo.toml +++ b/substrate/client/consensus/common/Cargo.toml @@ -32,7 +32,7 @@ sp-consensus = { path = "../../../primitives/consensus/common" } sp-core = { path = "../../../primitives/core" } sp-runtime = { path = "../../../primitives/runtime" } sp-state-machine = { path = "../../../primitives/state-machine" } -tokio = "1.32.0" +tokio = { version = "1.32.0", features = ["macros"] } [dev-dependencies] sp-test-primitives = { path = "../../../primitives/test-primitives" } diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 079bb1cd0c0c..4a1a3479c5da 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -32,6 +32,7 @@ use log::{debug, trace}; use std::{ fmt, future::Future, + num::NonZeroUsize, ops::Deref, pin::Pin, sync::Arc, @@ -129,16 +130,16 @@ pub struct IncomingBlock { /// Verify a justification of a block #[async_trait::async_trait] pub trait Verifier: Send + Sync { - /// Whether verifier supports stateless verification. + /// How many blocks can be verified concurrently. /// - /// Stateless verification means that verification on blocks can be done in arbitrary order, - /// doesn't expect parent block to be imported first, etc. + /// Defaults to 1, which means blocks are verified sequentially, one at a time. /// - /// Verifiers that support stateless verification can verify multiple blocks concurrently, - /// significantly improving sync speed. - fn supports_stateless_verification(&self) -> bool { - // Unless re-defined by verifier is assumed to not support stateless verification. - false + /// value higher than one means verification on blocks can be done in arbitrary order, + /// doesn't expect parent block to be imported first, etc. This significantly improves sync + /// speed by leveraging multiple CPU cores. Good value here is to make concurrency equal to + /// number of CPU cores available. + fn verification_concurrency(&self) -> NonZeroUsize { + NonZeroUsize::new(1).expect("Not zero; qed") } /// Verify the given block data and return the `BlockImportParams` to @@ -150,8 +151,8 @@ impl Verifier for Arc> where Block: BlockT, { - fn supports_stateless_verification(&self) -> bool { - (**self).supports_stateless_verification() + fn verification_concurrency(&self) -> NonZeroUsize { + (**self).verification_concurrency() } fn verify<'life0, 'async_trait>( diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index a60ce7bc6bfc..86ed8cf928d8 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -29,6 +29,7 @@ use sp_runtime::{ Justification, Justifications, }; use std::{ + num::NonZeroUsize, pin::Pin, sync::{ atomic::{AtomicBool, Ordering}, @@ -250,18 +251,15 @@ async fn block_import_process( }, }; - let res = if verifier.supports_stateless_verification() { - import_many_blocks_with_stateless_verification( - &mut block_import, - origin, - blocks, - &verifier, - metrics.clone(), - ) - .await - } else { - import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await - }; + let res = import_many_blocks_with_verification_concurrency( + &mut block_import, + origin, + blocks, + &verifier, + metrics.clone(), + verifier.verification_concurrency(), + ) + .await; result_sender.blocks_processed(res.imported, res.block_count, res.results); } @@ -404,98 +402,15 @@ struct ImportManyBlocksResult { /// This will yield after each imported block once, to ensure that other futures can /// be called as well. /// -/// For verifiers that support stateless verification use -/// [`import_many_blocks_with_stateless_verification`] for better performance. -async fn import_many_blocks>( - import_handle: &mut SharedBlockImport, - blocks_origin: BlockOrigin, - blocks: Vec>, - verifier: &V, - metrics: Option, -) -> ImportManyBlocksResult { - let count = blocks.len(); - - let blocks_range = match ( - blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), - blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), - ) { - (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), - (Some(first), Some(_)) => format!(" ({})", first), - _ => Default::default(), - }; - - trace!(target: LOG_TARGET, "Starting import of {} blocks {}", count, blocks_range); - - let mut imported = 0; - let mut results = vec![]; - let mut has_error = false; - let mut blocks = blocks.into_iter(); - - // Blocks in the response/drain should be in ascending order. - loop { - // Is there any block left to import? - let block = match blocks.next() { - Some(b) => b, - None => { - // No block left to import, success! - return ImportManyBlocksResult { block_count: count, imported, results } - }, - }; - - let block_number = block.header.as_ref().map(|h| *h.number()); - let block_hash = block.hash; - let import_result = if has_error { - Err(BlockImportError::Cancelled) - } else { - let verification_fut = verify_single_block_metered( - import_handle, - blocks_origin, - block, - verifier, - false, - metrics.as_ref(), - ); - match verification_fut.await { - Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status), - Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => { - // The actual import. - import_single_block_metered(import_handle, import_parameters, metrics.as_ref()) - .await - }, - Err(e) => Err(e), - } - }; - - if let Some(metrics) = metrics.as_ref() { - metrics.report_import::(&import_result); - } - - if import_result.is_ok() { - trace!( - target: LOG_TARGET, - "Block imported successfully {:?} ({})", - block_number, - block_hash, - ); - imported += 1; - } else { - has_error = true; - } - - results.push((import_result, block_hash)); - - Yield::new().await - } -} - -/// The same as [`import_many_blocks()`]`, but for verifiers that support stateless verification of -/// blocks (use [`Verifier::supports_stateless_verification()`]). -async fn import_many_blocks_with_stateless_verification( +/// When verification concurrency is set to value higher than 1, block verification will happen in +/// parallel to block import, reducing overall time required. +async fn import_many_blocks_with_verification_concurrency( import_handle: &mut SharedBlockImport, blocks_origin: BlockOrigin, blocks: Vec>, verifier: &Arc>, metrics: Option, + verification_concurrency: NonZeroUsize, ) -> ImportManyBlocksResult { let count = blocks.len(); @@ -512,11 +427,8 @@ async fn import_many_blocks_with_stateless_verification( let has_error = Arc::new(AtomicBool::new(false)); - // Blocks in the response/drain should be in ascending order. - let mut verified_blocks = blocks - .into_iter() - .enumerate() - .map(|(index, block)| { + let verify_block_task = + |index, block: IncomingBlock, import_handle: &SharedBlockImport| { let import_handle = import_handle.clone(); let verifier = Arc::clone(verifier); let metrics = metrics.clone(); @@ -551,7 +463,14 @@ async fn import_many_blocks_with_stateless_verification( (block_number, block_hash, result) } - }) + }; + + // Blocks in the response/drain should be in ascending order. + let mut blocks_to_verify = blocks.into_iter().enumerate(); + let mut verified_blocks = blocks_to_verify + .by_ref() + .take(verification_concurrency.get()) + .map(|(index, block)| verify_block_task(index, block, import_handle)) .collect::>(); let mut imported = 0; @@ -589,6 +508,11 @@ async fn import_many_blocks_with_stateless_verification( results.push((import_result, block_hash)); + // Add more blocks into verification queue if there are any + if let Some((index, block)) = blocks_to_verify.next() { + verified_blocks.push_back(verify_block_task(index, block, import_handle)); + } + Yield::new().await } @@ -632,7 +556,7 @@ mod tests { }, import_queue::Verifier, }; - use futures::{executor::block_on, Future}; + use futures::Future; use sp_test_primitives::{Block, BlockNumber, Hash, Header}; #[async_trait::async_trait] @@ -716,8 +640,8 @@ mod tests { } } - #[test] - fn prioritizes_finality_work_over_block_import() { + #[tokio::test] + async fn prioritizes_finality_work_over_block_import() { let (result_sender, mut result_port) = buffered_link::buffered_link(100_000); let (worker, finality_sender, block_import_sender) = BlockImportWorker::new( @@ -789,7 +713,7 @@ mod tests { let justification3 = import_justification(); // we poll the worker until we have processed 9 events - block_on(futures::future::poll_fn(|cx| { + futures::future::poll_fn(|cx| { while link.events.len() < 9 { match Future::poll(Pin::new(&mut worker), cx) { Poll::Pending => {}, @@ -800,7 +724,8 @@ mod tests { } Poll::Ready(()) - })); + }) + .await; // all justification tasks must be done before any block import work assert_eq!(