diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 1baa67398a49..3e9a593197ac 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -30,6 +30,7 @@ use log::{debug, trace}; use std::{ fmt, + num::NonZeroUsize, time::{Duration, Instant}, }; @@ -98,6 +99,19 @@ pub struct IncomingBlock { /// Verify a justification of a block #[async_trait::async_trait] pub trait Verifier: Send + Sync { + /// How many blocks can be verified concurrently. + /// + /// Defaults to 1, which means blocks are verified sequentially, one at a time. + /// + /// Value higher than one means verification on blocks can be done in arbitrary order, + /// doesn't expect parent block to be imported first, etc. This significantly improves sync + /// speed by leveraging multiple CPU cores. Good value here is to make concurrency equal to + /// number of CPU cores available. Note that blocks will be verified concurrently, not in + /// parallel, so spawn blocking tasks internally as needed. + fn verification_concurrency(&self) -> NonZeroUsize { + NonZeroUsize::new(1).expect("Not zero; qed") + } + /// Verify the given block data and return the `BlockImportParams` to /// continue the block import process. async fn verify(&self, block: BlockImportParams) -> Result, String>; @@ -227,7 +241,9 @@ pub async fn import_single_block>( block: IncomingBlock, verifier: &V, ) -> BlockImportResult { - match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? { + match verify_single_block_metered(import_handle, block_origin, block, verifier, false, None) + .await? + { SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status), SingleBlockVerificationOutcome::Verified(import_parameters) => import_single_block_metered(import_handle, import_parameters, None).await, @@ -296,6 +312,7 @@ pub(crate) async fn verify_single_block_metered>( block_origin: BlockOrigin, block: IncomingBlock, verifier: &V, + allow_missing_parent: bool, metrics: Option<&Metrics>, ) -> Result, BlockImportError> { let peer = block.origin; @@ -328,7 +345,7 @@ pub(crate) async fn verify_single_block_metered>( parent_hash, allow_missing_state: block.allow_missing_state, import_existing: block.import_existing, - allow_missing_parent: block.state.is_some(), + allow_missing_parent: allow_missing_parent || block.state.is_some(), }) .await, )? { @@ -390,7 +407,7 @@ pub(crate) async fn verify_single_block_metered>( } pub(crate) async fn import_single_block_metered( - import_handle: &mut impl BlockImport, + import_handle: &impl BlockImport, import_parameters: SingleBlockImportParameters, metrics: Option<&Metrics>, ) -> BlockImportResult { diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index 7b371145e2e7..775d611534ca 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -17,6 +17,7 @@ // along with this program. If not, see . use futures::{ prelude::*, + stream::FuturesOrdered, task::{Context, Poll}, }; use log::{debug, trace}; @@ -27,7 +28,11 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT, NumberFor}, Justification, Justifications, }; -use std::pin::Pin; +use std::{ + num::NonZeroUsize, + pin::Pin, + sync::atomic::{AtomicBool, Ordering}, +}; use crate::{ import_queue::{ @@ -221,7 +226,7 @@ mod worker_messages { /// /// Returns when `block_import` ended. async fn block_import_process( - mut block_import: BoxBlockImport, + block_import: BoxBlockImport, verifier: impl Verifier, mut result_sender: BufferedLinkSender, mut block_import_receiver: TracingUnboundedReceiver>, @@ -240,8 +245,15 @@ async fn block_import_process( }, }; - let res = - import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await; + let res = import_many_blocks_with_verification_concurrency( + &block_import, + origin, + blocks, + &verifier, + metrics.as_ref(), + verifier.verification_concurrency(), + ) + .await; result_sender.blocks_processed(res.imported, res.block_count, res.results); } @@ -383,12 +395,16 @@ struct ImportManyBlocksResult { /// /// This will yield after each imported block once, to ensure that other futures can /// be called as well. -async fn import_many_blocks>( - import_handle: &mut BoxBlockImport, +/// +/// When verification concurrency is set to value higher than 1, block verification will happen in +/// parallel to block import, reducing overall time required. +async fn import_many_blocks_with_verification_concurrency>( + import_handle: &BoxBlockImport, blocks_origin: BlockOrigin, blocks: Vec>, verifier: &V, - metrics: Option, + metrics: Option<&Metrics>, + verification_concurrency: NonZeroUsize, ) -> ImportManyBlocksResult { let count = blocks.len(); @@ -403,46 +419,58 @@ async fn import_many_blocks>( trace!(target: LOG_TARGET, "Starting import of {} blocks {}", count, blocks_range); - let mut imported = 0; - let mut results = vec![]; - let mut has_error = false; - let mut blocks = blocks.into_iter(); + let has_error = &AtomicBool::new(false); + + let verify_block_task = |index, block: IncomingBlock| { + async move { + let block_number = block.header.as_ref().map(|h| *h.number()); + let block_hash = block.hash; + + let result = if has_error.load(Ordering::Acquire) { + Err(BlockImportError::Cancelled) + } else { + verify_single_block_metered( + import_handle, + blocks_origin, + block, + verifier, + // Check parent for the first block, but skip for others since blocks are + // verified concurrently before being imported. + index != 0, + metrics, + ) + .await + }; + + (block_number, block_hash, result) + } + }; // Blocks in the response/drain should be in ascending order. - loop { - // Is there any block left to import? - let block = match blocks.next() { - Some(b) => b, - None => { - // No block left to import, success! - return ImportManyBlocksResult { block_count: count, imported, results } - }, - }; + let mut blocks_to_verify = blocks.into_iter().enumerate(); + let mut verified_blocks = blocks_to_verify + .by_ref() + .take(verification_concurrency.get()) + .map(|(index, block)| verify_block_task(index, block)) + .collect::>(); + + let mut imported = 0; + let mut results = vec![]; - let block_number = block.header.as_ref().map(|h| *h.number()); - let block_hash = block.hash; - let import_result = if has_error { + while let Some((block_number, block_hash, verification_result)) = verified_blocks.next().await { + let import_result = if has_error.load(Ordering::Acquire) { Err(BlockImportError::Cancelled) } else { - let verification_fut = verify_single_block_metered( - import_handle, - blocks_origin, - block, - verifier, - metrics.as_ref(), - ); - match verification_fut.await { + // The actual import. + match verification_result { Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status), - Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => { - // The actual import. - import_single_block_metered(import_handle, import_parameters, metrics.as_ref()) - .await - }, + Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => + import_single_block_metered(import_handle, import_parameters, metrics).await, Err(e) => Err(e), } }; - if let Some(metrics) = metrics.as_ref() { + if let Some(metrics) = metrics { metrics.report_import::(&import_result); } @@ -455,13 +483,21 @@ async fn import_many_blocks>( ); imported += 1; } else { - has_error = true; + has_error.store(true, Ordering::Release); } results.push((import_result, block_hash)); + // Add more blocks into verification queue if there are any + if let Some((index, block)) = blocks_to_verify.next() { + verified_blocks.push_back(verify_block_task(index, block)); + } + Yield::new().await } + + // No block left to import, success! + ImportManyBlocksResult { block_count: count, imported, results } } /// A future that will always `yield` on the first call of `poll` but schedules the