Skip to content

Commit

Permalink
Support for concurrent/parallel verification using stateless verifiers
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Sep 16, 2023
1 parent c0ec791 commit b4e8e31
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions substrate/client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ sp-consensus = { path = "../../../primitives/consensus/common" }
sp-core = { path = "../../../primitives/core" }
sp-runtime = { path = "../../../primitives/runtime" }
sp-state-machine = { path = "../../../primitives/state-machine" }
tokio = "1.32.0"

[dev-dependencies]
sp-test-primitives = { path = "../../../primitives/test-primitives" }
7 changes: 5 additions & 2 deletions substrate/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ where
Block: BlockT,
BI: BlockImport<Block, Error = ConsensusError>,
{
match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? {
match verify_single_block_metered(import_handle, block_origin, block, verifier, false, None)
.await?
{
SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status),
SingleBlockVerificationOutcome::Verified(import_parameters) =>
import_single_block_metered(import_handle, import_parameters, None).await,
Expand Down Expand Up @@ -363,6 +365,7 @@ pub(crate) async fn verify_single_block_metered<Block, BI>(
block_origin: BlockOrigin,
block: IncomingBlock<Block>,
verifier: &dyn Verifier<Block>,
allow_missing_parent: bool,
metrics: Option<&Metrics>,
) -> Result<SingleBlockVerificationOutcome<Block>, BlockImportError>
where
Expand Down Expand Up @@ -401,7 +404,7 @@ where
parent_hash,
allow_missing_state: block.allow_missing_state,
import_existing: block.import_existing,
allow_missing_parent: block.state.is_some(),
allow_missing_parent: allow_missing_parent || block.state.is_some(),
})
.await,
)? {
Expand Down
142 changes: 136 additions & 6 deletions substrate/client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>.
use futures::{
prelude::*,
stream::FuturesOrdered,
task::{Context, Poll},
};
use log::{debug, trace};
Expand All @@ -27,7 +28,14 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor},
Justification, Justifications,
};
use std::pin::Pin;
use std::{
pin::Pin,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::{runtime::Handle, task};

use crate::{
import_queue::{
Expand Down Expand Up @@ -223,11 +231,12 @@ mod worker_messages {
/// Returns when `block_import` ended.
async fn block_import_process<B: BlockT>(
mut block_import: SharedBlockImport<B>,
mut verifier: impl Verifier<B>,
verifier: impl Verifier<B> + 'static,
mut result_sender: BufferedLinkSender<B>,
mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
metrics: Option<Metrics>,
) {
let verifier: Arc<dyn Verifier<B>> = Arc::new(verifier);
loop {
let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await
{
Expand All @@ -241,9 +250,18 @@ async fn block_import_process<B: BlockT>(
},
};

let res =
import_many_blocks(&mut block_import, origin, blocks, &mut verifier, metrics.clone())
.await;
let res = if verifier.supports_stateless_verification() {
import_many_blocks_with_stateless_verification(
&mut block_import,
origin,
blocks,
&verifier,
metrics.clone(),
)
.await
} else {
import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await
};

result_sender.blocks_processed(res.imported, res.block_count, res.results);
}
Expand Down Expand Up @@ -385,11 +403,14 @@ struct ImportManyBlocksResult<B: BlockT> {
///
/// This will yield after each imported block once, to ensure that other futures can
/// be called as well.
///
/// For verifiers that support stateless verification use
/// [`import_many_blocks_with_stateless_verification`] for better performance.
async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
import_handle: &mut SharedBlockImport<B>,
blocks_origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>,
verifier: &mut V,
verifier: &V,
metrics: Option<Metrics>,
) -> ImportManyBlocksResult<B> {
let count = blocks.len();
Expand Down Expand Up @@ -431,6 +452,7 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
blocks_origin,
block,
verifier,
false,
metrics.as_ref(),
);
match verification_fut.await {
Expand Down Expand Up @@ -466,6 +488,114 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
}
}

/// The same as [`import_many_blocks()`]`, but for verifiers that support stateless verification of
/// blocks (use [`Verifier::supports_stateless_verification()`]).
async fn import_many_blocks_with_stateless_verification<B: BlockT>(
import_handle: &mut SharedBlockImport<B>,
blocks_origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>,
verifier: &Arc<dyn Verifier<B>>,
metrics: Option<Metrics>,
) -> ImportManyBlocksResult<B> {
let count = blocks.len();

let blocks_range = match (
blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
) {
(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
(Some(first), Some(_)) => format!(" ({})", first),
_ => Default::default(),
};

trace!(target: LOG_TARGET, "Starting import of {} blocks {}", count, blocks_range);

let has_error = Arc::new(AtomicBool::new(false));

// Blocks in the response/drain should be in ascending order.
let mut verified_blocks = blocks
.into_iter()
.enumerate()
.map(|(index, block)| {
let import_handle = import_handle.clone();
let verifier = Arc::clone(verifier);
let metrics = metrics.clone();
let has_error = Arc::clone(&has_error);

async move {
let block_number = block.header.as_ref().map(|h| *h.number());
let block_hash = block.hash;

let result = if has_error.load(Ordering::Acquire) {
Err(BlockImportError::Cancelled)
} else {
task::spawn_blocking(move || {
Handle::current().block_on(verify_single_block_metered(
&import_handle,
blocks_origin,
block,
&verifier,
// Check parent for the first block, but skip for others since blocks
// are verified concurrently before being imported.
index != 0,
metrics.as_ref(),
))
})
.await
.unwrap_or_else(|error| {
Err(BlockImportError::Other(sp_consensus::Error::Other(
format!("Failed to join on block verification: {error}").into(),
)))
})
};

(block_number, block_hash, result)
}
})
.collect::<FuturesOrdered<_>>();

let mut imported = 0;
let mut results = vec![];

while let Some((block_number, block_hash, verification_result)) = verified_blocks.next().await {
let import_result = if has_error.load(Ordering::Acquire) {
Err(BlockImportError::Cancelled)
} else {
// The actual import.
match verification_result {
Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status),
Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) =>
import_single_block_metered(import_handle, import_parameters, metrics.as_ref())
.await,
Err(e) => Err(e),
}
};

if let Some(metrics) = metrics.as_ref() {
metrics.report_import::<B>(&import_result);
}

if import_result.is_ok() {
trace!(
target: LOG_TARGET,
"Block imported successfully {:?} ({})",
block_number,
block_hash,
);
imported += 1;
} else {
has_error.store(true, Ordering::Release);
}

results.push((import_result, block_hash));

Yield::new().await
}

// No block left to import, success!
ImportManyBlocksResult { block_count: count, imported, results }
}

/// A future that will always `yield` on the first call of `poll` but schedules the
/// current task for re-execution.
///
Expand Down

0 comments on commit b4e8e31

Please sign in to comment.