From edd6fd6a68834030ab6d6ebdfa33286a51754b12 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 16 Sep 2023 16:33:23 +0300 Subject: [PATCH 1/9] Remove unnecessary delay support from block import --- .../common/src/import_queue/basic_queue.rs | 26 ++++--------------- 1 file changed, 5 insertions(+), 21 deletions(-) diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index 1cc7ec26fd19..bfa12d09546c 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -19,7 +19,6 @@ use futures::{ prelude::*, task::{Context, Poll}, }; -use futures_timer::Delay; use log::{debug, trace}; use prometheus_endpoint::Registry; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; @@ -28,7 +27,7 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT, NumberFor}, Justification, Justifications, }; -use std::{pin::Pin, time::Duration}; +use std::pin::Pin; use crate::{ import_queue::{ @@ -224,7 +223,6 @@ async fn block_import_process( mut result_sender: BufferedLinkSender, mut block_import_receiver: TracingUnboundedReceiver>, metrics: Option, - delay_between_blocks: Duration, ) { loop { let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await @@ -239,15 +237,9 @@ async fn block_import_process( }, }; - let res = import_many_blocks( - &mut block_import, - origin, - blocks, - &mut verifier, - delay_between_blocks, - metrics.clone(), - ) - .await; + let res = + import_many_blocks(&mut block_import, origin, blocks, &mut verifier, metrics.clone()) + .await; result_sender.blocks_processed(res.imported, res.block_count, res.results); } @@ -281,8 +273,6 @@ impl BlockImportWorker { let mut worker = BlockImportWorker { result_sender, justification_import, metrics }; - let delay_between_blocks = Duration::default(); - let future = async move { // Let's initialize `justification_import` if let Some(justification_import) = worker.justification_import.as_mut() { @@ -297,7 +287,6 @@ impl BlockImportWorker { worker.result_sender.clone(), block_import_port, worker.metrics.clone(), - delay_between_blocks, ); futures::pin_mut!(block_import_process); @@ -394,7 +383,6 @@ async fn import_many_blocks>( blocks_origin: BlockOrigin, blocks: Vec>, verifier: &mut V, - delay_between_blocks: Duration, metrics: Option, ) -> ImportManyBlocksResult { let count = blocks.len(); @@ -460,11 +448,7 @@ async fn import_many_blocks>( results.push((import_result, block_hash)); - if delay_between_blocks != Duration::default() && !has_error { - Delay::new(delay_between_blocks).await; - } else { - Yield::new().await - } + Yield::new().await } } From 10e78dca9453478b9029a83a07313def764e0d9d Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 16 Sep 2023 16:55:38 +0300 Subject: [PATCH 2/9] Fix typos --- cumulus/client/pov-recovery/src/lib.rs | 2 +- .../client/consensus/common/src/import_queue/basic_queue.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cumulus/client/pov-recovery/src/lib.rs b/cumulus/client/pov-recovery/src/lib.rs index b050bc66799c..130828a148dd 100644 --- a/cumulus/client/pov-recovery/src/lib.rs +++ b/cumulus/client/pov-recovery/src/lib.rs @@ -453,7 +453,7 @@ where /// Import the given `block`. /// - /// This will also recursivley drain `waiting_for_parent` and import them as well. + /// This will also recursively drain `waiting_for_parent` and import them as well. async fn import_block(&mut self, block: Block) { let mut blocks = VecDeque::new(); diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index bfa12d09546c..7cc924529dbd 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -197,7 +197,7 @@ impl ImportQueue for BasicQueue { } } -/// Messages destinated to the background worker. +/// Messages designated to the background worker. mod worker_messages { use super::*; @@ -268,7 +268,7 @@ impl BlockImportWorker { let (justification_sender, mut justification_port) = tracing_unbounded("mpsc_import_queue_worker_justification", 100_000); - let (block_import_sender, block_import_port) = + let (block_import_sender, block_import_receiver) = tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000); let mut worker = BlockImportWorker { result_sender, justification_import, metrics }; @@ -285,7 +285,7 @@ impl BlockImportWorker { block_import, verifier, worker.result_sender.clone(), - block_import_port, + block_import_receiver, worker.metrics.clone(), ); futures::pin_mut!(block_import_process); From 2a84bd44f93d2b00205d88b29dce3d6e07d8a201 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 16 Sep 2023 16:57:24 +0300 Subject: [PATCH 3/9] Extend `ImportQueueService::import_blocks` comment to clarify important assumptions about blocks list --- substrate/client/consensus/common/src/import_queue.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 39d5bf8ed35d..5121359afa17 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -104,7 +104,8 @@ pub trait Verifier: Send { /// /// The `import_*` methods can be called in order to send elements for the import queue to verify. pub trait ImportQueueService: Send { - /// Import bunch of blocks. + /// Import bunch of blocks, every next block must be an ancestor of the previous block in the + /// list. fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec>); /// Import block justifications. From e616567565d29eef07e15a1d8b473bbbb22ee5d4 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 16 Sep 2023 17:20:46 +0300 Subject: [PATCH 4/9] Refactor `import_handler` into standalone function for future reuse --- .../consensus/common/src/import_queue.rs | 92 +++++++++++-------- 1 file changed, 54 insertions(+), 38 deletions(-) diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 5121359afa17..7d2c0e6182a2 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -28,6 +28,7 @@ //! queues to be instantiated simply. use log::{debug, trace}; +use std::fmt; use sp_consensus::{error::Error as ConsensusError, BlockOrigin}; use sp_runtime::{ @@ -166,16 +167,16 @@ pub trait Link: Send { /// Block import successful result. #[derive(Debug, PartialEq)] -pub enum BlockImportStatus { +pub enum BlockImportStatus { /// Imported known block. - ImportedKnown(N, Option), + ImportedKnown(BlockNumber, Option), /// Imported unknown block. - ImportedUnknown(N, ImportedAux, Option), + ImportedUnknown(BlockNumber, ImportedAux, Option), } -impl BlockImportStatus { +impl BlockImportStatus { /// Returns the imported block number. - pub fn number(&self) -> &N { + pub fn number(&self) -> &BlockNumber { match self { BlockImportStatus::ImportedKnown(n, _) | BlockImportStatus::ImportedUnknown(n, _, _) => n, @@ -227,6 +228,48 @@ pub async fn import_single_block>( import_single_block_metered(import_handle, block_origin, block, verifier, None).await } +fn import_handler( + number: NumberFor, + hash: Block::Hash, + parent_hash: Block::Hash, + block_origin: Option, + import: Result, +) -> Result>, BlockImportError> +where + Block: BlockT, +{ + match import { + Ok(ImportResult::AlreadyInChain) => { + trace!(target: LOG_TARGET, "Block already in chain {}: {:?}", number, hash); + Ok(BlockImportStatus::ImportedKnown(number, block_origin)) + }, + Ok(ImportResult::Imported(aux)) => + Ok(BlockImportStatus::ImportedUnknown(number, aux, block_origin)), + Ok(ImportResult::MissingState) => { + debug!( + target: LOG_TARGET, + "Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash + ); + Err(BlockImportError::MissingState) + }, + Ok(ImportResult::UnknownParent) => { + debug!( + target: LOG_TARGET, + "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash + ); + Err(BlockImportError::UnknownParent) + }, + Ok(ImportResult::KnownBad) => { + debug!(target: LOG_TARGET, "Peer gave us a bad block {}: {:?}", number, hash); + Err(BlockImportError::BadBlock(block_origin)) + }, + Err(e) => { + debug!(target: LOG_TARGET, "Error importing block {}: {:?}: {}", number, hash, e); + Err(BlockImportError::Other(e)) + }, + } +} + /// Single block import function with metering. pub(crate) async fn import_single_block_metered>( import_handle: &mut impl BlockImport, @@ -255,38 +298,11 @@ pub(crate) async fn import_single_block_metered>( let hash = block.hash; let parent_hash = *header.parent_hash(); - let import_handler = |import| match import { - Ok(ImportResult::AlreadyInChain) => { - trace!(target: LOG_TARGET, "Block already in chain {}: {:?}", number, hash); - Ok(BlockImportStatus::ImportedKnown(number, peer)) - }, - Ok(ImportResult::Imported(aux)) => - Ok(BlockImportStatus::ImportedUnknown(number, aux, peer)), - Ok(ImportResult::MissingState) => { - debug!( - target: LOG_TARGET, - "Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash - ); - Err(BlockImportError::MissingState) - }, - Ok(ImportResult::UnknownParent) => { - debug!( - target: LOG_TARGET, - "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash - ); - Err(BlockImportError::UnknownParent) - }, - Ok(ImportResult::KnownBad) => { - debug!(target: LOG_TARGET, "Peer gave us a bad block {}: {:?}", number, hash); - Err(BlockImportError::BadBlock(peer)) - }, - Err(e) => { - debug!(target: LOG_TARGET, "Error importing block {}: {:?}: {}", number, hash, e); - Err(BlockImportError::Other(e)) - }, - }; - - match import_handler( + match import_handler::( + number, + hash, + parent_hash, + peer, import_handle .check_block(BlockCheckParams { hash, @@ -347,5 +363,5 @@ pub(crate) async fn import_single_block_metered>( if let Some(metrics) = metrics.as_ref() { metrics.report_verification_and_import(started.elapsed()); } - import_handler(imported) + import_handler::(number, hash, parent_hash, peer, imported) } From 7bffa1f36f494de068e7221031ceceb49ad318ad Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 16 Sep 2023 17:41:34 +0300 Subject: [PATCH 5/9] Extract `verify_single_block_metered` function out of `import_single_block_metered` such that they can be used independently --- .../consensus/common/src/import_queue.rs | 73 +++++++++++++++---- .../common/src/import_queue/basic_queue.rs | 23 ++++-- 2 files changed, 75 insertions(+), 21 deletions(-) diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 7d2c0e6182a2..8c7e9e8ac07b 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -28,7 +28,10 @@ //! queues to be instantiated simply. use log::{debug, trace}; -use std::fmt; +use std::{ + fmt, + time::{Duration, Instant}, +}; use sp_consensus::{error::Error as ConsensusError, BlockOrigin}; use sp_runtime::{ @@ -225,7 +228,11 @@ pub async fn import_single_block>( block: IncomingBlock, verifier: &mut V, ) -> BlockImportResult { - import_single_block_metered(import_handle, block_origin, block, verifier, None).await + match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? { + SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status), + SingleBlockVerificationOutcome::Verified(import_parameters) => + import_single_block_metered(import_handle, import_parameters, None).await, + } } fn import_handler( @@ -270,14 +277,28 @@ where } } +pub(crate) enum SingleBlockVerificationOutcome { + /// Block is already imported. + Imported(BlockImportStatus>), + /// Block is verified, but needs to be imported. + Verified(SingleBlockImportParameters), +} + +pub(crate) struct SingleBlockImportParameters { + import_block: BlockImportParams, + hash: Block::Hash, + block_origin: Option, + verification_time: Duration, +} + /// Single block import function with metering. -pub(crate) async fn import_single_block_metered>( +pub(crate) async fn verify_single_block_metered>( import_handle: &mut impl BlockImport, block_origin: BlockOrigin, block: IncomingBlock, verifier: &mut V, - metrics: Option, -) -> BlockImportResult { + metrics: Option<&Metrics>, +) -> Result, BlockImportError> { let peer = block.origin; let (header, justifications) = match (block.header, block.justifications) { @@ -315,10 +336,13 @@ pub(crate) async fn import_single_block_metered>( .await, )? { BlockImportStatus::ImportedUnknown { .. } => (), - r => return Ok(r), // Any other successful result means that the block is already imported. + r => { + // Any other successful result means that the block is already imported. + return Ok(SingleBlockVerificationOutcome::Imported(r)) + }, } - let started = std::time::Instant::now(); + let started = Instant::now(); let mut import_block = BlockImportParams::new(block_origin, header); import_block.body = block.body; @@ -349,19 +373,42 @@ pub(crate) async fn import_single_block_metered>( } else { trace!(target: LOG_TARGET, "Verifying {}({}) failed: {}", number, hash, msg); } - if let Some(metrics) = metrics.as_ref() { + if let Some(metrics) = metrics { metrics.report_verification(false, started.elapsed()); } BlockImportError::VerificationFailed(peer, msg) })?; - if let Some(metrics) = metrics.as_ref() { - metrics.report_verification(true, started.elapsed()); + let verification_time = started.elapsed(); + if let Some(metrics) = metrics { + metrics.report_verification(true, verification_time); } + Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters { + import_block, + hash, + block_origin: peer, + verification_time, + })) +} + +pub(crate) async fn import_single_block_metered( + import_handle: &mut impl BlockImport, + import_parameters: SingleBlockImportParameters, + metrics: Option<&Metrics>, +) -> BlockImportResult { + let started = Instant::now(); + + let SingleBlockImportParameters { import_block, hash, block_origin, verification_time } = + import_parameters; + + let number = *import_block.header.number(); + let parent_hash = *import_block.header.parent_hash(); + let imported = import_handle.import_block(import_block).await; - if let Some(metrics) = metrics.as_ref() { - metrics.report_verification_and_import(started.elapsed()); + if let Some(metrics) = metrics { + metrics.report_verification_and_import(started.elapsed() + verification_time); } - import_handler::(number, hash, parent_hash, peer, imported) + + import_handler::(number, hash, parent_hash, block_origin, imported) } diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index 7cc924529dbd..6bbc5b4a8de6 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -32,9 +32,9 @@ use std::pin::Pin; use crate::{ import_queue::{ buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender}, - import_single_block_metered, BlockImportError, BlockImportStatus, BoxBlockImport, - BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, Link, - RuntimeOrigin, Verifier, LOG_TARGET, + import_single_block_metered, verify_single_block_metered, BlockImportError, + BlockImportStatus, BoxBlockImport, BoxJustificationImport, ImportQueue, ImportQueueService, + IncomingBlock, Link, RuntimeOrigin, SingleBlockVerificationOutcome, Verifier, LOG_TARGET, }, metrics::Metrics, }; @@ -419,15 +419,22 @@ async fn import_many_blocks>( let import_result = if has_error { Err(BlockImportError::Cancelled) } else { - // The actual import. - import_single_block_metered( + let verification_fut = verify_single_block_metered( import_handle, blocks_origin, block, verifier, - metrics.clone(), - ) - .await + metrics.as_ref(), + ); + match verification_fut.await { + Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status), + Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => { + // The actual import. + import_single_block_metered(import_handle, import_parameters, metrics.as_ref()) + .await + }, + Err(e) => Err(e), + } }; if let Some(metrics) = metrics.as_ref() { From aa6172d6d628ccf36dabfa5ba6b1489d82f4ec76 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 16 Sep 2023 19:11:34 +0300 Subject: [PATCH 6/9] Block checking shouldn't require `&mut self` --- cumulus/client/consensus/common/src/lib.rs | 4 ++-- substrate/client/consensus/babe/src/lib.rs | 2 +- substrate/client/consensus/babe/src/tests.rs | 8 ++++---- substrate/client/consensus/beefy/src/import.rs | 2 +- .../client/consensus/common/src/block_import.rs | 15 +++------------ .../client/consensus/common/src/import_queue.rs | 2 +- .../common/src/import_queue/basic_queue.rs | 2 +- substrate/client/consensus/grandpa/src/import.rs | 4 ++-- substrate/client/consensus/pow/src/lib.rs | 5 +---- substrate/client/network/test/src/lib.rs | 4 ++-- substrate/client/service/src/client/client.rs | 6 +++--- 11 files changed, 21 insertions(+), 33 deletions(-) diff --git a/cumulus/client/consensus/common/src/lib.rs b/cumulus/client/consensus/common/src/lib.rs index 08bceabb2bd4..ea788fb219e0 100644 --- a/cumulus/client/consensus/common/src/lib.rs +++ b/cumulus/client/consensus/common/src/lib.rs @@ -155,13 +155,13 @@ impl Clone for ParachainBlockImport { impl BlockImport for ParachainBlockImport where Block: BlockT, - BI: BlockImport + Send, + BI: BlockImport + Send + Sync, BE: Backend, { type Error = BI::Error; async fn check_block( - &mut self, + &self, block: sc_consensus::BlockCheckParams, ) -> Result { self.inner.check_block(block).await diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index 90b7523ec18b..15c1e89a516d 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -1683,7 +1683,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await.map_err(Into::into) diff --git a/substrate/client/consensus/babe/src/tests.rs b/substrate/client/consensus/babe/src/tests.rs index b3843f8acfa0..e537764caeeb 100644 --- a/substrate/client/consensus/babe/src/tests.rs +++ b/substrate/client/consensus/babe/src/tests.rs @@ -138,11 +138,11 @@ thread_local! { pub struct PanickingBlockImport(B); #[async_trait::async_trait] -impl> BlockImport for PanickingBlockImport +impl BlockImport for PanickingBlockImport where - B: Send, + BI: BlockImport + Send + Sync, { - type Error = B::Error; + type Error = BI::Error; async fn import_block( &mut self, @@ -152,7 +152,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { Ok(self.0.check_block(block).await.expect("checking block failed")) diff --git a/substrate/client/consensus/beefy/src/import.rs b/substrate/client/consensus/beefy/src/import.rs index 5b2abb20aced..dd658c7d40f0 100644 --- a/substrate/client/consensus/beefy/src/import.rs +++ b/substrate/client/consensus/beefy/src/import.rs @@ -175,7 +175,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await diff --git a/substrate/client/consensus/common/src/block_import.rs b/substrate/client/consensus/common/src/block_import.rs index a451692ad478..f866be266ed2 100644 --- a/substrate/client/consensus/common/src/block_import.rs +++ b/substrate/client/consensus/common/src/block_import.rs @@ -307,10 +307,7 @@ pub trait BlockImport { type Error: std::error::Error + Send + 'static; /// Check block preconditions. - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result; + async fn check_block(&self, block: BlockCheckParams) -> Result; /// Import a block. async fn import_block( @@ -324,10 +321,7 @@ impl BlockImport for crate::import_queue::BoxBlockImport { type Error = sp_consensus::error::Error; /// Check block preconditions. - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { + async fn check_block(&self, block: BlockCheckParams) -> Result { (**self).check_block(block).await } @@ -348,10 +342,7 @@ where { type Error = E; - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { + async fn check_block(&self, block: BlockCheckParams) -> Result { (&**self).check_block(block).await } diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 8c7e9e8ac07b..590eb03df0a9 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -293,7 +293,7 @@ pub(crate) struct SingleBlockImportParameters { /// Single block import function with metering. pub(crate) async fn verify_single_block_metered>( - import_handle: &mut impl BlockImport, + import_handle: &impl BlockImport, block_origin: BlockOrigin, block: IncomingBlock, verifier: &mut V, diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index 6bbc5b4a8de6..ad2c0431dff9 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -513,7 +513,7 @@ mod tests { type Error = sp_consensus::Error; async fn check_block( - &mut self, + &self, _block: BlockCheckParams, ) -> Result { Ok(ImportResult::imported(false)) diff --git a/substrate/client/consensus/grandpa/src/import.rs b/substrate/client/consensus/grandpa/src/import.rs index ca5b7c400bfb..a88681b1c664 100644 --- a/substrate/client/consensus/grandpa/src/import.rs +++ b/substrate/client/consensus/grandpa/src/import.rs @@ -515,7 +515,7 @@ where Client: ClientForGrandpa, Client::Api: GrandpaApi, for<'a> &'a Client: BlockImport, - SC: Send, + SC: Send + Sync, { type Error = ConsensusError; @@ -694,7 +694,7 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs index ee5c1dfc6f11..93db31dacc17 100644 --- a/substrate/client/consensus/pow/src/lib.rs +++ b/substrate/client/consensus/pow/src/lib.rs @@ -312,10 +312,7 @@ where { type Error = ConsensusError; - async fn check_block( - &mut self, - block: BlockCheckParams, - ) -> Result { + async fn check_block(&self, block: BlockCheckParams) -> Result { self.inner.check_block(block).await.map_err(Into::into) } diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 463624690a5f..85dd390ead87 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -211,7 +211,7 @@ impl BlockImport for PeersClient { type Error = ConsensusError; async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.client.check_block(block).await @@ -587,7 +587,7 @@ where type Error = ConsensusError; async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { self.inner.check_block(block).await diff --git a/substrate/client/service/src/client/client.rs b/substrate/client/service/src/client/client.rs index 09c1673884aa..46d3766ba98c 100644 --- a/substrate/client/service/src/client/client.rs +++ b/substrate/client/service/src/client/client.rs @@ -1781,7 +1781,7 @@ where /// Check block preconditions. async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { let BlockCheckParams { @@ -1863,10 +1863,10 @@ where } async fn check_block( - &mut self, + &self, block: BlockCheckParams, ) -> Result { - (&*self).check_block(block).await + (&self).check_block(block).await } } From 7aae861a467af19746aecd6a750a90022c8916b2 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 16 Sep 2023 19:58:07 +0300 Subject: [PATCH 7/9] Block verification shouldn't require `&self`, but this is not supported fully by some consensus engines, so introduce `Verifier::supports_stateless_verification()` that can be used to check this --- Cargo.lock | 1 + cumulus/client/consensus/aura/Cargo.toml | 1 + .../aura/src/equivocation_import_queue.rs | 9 +++++---- .../client/consensus/common/src/import_queue.rs | 2 +- .../consensus/relay-chain/src/import_queue.rs | 2 +- cumulus/polkadot-parachain/src/service.rs | 8 ++++---- .../client/consensus/aura/src/import_queue.rs | 2 +- substrate/client/consensus/babe/src/lib.rs | 2 +- substrate/client/consensus/babe/src/tests.rs | 2 +- .../client/consensus/common/src/import_queue.rs | 17 ++++++++++++++--- .../common/src/import_queue/basic_queue.rs | 16 +++++++++++----- .../consensus/manual-seal/src/consensus/babe.rs | 2 +- .../client/consensus/manual-seal/src/lib.rs | 2 +- substrate/client/consensus/pow/src/lib.rs | 2 +- substrate/client/network/test/src/lib.rs | 7 ++----- substrate/client/network/test/src/service.rs | 2 +- 16 files changed, 47 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index cb4e085218e5..218e12d14624 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3311,6 +3311,7 @@ dependencies = [ "cumulus-relay-chain-interface", "futures", "parity-scale-codec", + "parking_lot 0.12.1", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-overseer", diff --git a/cumulus/client/consensus/aura/Cargo.toml b/cumulus/client/consensus/aura/Cargo.toml index 8239a498746e..55482206c0d7 100644 --- a/cumulus/client/consensus/aura/Cargo.toml +++ b/cumulus/client/consensus/aura/Cargo.toml @@ -9,6 +9,7 @@ edition.workspace = true async-trait = "0.1.73" codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] } futures = "0.3.28" +parking_lot = "0.12.1" tracing = "0.1.37" schnellru = "0.2.1" diff --git a/cumulus/client/consensus/aura/src/equivocation_import_queue.rs b/cumulus/client/consensus/aura/src/equivocation_import_queue.rs index 5cd65ed5546b..a3a6f09ec545 100644 --- a/cumulus/client/consensus/aura/src/equivocation_import_queue.rs +++ b/cumulus/client/consensus/aura/src/equivocation_import_queue.rs @@ -21,6 +21,7 @@ /// should be thrown out and which ones should be kept. use codec::Codec; use cumulus_client_consensus_common::ParachainBlockImportMarker; +use parking_lot::Mutex; use schnellru::{ByLength, LruMap}; use sc_consensus::{ @@ -71,7 +72,7 @@ struct Verifier { client: Arc, create_inherent_data_providers: CIDP, slot_duration: SlotDuration, - defender: NaiveEquivocationDefender, + defender: Mutex, telemetry: Option, _phantom: std::marker::PhantomData (Block, P)>, } @@ -89,7 +90,7 @@ where CIDP: CreateInherentDataProviders, { async fn verify( - &mut self, + &self, mut block_params: BlockImportParams, ) -> Result, String> { // Skip checks that include execution, if being told so, or when importing only state. @@ -132,7 +133,7 @@ where block_params.post_hash = Some(post_hash); // Check for and reject egregious amounts of equivocations. - if self.defender.insert_and_check(slot) { + if self.defender.lock().insert_and_check(slot) { return Err(format!( "Rejecting block {:?} due to excessive equivocations at slot", post_hash, @@ -239,7 +240,7 @@ where let verifier = Verifier:: { client, create_inherent_data_providers, - defender: NaiveEquivocationDefender::default(), + defender: Mutex::new(NaiveEquivocationDefender::default()), slot_duration, telemetry, _phantom: std::marker::PhantomData, diff --git a/cumulus/client/consensus/common/src/import_queue.rs b/cumulus/client/consensus/common/src/import_queue.rs index 311a2b7ad8cf..a8d0785f8562 100644 --- a/cumulus/client/consensus/common/src/import_queue.rs +++ b/cumulus/client/consensus/common/src/import_queue.rs @@ -50,7 +50,7 @@ pub struct VerifyNothing; #[async_trait::async_trait] impl Verifier for VerifyNothing { async fn verify( - &mut self, + &self, params: BlockImportParams, ) -> Result, String> { Ok(params) diff --git a/cumulus/client/consensus/relay-chain/src/import_queue.rs b/cumulus/client/consensus/relay-chain/src/import_queue.rs index 9ee03b95904c..ea69c173af5d 100644 --- a/cumulus/client/consensus/relay-chain/src/import_queue.rs +++ b/cumulus/client/consensus/relay-chain/src/import_queue.rs @@ -52,7 +52,7 @@ where CIDP: CreateInherentDataProviders, { async fn verify( - &mut self, + &self, mut block_params: BlockImportParams, ) -> Result, String> { // Skip checks that include execution, if being told so, or when importing only state. diff --git a/cumulus/polkadot-parachain/src/service.rs b/cumulus/polkadot-parachain/src/service.rs index f7b053b4b6a9..78f4ec3b5eb8 100644 --- a/cumulus/polkadot-parachain/src/service.rs +++ b/cumulus/polkadot-parachain/src/service.rs @@ -1022,7 +1022,7 @@ where struct Verifier { client: Arc, - aura_verifier: BuildOnAccess>>, + aura_verifier: Mutex>>>, relay_chain_verifier: Box>, _phantom: PhantomData, } @@ -1035,7 +1035,7 @@ where AuraId: Send + Sync + Codec, { async fn verify( - &mut self, + &self, block_import: BlockImportParams, ) -> Result, String> { if self @@ -1044,7 +1044,7 @@ where .has_api::>(*block_import.header.parent_hash()) .unwrap_or(false) { - self.aura_verifier.get_mut().verify(block_import).await + self.aura_verifier.lock().await.get_mut().verify(block_import).await } else { self.relay_chain_verifier.verify(block_import).await } @@ -1104,7 +1104,7 @@ where let verifier = Verifier { client, relay_chain_verifier, - aura_verifier: BuildOnAccess::Uninitialized(Some(Box::new(aura_verifier))), + aura_verifier: Mutex::new(BuildOnAccess::Uninitialized(Some(Box::new(aura_verifier)))), _phantom: PhantomData, }; diff --git a/substrate/client/consensus/aura/src/import_queue.rs b/substrate/client/consensus/aura/src/import_queue.rs index a8777ef8788c..79f4faa5ebf9 100644 --- a/substrate/client/consensus/aura/src/import_queue.rs +++ b/substrate/client/consensus/aura/src/import_queue.rs @@ -174,7 +174,7 @@ where CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync, { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { // Skip checks that include execution, if being told so or when importing only state. diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index 15c1e89a516d..8f65bf79f16f 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -1130,7 +1130,7 @@ where CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync, { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { trace!( diff --git a/substrate/client/consensus/babe/src/tests.rs b/substrate/client/consensus/babe/src/tests.rs index e537764caeeb..8e54537cce34 100644 --- a/substrate/client/consensus/babe/src/tests.rs +++ b/substrate/client/consensus/babe/src/tests.rs @@ -193,7 +193,7 @@ impl Verifier for TestVerifier { /// new set of validators to import. If not, err with an Error-Message /// presented to the User in the logs. async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { // apply post-sealing mutations (i.e. stripping seal, if desired). diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 590eb03df0a9..265ab4ebf1cb 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -97,11 +97,22 @@ pub struct IncomingBlock { /// Verify a justification of a block #[async_trait::async_trait] -pub trait Verifier: Send { +pub trait Verifier: Send + Sync { + /// Whether verifier supports stateless verification. + /// + /// Stateless verification means that verification on blocks can be done in arbitrary order, + /// doesn't expect parent block to be imported first, etc. + /// + /// Verifiers that support stateless verification can verify multiple blocks concurrently, + /// significantly improving sync speed. + fn supports_stateless_verification(&self) -> bool { + // Unless re-defined by verifier is assumed to not support stateless verification. + false + } + /// Verify the given block data and return the `BlockImportParams` to /// continue the block import process. - async fn verify(&mut self, block: BlockImportParams) - -> Result, String>; + async fn verify(&self, block: BlockImportParams) -> Result, String>; } /// Blocks import queue API. diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index ad2c0431dff9..8de9ede61298 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -60,13 +60,16 @@ impl BasicQueue { /// Instantiate a new basic queue, with given verifier. /// /// This creates a background task, and calls `on_start` on the justification importer. - pub fn new>( + pub fn new( verifier: V, block_import: BoxBlockImport, justification_import: Option>, spawner: &impl sp_core::traits::SpawnEssentialNamed, prometheus_registry: Option<&Registry>, - ) -> Self { + ) -> Self + where + V: Verifier + 'static, + { let (result_sender, result_port) = buffered_link::buffered_link(100_000); let metrics = prometheus_registry.and_then(|r| { @@ -252,7 +255,7 @@ struct BlockImportWorker { } impl BlockImportWorker { - fn new>( + fn new( result_sender: BufferedLinkSender, verifier: V, block_import: BoxBlockImport, @@ -262,7 +265,10 @@ impl BlockImportWorker { impl Future + Send, TracingUnboundedSender>, TracingUnboundedSender>, - ) { + ) + where + V: Verifier + 'static, + { use worker_messages::*; let (justification_sender, mut justification_port) = @@ -501,7 +507,7 @@ mod tests { #[async_trait::async_trait] impl Verifier for () { async fn verify( - &mut self, + &self, block: BlockImportParams, ) -> Result, String> { Ok(BlockImportParams::new(block.origin, block.header)) diff --git a/substrate/client/consensus/manual-seal/src/consensus/babe.rs b/substrate/client/consensus/manual-seal/src/consensus/babe.rs index 26fa81459808..647e2f1903d8 100644 --- a/substrate/client/consensus/manual-seal/src/consensus/babe.rs +++ b/substrate/client/consensus/manual-seal/src/consensus/babe.rs @@ -96,7 +96,7 @@ where C: HeaderBackend + HeaderMetadata, { async fn verify( - &mut self, + &self, mut import_params: BlockImportParams, ) -> Result, String> { import_params.finalized = false; diff --git a/substrate/client/consensus/manual-seal/src/lib.rs b/substrate/client/consensus/manual-seal/src/lib.rs index 1e5db966e66d..52ccd1c59520 100644 --- a/substrate/client/consensus/manual-seal/src/lib.rs +++ b/substrate/client/consensus/manual-seal/src/lib.rs @@ -65,7 +65,7 @@ struct ManualSealVerifier; #[async_trait::async_trait] impl Verifier for ManualSealVerifier { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { block.finalized = false; diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs index 93db31dacc17..50e9533abb36 100644 --- a/substrate/client/consensus/pow/src/lib.rs +++ b/substrate/client/consensus/pow/src/lib.rs @@ -439,7 +439,7 @@ where Algorithm::Difficulty: 'static + Send, { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { let hash = block.header.hash(); diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index 85dd390ead87..c105b925a2c4 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -115,7 +115,7 @@ impl PassThroughVerifier { #[async_trait::async_trait] impl Verifier for PassThroughVerifier { async fn verify( - &mut self, + &self, mut block: BlockImportParams, ) -> Result, String> { if block.fork_choice.is_none() { @@ -609,10 +609,7 @@ struct VerifierAdapter { #[async_trait::async_trait] impl Verifier for VerifierAdapter { - async fn verify( - &mut self, - block: BlockImportParams, - ) -> Result, String> { + async fn verify(&self, block: BlockImportParams) -> Result, String> { let hash = block.header.hash(); self.verifier.lock().await.verify(block).await.map_err(|e| { self.failed_verifications.lock().insert(hash, e.clone()); diff --git a/substrate/client/network/test/src/service.rs b/substrate/client/network/test/src/service.rs index baa562c46dfa..90da54eb272e 100644 --- a/substrate/client/network/test/src/service.rs +++ b/substrate/client/network/test/src/service.rs @@ -134,7 +134,7 @@ impl TestNetworkBuilder { #[async_trait::async_trait] impl sc_consensus::Verifier for PassThroughVerifier { async fn verify( - &mut self, + &self, mut block: sc_consensus::BlockImportParams, ) -> Result, String> { block.finalized = self.0; From e8559234e80bb5dd339b7a5a73065adbab406e07 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 16 Sep 2023 21:52:23 +0300 Subject: [PATCH 8/9] Refactor `BoxBlockImport` type alias into sharable `SharedBlockImport` struct --- Cargo.lock | 1 + .../aura/src/equivocation_import_queue.rs | 4 +- .../consensus/common/src/import_queue.rs | 3 +- .../consensus/relay-chain/src/import_queue.rs | 4 +- cumulus/polkadot-parachain/src/service.rs | 4 +- .../client/consensus/aura/src/import_queue.rs | 9 +- substrate/client/consensus/babe/src/lib.rs | 9 +- substrate/client/consensus/babe/src/tests.rs | 29 +++--- substrate/client/consensus/common/Cargo.toml | 1 + .../consensus/common/src/block_import.rs | 6 +- .../consensus/common/src/import_queue.rs | 93 ++++++++++++++++--- .../common/src/import_queue/basic_queue.rs | 22 +++-- substrate/client/consensus/common/src/lib.rs | 4 +- .../client/consensus/manual-seal/src/lib.rs | 4 +- substrate/client/consensus/pow/src/lib.rs | 8 +- substrate/client/consensus/pow/src/worker.rs | 13 +-- .../client/network/test/src/block_import.rs | 4 +- substrate/client/network/test/src/lib.rs | 4 +- substrate/client/network/test/src/service.rs | 4 +- 19 files changed, 156 insertions(+), 70 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 218e12d14624..e68d834ea753 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14690,6 +14690,7 @@ dependencies = [ name = "sc-consensus" version = "0.10.0-dev" dependencies = [ + "async-lock", "async-trait", "futures", "futures-timer", diff --git a/cumulus/client/consensus/aura/src/equivocation_import_queue.rs b/cumulus/client/consensus/aura/src/equivocation_import_queue.rs index a3a6f09ec545..38ceec30b57d 100644 --- a/cumulus/client/consensus/aura/src/equivocation_import_queue.rs +++ b/cumulus/client/consensus/aura/src/equivocation_import_queue.rs @@ -26,7 +26,7 @@ use schnellru::{ByLength, LruMap}; use sc_consensus::{ import_queue::{BasicQueue, Verifier as VerifierT}, - BlockImport, BlockImportParams, ForkChoiceStrategy, + BlockImport, BlockImportParams, ForkChoiceStrategy, SharedBlockImport, }; use sc_consensus_aura::standalone as aura_internal; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE}; @@ -246,5 +246,5 @@ where _phantom: std::marker::PhantomData, }; - BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry) + BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, spawner, registry) } diff --git a/cumulus/client/consensus/common/src/import_queue.rs b/cumulus/client/consensus/common/src/import_queue.rs index a8d0785f8562..e815353b77bb 100644 --- a/cumulus/client/consensus/common/src/import_queue.rs +++ b/cumulus/client/consensus/common/src/import_queue.rs @@ -38,6 +38,7 @@ use sp_runtime::traits::Block as BlockT; use sc_consensus::{ block_import::{BlockImport, BlockImportParams}, import_queue::{BasicQueue, Verifier}, + SharedBlockImport, }; use crate::ParachainBlockImportMarker; @@ -72,5 +73,5 @@ where + Sync + 'static, { - BasicQueue::new(VerifyNothing, Box::new(block_import), None, spawner, registry) + BasicQueue::new(VerifyNothing, SharedBlockImport::new(block_import), None, spawner, registry) } diff --git a/cumulus/client/consensus/relay-chain/src/import_queue.rs b/cumulus/client/consensus/relay-chain/src/import_queue.rs index ea69c173af5d..16013ff2a1d2 100644 --- a/cumulus/client/consensus/relay-chain/src/import_queue.rs +++ b/cumulus/client/consensus/relay-chain/src/import_queue.rs @@ -20,7 +20,7 @@ use cumulus_client_consensus_common::ParachainBlockImportMarker; use sc_consensus::{ import_queue::{BasicQueue, Verifier as VerifierT}, - BlockImport, BlockImportParams, + BlockImport, BlockImportParams, SharedBlockImport, }; use sp_api::ProvideRuntimeApi; use sp_block_builder::BlockBuilder as BlockBuilderApi; @@ -125,5 +125,5 @@ where { let verifier = Verifier::new(client, create_inherent_data_providers); - Ok(BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry)) + Ok(BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, spawner, registry)) } diff --git a/cumulus/polkadot-parachain/src/service.rs b/cumulus/polkadot-parachain/src/service.rs index 78f4ec3b5eb8..cef6f6f32041 100644 --- a/cumulus/polkadot-parachain/src/service.rs +++ b/cumulus/polkadot-parachain/src/service.rs @@ -46,7 +46,7 @@ use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier; use futures::lock::Mutex; use sc_consensus::{ import_queue::{BasicQueue, Verifier as VerifierT}, - BlockImportParams, ImportQueue, + BlockImportParams, ImportQueue, SharedBlockImport, }; use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY}; use sc_network::{config::FullNetworkConfiguration, NetworkBlock}; @@ -1111,7 +1111,7 @@ where let registry = config.prometheus_registry(); let spawner = task_manager.spawn_essential_handle(); - Ok(BasicQueue::new(verifier, Box::new(block_import), None, &spawner, registry)) + Ok(BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, &spawner, registry)) } /// Start an aura powered parachain node. Asset Hub and Collectives use this. diff --git a/substrate/client/consensus/aura/src/import_queue.rs b/substrate/client/consensus/aura/src/import_queue.rs index 79f4faa5ebf9..1239621015f9 100644 --- a/substrate/client/consensus/aura/src/import_queue.rs +++ b/substrate/client/consensus/aura/src/import_queue.rs @@ -29,6 +29,7 @@ use sc_client_api::{backend::AuxStore, BlockOf, UsageProvider}; use sc_consensus::{ block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy}, import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier}, + SharedBlockImport, }; use sc_consensus_slots::{check_equivocation, CheckedHeader, InherentDataProviderExt}; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE}; @@ -376,7 +377,13 @@ where compatibility_mode, }); - Ok(BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry)) + Ok(BasicQueue::new( + verifier, + SharedBlockImport::new(block_import), + justification_import, + spawner, + registry, + )) } /// Parameters of [`build_verifier`]. diff --git a/substrate/client/consensus/babe/src/lib.rs b/substrate/client/consensus/babe/src/lib.rs index 8f65bf79f16f..b80fe72a592c 100644 --- a/substrate/client/consensus/babe/src/lib.rs +++ b/substrate/client/consensus/babe/src/lib.rs @@ -97,6 +97,7 @@ use sc_consensus::{ StateAction, }, import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier}, + SharedBlockImport, }; use sc_consensus_epochs::{ descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpochDescriptor, @@ -1856,7 +1857,13 @@ where spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed()); Ok(( - BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry), + BasicQueue::new( + verifier, + SharedBlockImport::new(block_import), + justification_import, + spawner, + registry, + ), BabeWorkerHandle(worker_tx), )) } diff --git a/substrate/client/consensus/babe/src/tests.rs b/substrate/client/consensus/babe/src/tests.rs index 8e54537cce34..0ce7209cf754 100644 --- a/substrate/client/consensus/babe/src/tests.rs +++ b/substrate/client/consensus/babe/src/tests.rs @@ -22,7 +22,7 @@ use super::*; use authorship::claim_slot; use sc_block_builder::{BlockBuilder, BlockBuilderProvider}; use sc_client_api::{BlockchainEvents, Finalizer}; -use sc_consensus::{BoxBlockImport, BoxJustificationImport}; +use sc_consensus::{BoxJustificationImport, SharedBlockImport}; use sc_consensus_epochs::{EpochIdentifier, EpochIdentifierPosition}; use sc_consensus_slots::BackoffAuthoringOnFinalizedHeadLagging; use sc_network_test::{Block as TestBlock, *}; @@ -204,7 +204,7 @@ impl Verifier for TestVerifier { pub struct PeerData { link: BabeLink, - block_import: Mutex>>, + block_import: SharedBlockImport, } impl TestNetFactory for BabeTestNet { @@ -228,8 +228,7 @@ impl TestNetFactory for BabeTestNet { let block_import = PanickingBlockImport(block_import); - let data_block_import = - Mutex::new(Some(Box::new(block_import.clone()) as BoxBlockImport<_>)); + let data_block_import = SharedBlockImport::new(block_import.clone()); ( BlockImportAdapter::new(block_import), None, @@ -370,7 +369,7 @@ async fn run_one_test(mutator: impl Fn(&mut TestHeader, Stage) + Send + Sync + ' let client_clone = client.clone(); babe_futures.push( start_babe(BabeParams { - block_import: data.block_import.lock().take().expect("import set up during init"), + block_import: data.block_import.clone(), select_chain, client, env: environ, @@ -614,7 +613,7 @@ async fn propose_and_import_block( parent: &TestHeader, slot: Option, proposer_factory: &mut DummyFactory, - block_import: &mut BoxBlockImport, + block_import: &mut SharedBlockImport, ) -> Hash { let mut proposer = proposer_factory.init(parent).await.unwrap(); @@ -684,7 +683,7 @@ async fn propose_and_import_block( async fn propose_and_import_blocks( client: &PeersFullClient, proposer_factory: &mut DummyFactory, - block_import: &mut BoxBlockImport, + block_import: &mut SharedBlockImport, parent_hash: Hash, n: usize, ) -> Vec { @@ -715,7 +714,7 @@ async fn importing_block_one_sets_genesis_epoch() { mutator: Arc::new(|_, _| ()), }; - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let genesis_header = client.header(client.chain_info().genesis_hash).unwrap().unwrap(); @@ -749,7 +748,7 @@ async fn revert_prunes_epoch_changes_and_removes_weights() { let client = peer.client().as_client(); let backend = peer.client().as_backend(); - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let epoch_changes = data.link.epoch_changes.clone(); let mut proposer_factory = DummyFactory { @@ -837,7 +836,7 @@ async fn revert_not_allowed_for_finalized() { let client = peer.client().as_client(); let backend = peer.client().as_backend(); - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let mut proposer_factory = DummyFactory { client: client.clone(), @@ -876,7 +875,7 @@ async fn importing_epoch_change_block_prunes_tree() { let data = peer.data.as_ref().expect("babe link set up during initialization"); let client = peer.client().as_client(); - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let epoch_changes = data.link.epoch_changes.clone(); let mut proposer_factory = DummyFactory { @@ -975,7 +974,7 @@ async fn verify_slots_are_strictly_increasing() { let data = peer.data.as_ref().expect("babe link set up during initialization"); let client = peer.client().as_client(); - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let mut proposer_factory = DummyFactory { client: client.clone(), @@ -1022,7 +1021,7 @@ async fn obsolete_blocks_aux_data_cleanup() { mutator: Arc::new(|_, _| ()), }; - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let aux_data_check = |hashes: &[Hash], expected: bool| { hashes.iter().all(|hash| { @@ -1099,7 +1098,7 @@ async fn allows_skipping_epochs() { let data = peer.data.as_ref().expect("babe link set up during initialization"); let client = peer.client().as_client(); - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let mut proposer_factory = DummyFactory { client: client.clone(), @@ -1228,7 +1227,7 @@ async fn allows_skipping_epochs_on_some_forks() { let data = peer.data.as_ref().expect("babe link set up during initialization"); let client = peer.client().as_client(); - let mut block_import = data.block_import.lock().take().expect("import set up during init"); + let mut block_import = data.block_import.clone(); let mut proposer_factory = DummyFactory { client: client.clone(), diff --git a/substrate/client/consensus/common/Cargo.toml b/substrate/client/consensus/common/Cargo.toml index f269e3752d43..902f3034b202 100644 --- a/substrate/client/consensus/common/Cargo.toml +++ b/substrate/client/consensus/common/Cargo.toml @@ -13,6 +13,7 @@ readme = "README.md" targets = ["x86_64-unknown-linux-gnu"] [dependencies] +async-lock = "2.8.0" async-trait = "0.1.57" futures = { version = "0.3.21", features = ["thread-pool"] } futures-timer = "3.0.1" diff --git a/substrate/client/consensus/common/src/block_import.rs b/substrate/client/consensus/common/src/block_import.rs index f866be266ed2..617d5b07e3d6 100644 --- a/substrate/client/consensus/common/src/block_import.rs +++ b/substrate/client/consensus/common/src/block_import.rs @@ -317,12 +317,12 @@ pub trait BlockImport { } #[async_trait::async_trait] -impl BlockImport for crate::import_queue::BoxBlockImport { +impl BlockImport for crate::import_queue::SharedBlockImport { type Error = sp_consensus::error::Error; /// Check block preconditions. async fn check_block(&self, block: BlockCheckParams) -> Result { - (**self).check_block(block).await + self.read().await.check_block(block).await } /// Import a block. @@ -330,7 +330,7 @@ impl BlockImport for crate::import_queue::BoxBlockImport { &mut self, block: BlockImportParams, ) -> Result { - (**self).import_block(block).await + self.write().await.import_block(block).await } } diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 265ab4ebf1cb..2e8455990a32 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -27,9 +27,14 @@ //! instantiated. The `BasicQueue` and `BasicVerifier` traits allow serial //! queues to be instantiated simply. +use async_lock::RwLock; use log::{debug, trace}; use std::{ fmt, + future::Future, + ops::Deref, + pin::Pin, + sync::Arc, time::{Duration, Instant}, }; @@ -61,7 +66,33 @@ pub mod buffered_link; pub mod mock; /// Shared block import struct used by the queue. -pub type BoxBlockImport = Box + Send + Sync>; +pub struct SharedBlockImport( + Arc + Send + Sync>>, +); + +impl Clone for SharedBlockImport { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } +} + +impl Deref for SharedBlockImport { + type Target = RwLock + Send + Sync>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl SharedBlockImport { + /// New instance + pub fn new(block_import: BI) -> Self + where + BI: BlockImport + Send + Sync + 'static, + { + Self(Arc::new(RwLock::new(block_import))) + } +} /// Shared justification import struct used by the queue. pub type BoxJustificationImport = @@ -115,6 +146,26 @@ pub trait Verifier: Send + Sync { async fn verify(&self, block: BlockImportParams) -> Result, String>; } +impl Verifier for Arc> +where + Block: BlockT, +{ + fn supports_stateless_verification(&self) -> bool { + (**self).supports_stateless_verification() + } + + fn verify<'life0, 'async_trait>( + &'life0 self, + block: BlockImportParams, + ) -> Pin, String>> + Send + 'async_trait>> + where + 'life0: 'async_trait, + Self: 'async_trait, + { + (**self).verify(block) + } +} + /// Blocks import queue API. /// /// The `import_*` methods can be called in order to send elements for the import queue to verify. @@ -233,12 +284,16 @@ pub enum BlockImportError { type BlockImportResult = Result>, BlockImportError>; /// Single block import function. -pub async fn import_single_block>( - import_handle: &mut impl BlockImport, +pub async fn import_single_block( + import_handle: &mut BI, block_origin: BlockOrigin, - block: IncomingBlock, - verifier: &mut V, -) -> BlockImportResult { + block: IncomingBlock, + verifier: &dyn Verifier, +) -> BlockImportResult +where + Block: BlockT, + BI: BlockImport, +{ match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? { SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status), SingleBlockVerificationOutcome::Verified(import_parameters) => @@ -303,13 +358,17 @@ pub(crate) struct SingleBlockImportParameters { } /// Single block import function with metering. -pub(crate) async fn verify_single_block_metered>( - import_handle: &impl BlockImport, +pub(crate) async fn verify_single_block_metered( + import_handle: &BI, block_origin: BlockOrigin, - block: IncomingBlock, - verifier: &mut V, + block: IncomingBlock, + verifier: &dyn Verifier, metrics: Option<&Metrics>, -) -> Result, BlockImportError> { +) -> Result, BlockImportError> +where + Block: BlockT, + BI: BlockImport, +{ let peer = block.origin; let (header, justifications) = match (block.header, block.justifications) { @@ -330,7 +389,7 @@ pub(crate) async fn verify_single_block_metered>( let hash = block.hash; let parent_hash = *header.parent_hash(); - match import_handler::( + match import_handler::( number, hash, parent_hash, @@ -403,11 +462,15 @@ pub(crate) async fn verify_single_block_metered>( })) } -pub(crate) async fn import_single_block_metered( - import_handle: &mut impl BlockImport, +pub(crate) async fn import_single_block_metered( + import_handle: &mut BI, import_parameters: SingleBlockImportParameters, metrics: Option<&Metrics>, -) -> BlockImportResult { +) -> BlockImportResult +where + Block: BlockT, + BI: BlockImport, +{ let started = Instant::now(); let SingleBlockImportParameters { import_block, hash, block_origin, verification_time } = diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index 8de9ede61298..87ab26db6c91 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -33,8 +33,9 @@ use crate::{ import_queue::{ buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender}, import_single_block_metered, verify_single_block_metered, BlockImportError, - BlockImportStatus, BoxBlockImport, BoxJustificationImport, ImportQueue, ImportQueueService, - IncomingBlock, Link, RuntimeOrigin, SingleBlockVerificationOutcome, Verifier, LOG_TARGET, + BlockImportStatus, BoxJustificationImport, ImportQueue, ImportQueueService, IncomingBlock, + Link, RuntimeOrigin, SharedBlockImport, SingleBlockVerificationOutcome, Verifier, + LOG_TARGET, }, metrics::Metrics, }; @@ -62,7 +63,7 @@ impl BasicQueue { /// This creates a background task, and calls `on_start` on the justification importer. pub fn new( verifier: V, - block_import: BoxBlockImport, + block_import: SharedBlockImport, justification_import: Option>, spawner: &impl sp_core::traits::SpawnEssentialNamed, prometheus_registry: Option<&Registry>, @@ -221,7 +222,7 @@ mod worker_messages { /// /// Returns when `block_import` ended. async fn block_import_process( - mut block_import: BoxBlockImport, + mut block_import: SharedBlockImport, mut verifier: impl Verifier, mut result_sender: BufferedLinkSender, mut block_import_receiver: TracingUnboundedReceiver>, @@ -258,7 +259,7 @@ impl BlockImportWorker { fn new( result_sender: BufferedLinkSender, verifier: V, - block_import: BoxBlockImport, + block_import: SharedBlockImport, justification_import: Option>, metrics: Option, ) -> ( @@ -385,7 +386,7 @@ struct ImportManyBlocksResult { /// This will yield after each imported block once, to ensure that other futures can /// be called as well. async fn import_many_blocks>( - import_handle: &mut BoxBlockImport, + import_handle: &mut SharedBlockImport, blocks_origin: BlockOrigin, blocks: Vec>, verifier: &mut V, @@ -589,8 +590,13 @@ mod tests { fn prioritizes_finality_work_over_block_import() { let (result_sender, mut result_port) = buffered_link::buffered_link(100_000); - let (worker, finality_sender, block_import_sender) = - BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None); + let (worker, finality_sender, block_import_sender) = BlockImportWorker::new( + result_sender, + (), + SharedBlockImport::new(()), + Some(Box::new(())), + None, + ); futures::pin_mut!(worker); let import_block = |n| { diff --git a/substrate/client/consensus/common/src/lib.rs b/substrate/client/consensus/common/src/lib.rs index 6bf1ed0b48b4..5e1c8f67112a 100644 --- a/substrate/client/consensus/common/src/lib.rs +++ b/substrate/client/consensus/common/src/lib.rs @@ -28,8 +28,8 @@ pub use block_import::{ StorageChanges, }; pub use import_queue::{ - import_single_block, BasicQueue, BlockImportError, BlockImportStatus, BoxBlockImport, - BoxJustificationImport, DefaultImportQueue, ImportQueue, IncomingBlock, Link, Verifier, + import_single_block, BasicQueue, BlockImportError, BlockImportStatus, BoxJustificationImport, + DefaultImportQueue, ImportQueue, IncomingBlock, Link, SharedBlockImport, Verifier, }; mod longest_chain; diff --git a/substrate/client/consensus/manual-seal/src/lib.rs b/substrate/client/consensus/manual-seal/src/lib.rs index 52ccd1c59520..0b75f6c6cf0d 100644 --- a/substrate/client/consensus/manual-seal/src/lib.rs +++ b/substrate/client/consensus/manual-seal/src/lib.rs @@ -28,7 +28,7 @@ use sc_client_api::{ }; use sc_consensus::{ block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy}, - import_queue::{BasicQueue, BoxBlockImport, Verifier}, + import_queue::{BasicQueue, SharedBlockImport, Verifier}, }; use sp_blockchain::HeaderBackend; use sp_consensus::{Environment, Proposer, SelectChain}; @@ -76,7 +76,7 @@ impl Verifier for ManualSealVerifier { /// Instantiate the import queue for the manual seal consensus engine. pub fn import_queue( - block_import: BoxBlockImport, + block_import: SharedBlockImport, spawner: &impl sp_core::traits::SpawnEssentialNamed, registry: Option<&Registry>, ) -> BasicQueue diff --git a/substrate/client/consensus/pow/src/lib.rs b/substrate/client/consensus/pow/src/lib.rs index 50e9533abb36..c816498902d5 100644 --- a/substrate/client/consensus/pow/src/lib.rs +++ b/substrate/client/consensus/pow/src/lib.rs @@ -50,8 +50,8 @@ use log::*; use prometheus_endpoint::Registry; use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents}; use sc_consensus::{ - BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport, - BoxJustificationImport, ForkChoiceStrategy, ImportResult, Verifier, + BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport, + ForkChoiceStrategy, ImportResult, SharedBlockImport, Verifier, }; use sp_api::ProvideRuntimeApi; use sp_block_builder::BlockBuilder as BlockBuilderApi; @@ -460,7 +460,7 @@ pub type PowImportQueue = BasicQueue; /// Import queue for PoW engine. pub fn import_queue( - block_import: BoxBlockImport, + block_import: SharedBlockImport, justification_import: Option>, algorithm: Algorithm, spawner: &impl sp_core::traits::SpawnEssentialNamed, @@ -486,7 +486,7 @@ where /// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted /// for blocks being built. This can encode authorship information, or just be a graffiti. pub fn start_mining_worker( - block_import: BoxBlockImport, + block_import: SharedBlockImport, client: Arc, select_chain: S, algorithm: Algorithm, diff --git a/substrate/client/consensus/pow/src/worker.rs b/substrate/client/consensus/pow/src/worker.rs index 9e9c4fc137d8..ad3a37a5a9f6 100644 --- a/substrate/client/consensus/pow/src/worker.rs +++ b/substrate/client/consensus/pow/src/worker.rs @@ -24,7 +24,9 @@ use futures_timer::Delay; use log::*; use parking_lot::Mutex; use sc_client_api::ImportNotifications; -use sc_consensus::{BlockImportParams, BoxBlockImport, StateAction, StorageChanges}; +use sc_consensus::{ + BlockImport, BlockImportParams, SharedBlockImport, StateAction, StorageChanges, +}; use sp_consensus::{BlockOrigin, Proposal}; use sp_runtime::{ generic::BlockId, @@ -78,7 +80,7 @@ pub struct MiningHandle< algorithm: Arc, justification_sync_link: Arc, build: Arc>>>, - block_import: Arc>>, + block_import: SharedBlockImport, } impl MiningHandle @@ -94,7 +96,7 @@ where pub(crate) fn new( algorithm: Algorithm, - block_import: BoxBlockImport, + block_import: SharedBlockImport, justification_sync_link: L, ) -> Self { Self { @@ -102,7 +104,7 @@ where algorithm: Arc::new(algorithm), justification_sync_link: Arc::new(justification_sync_link), build: Arc::new(Mutex::new(None)), - block_import: Arc::new(Mutex::new(block_import)), + block_import, } } @@ -192,9 +194,8 @@ where import_block.insert_intermediate(INTERMEDIATE_KEY, intermediate); let header = import_block.post_header(); - let mut block_import = self.block_import.lock(); - match block_import.import_block(import_block).await { + match self.block_import.clone().import_block(import_block).await { Ok(res) => { res.handle_justification( &header.hash(), diff --git a/substrate/client/network/test/src/block_import.rs b/substrate/client/network/test/src/block_import.rs index 25e3b9bee87f..08287620ba42 100644 --- a/substrate/client/network/test/src/block_import.rs +++ b/substrate/client/network/test/src/block_import.rs @@ -23,7 +23,7 @@ use futures::executor::block_on; use sc_block_builder::BlockBuilderProvider; use sc_consensus::{ import_single_block, BasicQueue, BlockImportError, BlockImportStatus, ImportedAux, - IncomingBlock, + IncomingBlock, SharedBlockImport, }; use sp_consensus::BlockOrigin; use substrate_test_runtime_client::{ @@ -118,7 +118,7 @@ fn async_import_queue_drops() { let queue = BasicQueue::new( verifier, - Box::new(substrate_test_runtime_client::new()), + SharedBlockImport::new(substrate_test_runtime_client::new()), None, &executor, None, diff --git a/substrate/client/network/test/src/lib.rs b/substrate/client/network/test/src/lib.rs index c105b925a2c4..ae39d424ae39 100644 --- a/substrate/client/network/test/src/lib.rs +++ b/substrate/client/network/test/src/lib.rs @@ -48,7 +48,7 @@ use sc_client_api::{ use sc_consensus::{ BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport, ForkChoiceStrategy, ImportQueue, ImportResult, JustificationImport, JustificationSyncLink, - LongestChain, Verifier, + LongestChain, SharedBlockImport, Verifier, }; use sc_network::{ config::{ @@ -767,7 +767,7 @@ pub trait TestNetFactory: Default + Sized + Send { let import_queue = Box::new(BasicQueue::new( verifier.clone(), - Box::new(block_import.clone()), + SharedBlockImport::new(block_import.clone()), justification_import, &sp_core::testing::TaskExecutor::new(), None, diff --git a/substrate/client/network/test/src/service.rs b/substrate/client/network/test/src/service.rs index 90da54eb272e..7aeaf2a8918e 100644 --- a/substrate/client/network/test/src/service.rs +++ b/substrate/client/network/test/src/service.rs @@ -19,7 +19,7 @@ use futures::prelude::*; use libp2p::{Multiaddr, PeerId}; -use sc_consensus::{ImportQueue, Link}; +use sc_consensus::{ImportQueue, Link, SharedBlockImport}; use sc_network::{ config::{self, FullNetworkConfiguration, MultiaddrWithPeerId, ProtocolId, TransportConfig}, event::Event, @@ -146,7 +146,7 @@ impl TestNetworkBuilder { let mut import_queue = self.import_queue.unwrap_or(Box::new(sc_consensus::BasicQueue::new( PassThroughVerifier(false), - Box::new(client.clone()), + SharedBlockImport::new(client.clone()), None, &sp_core::testing::TaskExecutor::new(), None, From f62d9ccdc5c72bf6892185acfc0ccbe9105fd6a5 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 16 Sep 2023 22:10:45 +0300 Subject: [PATCH 9/9] Support for concurrent/parallel verification using stateless verifiers --- Cargo.lock | 1 + substrate/client/consensus/common/Cargo.toml | 1 + .../consensus/common/src/import_queue.rs | 7 +- .../common/src/import_queue/basic_queue.rs | 142 +++++++++++++++++- 4 files changed, 143 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e68d834ea753..3de3363d19f6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14710,6 +14710,7 @@ dependencies = [ "sp-test-primitives", "substrate-prometheus-endpoint", "thiserror", + "tokio", ] [[package]] diff --git a/substrate/client/consensus/common/Cargo.toml b/substrate/client/consensus/common/Cargo.toml index 902f3034b202..c43da3299d63 100644 --- a/substrate/client/consensus/common/Cargo.toml +++ b/substrate/client/consensus/common/Cargo.toml @@ -32,6 +32,7 @@ sp-consensus = { path = "../../../primitives/consensus/common" } sp-core = { path = "../../../primitives/core" } sp-runtime = { path = "../../../primitives/runtime" } sp-state-machine = { path = "../../../primitives/state-machine" } +tokio = "1.32.0" [dev-dependencies] sp-test-primitives = { path = "../../../primitives/test-primitives" } diff --git a/substrate/client/consensus/common/src/import_queue.rs b/substrate/client/consensus/common/src/import_queue.rs index 2e8455990a32..079bb1cd0c0c 100644 --- a/substrate/client/consensus/common/src/import_queue.rs +++ b/substrate/client/consensus/common/src/import_queue.rs @@ -294,7 +294,9 @@ where Block: BlockT, BI: BlockImport, { - match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? { + match verify_single_block_metered(import_handle, block_origin, block, verifier, false, None) + .await? + { SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status), SingleBlockVerificationOutcome::Verified(import_parameters) => import_single_block_metered(import_handle, import_parameters, None).await, @@ -363,6 +365,7 @@ pub(crate) async fn verify_single_block_metered( block_origin: BlockOrigin, block: IncomingBlock, verifier: &dyn Verifier, + allow_missing_parent: bool, metrics: Option<&Metrics>, ) -> Result, BlockImportError> where @@ -401,7 +404,7 @@ where parent_hash, allow_missing_state: block.allow_missing_state, import_existing: block.import_existing, - allow_missing_parent: block.state.is_some(), + allow_missing_parent: allow_missing_parent || block.state.is_some(), }) .await, )? { diff --git a/substrate/client/consensus/common/src/import_queue/basic_queue.rs b/substrate/client/consensus/common/src/import_queue/basic_queue.rs index 87ab26db6c91..a60ce7bc6bfc 100644 --- a/substrate/client/consensus/common/src/import_queue/basic_queue.rs +++ b/substrate/client/consensus/common/src/import_queue/basic_queue.rs @@ -17,6 +17,7 @@ // along with this program. If not, see . use futures::{ prelude::*, + stream::FuturesOrdered, task::{Context, Poll}, }; use log::{debug, trace}; @@ -27,7 +28,14 @@ use sp_runtime::{ traits::{Block as BlockT, Header as HeaderT, NumberFor}, Justification, Justifications, }; -use std::pin::Pin; +use std::{ + pin::Pin, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, +}; +use tokio::{runtime::Handle, task}; use crate::{ import_queue::{ @@ -223,11 +231,12 @@ mod worker_messages { /// Returns when `block_import` ended. async fn block_import_process( mut block_import: SharedBlockImport, - mut verifier: impl Verifier, + verifier: impl Verifier + 'static, mut result_sender: BufferedLinkSender, mut block_import_receiver: TracingUnboundedReceiver>, metrics: Option, ) { + let verifier: Arc> = Arc::new(verifier); loop { let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await { @@ -241,9 +250,18 @@ async fn block_import_process( }, }; - let res = - import_many_blocks(&mut block_import, origin, blocks, &mut verifier, metrics.clone()) - .await; + let res = if verifier.supports_stateless_verification() { + import_many_blocks_with_stateless_verification( + &mut block_import, + origin, + blocks, + &verifier, + metrics.clone(), + ) + .await + } else { + import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await + }; result_sender.blocks_processed(res.imported, res.block_count, res.results); } @@ -385,11 +403,14 @@ struct ImportManyBlocksResult { /// /// This will yield after each imported block once, to ensure that other futures can /// be called as well. +/// +/// For verifiers that support stateless verification use +/// [`import_many_blocks_with_stateless_verification`] for better performance. async fn import_many_blocks>( import_handle: &mut SharedBlockImport, blocks_origin: BlockOrigin, blocks: Vec>, - verifier: &mut V, + verifier: &V, metrics: Option, ) -> ImportManyBlocksResult { let count = blocks.len(); @@ -431,6 +452,7 @@ async fn import_many_blocks>( blocks_origin, block, verifier, + false, metrics.as_ref(), ); match verification_fut.await { @@ -466,6 +488,114 @@ async fn import_many_blocks>( } } +/// The same as [`import_many_blocks()`]`, but for verifiers that support stateless verification of +/// blocks (use [`Verifier::supports_stateless_verification()`]). +async fn import_many_blocks_with_stateless_verification( + import_handle: &mut SharedBlockImport, + blocks_origin: BlockOrigin, + blocks: Vec>, + verifier: &Arc>, + metrics: Option, +) -> ImportManyBlocksResult { + let count = blocks.len(); + + let blocks_range = match ( + blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), + blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), + ) { + (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last), + (Some(first), Some(_)) => format!(" ({})", first), + _ => Default::default(), + }; + + trace!(target: LOG_TARGET, "Starting import of {} blocks {}", count, blocks_range); + + let has_error = Arc::new(AtomicBool::new(false)); + + // Blocks in the response/drain should be in ascending order. + let mut verified_blocks = blocks + .into_iter() + .enumerate() + .map(|(index, block)| { + let import_handle = import_handle.clone(); + let verifier = Arc::clone(verifier); + let metrics = metrics.clone(); + let has_error = Arc::clone(&has_error); + + async move { + let block_number = block.header.as_ref().map(|h| *h.number()); + let block_hash = block.hash; + + let result = if has_error.load(Ordering::Acquire) { + Err(BlockImportError::Cancelled) + } else { + task::spawn_blocking(move || { + Handle::current().block_on(verify_single_block_metered( + &import_handle, + blocks_origin, + block, + &verifier, + // Check parent for the first block, but skip for others since blocks + // are verified concurrently before being imported. + index != 0, + metrics.as_ref(), + )) + }) + .await + .unwrap_or_else(|error| { + Err(BlockImportError::Other(sp_consensus::Error::Other( + format!("Failed to join on block verification: {error}").into(), + ))) + }) + }; + + (block_number, block_hash, result) + } + }) + .collect::>(); + + let mut imported = 0; + let mut results = vec![]; + + while let Some((block_number, block_hash, verification_result)) = verified_blocks.next().await { + let import_result = if has_error.load(Ordering::Acquire) { + Err(BlockImportError::Cancelled) + } else { + // The actual import. + match verification_result { + Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status), + Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => + import_single_block_metered(import_handle, import_parameters, metrics.as_ref()) + .await, + Err(e) => Err(e), + } + }; + + if let Some(metrics) = metrics.as_ref() { + metrics.report_import::(&import_result); + } + + if import_result.is_ok() { + trace!( + target: LOG_TARGET, + "Block imported successfully {:?} ({})", + block_number, + block_hash, + ); + imported += 1; + } else { + has_error.store(true, Ordering::Release); + } + + results.push((import_result, block_hash)); + + Yield::new().await + } + + // No block left to import, success! + ImportManyBlocksResult { block_count: count, imported, results } +} + /// A future that will always `yield` on the first call of `poll` but schedules the /// current task for re-execution. ///