Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent verification backport #2

Merged
merged 9 commits into from
Sep 27, 2023
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/client/consensus/aura/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition.workspace = true
async-trait = "0.1.73"
codec = { package = "parity-scale-codec", version = "3.0.0", features = [ "derive" ] }
futures = "0.3.28"
parking_lot = "0.12.1"
tracing = "0.1.37"
schnellru = "0.2.1"

Expand Down
13 changes: 7 additions & 6 deletions cumulus/client/consensus/aura/src/equivocation_import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
/// should be thrown out and which ones should be kept.
use codec::Codec;
use cumulus_client_consensus_common::ParachainBlockImportMarker;
use parking_lot::Mutex;
use schnellru::{ByLength, LruMap};

use sc_consensus::{
import_queue::{BasicQueue, Verifier as VerifierT},
BlockImport, BlockImportParams, ForkChoiceStrategy,
BlockImport, BlockImportParams, ForkChoiceStrategy, SharedBlockImport,
};
use sc_consensus_aura::standalone as aura_internal;
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
Expand Down Expand Up @@ -71,7 +72,7 @@ struct Verifier<P, Client, Block, CIDP> {
client: Arc<Client>,
create_inherent_data_providers: CIDP,
slot_duration: SlotDuration,
defender: NaiveEquivocationDefender,
defender: Mutex<NaiveEquivocationDefender>,
telemetry: Option<TelemetryHandle>,
_phantom: std::marker::PhantomData<fn() -> (Block, P)>,
}
Expand All @@ -89,7 +90,7 @@ where
CIDP: CreateInherentDataProviders<Block, ()>,
{
async fn verify(
&mut self,
&self,
mut block_params: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
// Skip checks that include execution, if being told so, or when importing only state.
Expand Down Expand Up @@ -132,7 +133,7 @@ where
block_params.post_hash = Some(post_hash);

// Check for and reject egregious amounts of equivocations.
if self.defender.insert_and_check(slot) {
if self.defender.lock().insert_and_check(slot) {
return Err(format!(
"Rejecting block {:?} due to excessive equivocations at slot",
post_hash,
Expand Down Expand Up @@ -239,11 +240,11 @@ where
let verifier = Verifier::<P, _, _, _> {
client,
create_inherent_data_providers,
defender: NaiveEquivocationDefender::default(),
defender: Mutex::new(NaiveEquivocationDefender::default()),
slot_duration,
telemetry,
_phantom: std::marker::PhantomData,
};

BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry)
BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, spawner, registry)
}
5 changes: 3 additions & 2 deletions cumulus/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use sp_runtime::traits::Block as BlockT;
use sc_consensus::{
block_import::{BlockImport, BlockImportParams},
import_queue::{BasicQueue, Verifier},
SharedBlockImport,
};

use crate::ParachainBlockImportMarker;
Expand All @@ -50,7 +51,7 @@ pub struct VerifyNothing;
#[async_trait::async_trait]
impl<Block: BlockT> Verifier<Block> for VerifyNothing {
async fn verify(
&mut self,
&self,
params: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
Ok(params)
Expand All @@ -72,5 +73,5 @@ where
+ Sync
+ 'static,
{
BasicQueue::new(VerifyNothing, Box::new(block_import), None, spawner, registry)
BasicQueue::new(VerifyNothing, SharedBlockImport::new(block_import), None, spawner, registry)
}
4 changes: 2 additions & 2 deletions cumulus/client/consensus/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ impl<Block: BlockT, I: Clone, BE> Clone for ParachainBlockImport<Block, I, BE> {
impl<Block, BI, BE> BlockImport<Block> for ParachainBlockImport<Block, BI, BE>
where
Block: BlockT,
BI: BlockImport<Block> + Send,
BI: BlockImport<Block> + Send + Sync,
BE: Backend<Block>,
{
type Error = BI::Error;

async fn check_block(
&mut self,
&self,
block: sc_consensus::BlockCheckParams<Block>,
) -> Result<sc_consensus::ImportResult, Self::Error> {
self.inner.check_block(block).await
Expand Down
6 changes: 3 additions & 3 deletions cumulus/client/consensus/relay-chain/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use cumulus_client_consensus_common::ParachainBlockImportMarker;

use sc_consensus::{
import_queue::{BasicQueue, Verifier as VerifierT},
BlockImport, BlockImportParams,
BlockImport, BlockImportParams, SharedBlockImport,
};
use sp_api::ProvideRuntimeApi;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
Expand Down Expand Up @@ -52,7 +52,7 @@ where
CIDP: CreateInherentDataProviders<Block, ()>,
{
async fn verify(
&mut self,
&self,
mut block_params: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
// Skip checks that include execution, if being told so, or when importing only state.
Expand Down Expand Up @@ -125,5 +125,5 @@ where
{
let verifier = Verifier::new(client, create_inherent_data_providers);

Ok(BasicQueue::new(verifier, Box::new(block_import), None, spawner, registry))
Ok(BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, spawner, registry))
}
2 changes: 1 addition & 1 deletion cumulus/client/pov-recovery/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ where

/// Import the given `block`.
///
/// This will also recursivley drain `waiting_for_parent` and import them as well.
/// This will also recursively drain `waiting_for_parent` and import them as well.
async fn import_block(&mut self, block: Block) {
let mut blocks = VecDeque::new();

Expand Down
12 changes: 6 additions & 6 deletions cumulus/polkadot-parachain/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier;
use futures::lock::Mutex;
use sc_consensus::{
import_queue::{BasicQueue, Verifier as VerifierT},
BlockImportParams, ImportQueue,
BlockImportParams, ImportQueue, SharedBlockImport,
};
use sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY};
use sc_network::{config::FullNetworkConfiguration, NetworkBlock};
Expand Down Expand Up @@ -1022,7 +1022,7 @@ where

struct Verifier<Client, AuraId> {
client: Arc<Client>,
aura_verifier: BuildOnAccess<Box<dyn VerifierT<Block>>>,
aura_verifier: Mutex<BuildOnAccess<Box<dyn VerifierT<Block>>>>,
relay_chain_verifier: Box<dyn VerifierT<Block>>,
_phantom: PhantomData<AuraId>,
}
Expand All @@ -1035,7 +1035,7 @@ where
AuraId: Send + Sync + Codec,
{
async fn verify(
&mut self,
&self,
block_import: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
if self
Expand All @@ -1044,7 +1044,7 @@ where
.has_api::<dyn AuraApi<Block, AuraId>>(*block_import.header.parent_hash())
.unwrap_or(false)
{
self.aura_verifier.get_mut().verify(block_import).await
self.aura_verifier.lock().await.get_mut().verify(block_import).await
} else {
self.relay_chain_verifier.verify(block_import).await
}
Expand Down Expand Up @@ -1104,14 +1104,14 @@ where
let verifier = Verifier {
client,
relay_chain_verifier,
aura_verifier: BuildOnAccess::Uninitialized(Some(Box::new(aura_verifier))),
aura_verifier: Mutex::new(BuildOnAccess::Uninitialized(Some(Box::new(aura_verifier)))),
_phantom: PhantomData,
};

let registry = config.prometheus_registry();
let spawner = task_manager.spawn_essential_handle();

Ok(BasicQueue::new(verifier, Box::new(block_import), None, &spawner, registry))
Ok(BasicQueue::new(verifier, SharedBlockImport::new(block_import), None, &spawner, registry))
}

/// Start an aura powered parachain node. Asset Hub and Collectives use this.
Expand Down
11 changes: 9 additions & 2 deletions substrate/client/consensus/aura/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use sc_client_api::{backend::AuxStore, BlockOf, UsageProvider};
use sc_consensus::{
block_import::{BlockImport, BlockImportParams, ForkChoiceStrategy},
import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
SharedBlockImport,
};
use sc_consensus_slots::{check_equivocation, CheckedHeader, InherentDataProviderExt};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
Expand Down Expand Up @@ -174,7 +175,7 @@ where
CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
{
async fn verify(
&mut self,
&self,
mut block: BlockImportParams<B>,
) -> Result<BlockImportParams<B>, String> {
// Skip checks that include execution, if being told so or when importing only state.
Expand Down Expand Up @@ -376,7 +377,13 @@ where
compatibility_mode,
});

Ok(BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry))
Ok(BasicQueue::new(
verifier,
SharedBlockImport::new(block_import),
justification_import,
spawner,
registry,
))
}

/// Parameters of [`build_verifier`].
Expand Down
13 changes: 10 additions & 3 deletions substrate/client/consensus/babe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ use sc_consensus::{
StateAction,
},
import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
SharedBlockImport,
};
use sc_consensus_epochs::{
descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpochDescriptor,
Expand Down Expand Up @@ -1130,7 +1131,7 @@ where
CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
{
async fn verify(
&mut self,
&self,
mut block: BlockImportParams<Block>,
) -> Result<BlockImportParams<Block>, String> {
trace!(
Expand Down Expand Up @@ -1683,7 +1684,7 @@ where
}

async fn check_block(
&mut self,
&self,
block: BlockCheckParams<Block>,
) -> Result<ImportResult, Self::Error> {
self.inner.check_block(block).await.map_err(Into::into)
Expand Down Expand Up @@ -1856,7 +1857,13 @@ where
spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());

Ok((
BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
BasicQueue::new(
verifier,
SharedBlockImport::new(block_import),
justification_import,
spawner,
registry,
),
BabeWorkerHandle(worker_tx),
))
}
Expand Down
Loading
Loading