From a34cc8dff09dc3840a304befc2415343244b5fd0 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> Date: Thu, 12 Sep 2024 12:37:07 +0300 Subject: [PATCH] [4 / 5] Make approval-voting runnable on a worker thread (#4846) This is part of the work to further optimize the approval subsystems, if you want to understand the full context start with reading https://github.com/paritytech/polkadot-sdk/pull/4849#issue-2364261568, # Description This PR contain changes to make possible the run of single approval-voting instance on a worker thread, so that it can be instantiated by the approval-voting-parallel subsystem. This does not contain any functional changes it just decouples the subsystem from the subsystem Context and introduces more specific trait dependencies for each function instead of all of them requiring a context. This change can be merged independent of the followup PRs. --------- Signed-off-by: Alexandru Gheorghe Co-authored-by: Andrei Sandu <54316454+sandreim@users.noreply.github.com> --- polkadot/cli/src/command.rs | 8 +- polkadot/node/core/approval-voting/Cargo.toml | 1 + .../node/core/approval-voting/src/import.rs | 114 ++--- polkadot/node/core/approval-voting/src/lib.rs | 392 ++++++++++++------ .../node/core/approval-voting/src/tests.rs | 25 +- polkadot/node/service/src/lib.rs | 13 +- polkadot/node/service/src/overseer.rs | 1 + .../subsystem-bench/src/lib/approval/mod.rs | 3 +- prdoc/pr_4846.prdoc | 13 + 9 files changed, 382 insertions(+), 188 deletions(-) create mode 100644 prdoc/pr_4846.prdoc diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs index 168a645430ed..2947867c516e 100644 --- a/polkadot/cli/src/command.rs +++ b/polkadot/cli/src/command.rs @@ -373,16 +373,16 @@ pub fn run() -> Result<()> { Ok(runner.async_run(|mut config| { let (client, backend, _, task_manager) = polkadot_service::new_chain_ops(&mut config, None)?; + let task_handle = task_manager.spawn_handle(); let aux_revert = Box::new(|client, backend, blocks| { - polkadot_service::revert_backend(client, backend, blocks, config).map_err( - |err| { + polkadot_service::revert_backend(client, backend, blocks, config, task_handle) + .map_err(|err| { match err { polkadot_service::Error::Blockchain(err) => err.into(), // Generic application-specific error. err => sc_cli::Error::Application(err.into()), } - }, - ) + }) }); Ok(( cmd.run(client, backend, Some(aux_revert)).map_err(Error::SubstrateCli), diff --git a/polkadot/node/core/approval-voting/Cargo.toml b/polkadot/node/core/approval-voting/Cargo.toml index bc0187bf4922..e678118440f5 100644 --- a/polkadot/node/core/approval-voting/Cargo.toml +++ b/polkadot/node/core/approval-voting/Cargo.toml @@ -22,6 +22,7 @@ kvdb = { workspace = true } derive_more = { workspace = true, default-features = true } thiserror = { workspace = true } itertools = { workspace = true } +async-trait = { workspace = true } polkadot-node-subsystem = { workspace = true, default-features = true } polkadot-node-subsystem-util = { workspace = true, default-features = true } diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index b163d718eb25..bf6ea0c98149 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -44,6 +44,7 @@ use polkadot_node_subsystem::{ overseer, RuntimeApiError, SubsystemError, SubsystemResult, }; use polkadot_node_subsystem_util::{determine_new_blocks, runtime::RuntimeInfo}; +use polkadot_overseer::SubsystemSender; use polkadot_primitives::{ node_features, BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, ConsensusLog, CoreIndex, GroupIndex, Hash, Header, SessionIndex, @@ -111,8 +112,8 @@ enum ImportedBlockInfoError { /// Computes information about the imported block. Returns an error if the info couldn't be /// extracted. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn imported_block_info( - ctx: &mut Context, +async fn imported_block_info>( + sender: &mut Sender, env: ImportedBlockInfoEnv<'_>, block_hash: Hash, block_header: &Header, @@ -124,11 +125,12 @@ async fn imported_block_info( // fetch candidates let included_candidates: Vec<_> = { let (c_tx, c_rx) = oneshot::channel(); - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::CandidateEvents(c_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::CandidateEvents(c_tx), + )) + .await; let events: Vec = match c_rx.await { Ok(Ok(events)) => events, @@ -151,11 +153,12 @@ async fn imported_block_info( // short, that shouldn't happen. let session_index = { let (s_tx, s_rx) = oneshot::channel(); - ctx.send_message(RuntimeApiMessage::Request( - block_header.parent_hash, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_header.parent_hash, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) + .await; let session_index = match s_rx.await { Ok(Ok(s)) => s, @@ -201,11 +204,12 @@ async fn imported_block_info( // by one block. This gives us the opposite invariant for sessions - the parent block's // post-state gives us the canonical information about the session index for any of its // children, regardless of which slot number they might be produced at. - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::CurrentBabeEpoch(s_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::CurrentBabeEpoch(s_tx), + )) + .await; match s_rx.await { Ok(Ok(s)) => s, @@ -216,7 +220,7 @@ async fn imported_block_info( }; let extended_session_info = - get_extended_session_info(env.runtime_info, ctx.sender(), block_hash, session_index).await; + get_extended_session_info(env.runtime_info, sender, block_hash, session_index).await; let enable_v2_assignments = extended_session_info.map_or(false, |extended_session_info| { *extended_session_info .node_features @@ -225,7 +229,7 @@ async fn imported_block_info( .unwrap_or(&false) }); - let session_info = get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index) + let session_info = get_session_info(env.runtime_info, sender, block_hash, session_index) .await .ok_or(ImportedBlockInfoError::SessionInfoUnavailable)?; @@ -329,9 +333,15 @@ pub struct BlockImportedCandidates { /// * and return information about all candidates imported under each block. /// /// It is the responsibility of the caller to schedule wakeups for each block. -#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -pub(crate) async fn handle_new_head( - ctx: &mut Context, +pub(crate) async fn handle_new_head< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender, + AVSender: SubsystemSender, + B: Backend, +>( + sender: &mut Sender, + approval_voting_sender: &mut AVSender, state: &State, db: &mut OverlayedBackend<'_, B>, session_info_provider: &mut RuntimeInfo, @@ -349,7 +359,7 @@ pub(crate) async fn handle_new_head( let header = { let (h_tx, h_rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await; + sender.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await; match h_rx.await? { Err(e) => { gum::debug!( @@ -375,7 +385,7 @@ pub(crate) async fn handle_new_head( let lower_bound_number = finalized_number.unwrap_or(lower_bound_number).max(lower_bound_number); let new_blocks = determine_new_blocks( - ctx.sender(), + sender, |h| db.load_block_entry(h).map(|e| e.is_some()), head, &header, @@ -401,12 +411,15 @@ pub(crate) async fn handle_new_head( keystore: &state.keystore, }; - match imported_block_info(ctx, env, block_hash, &block_header, finalized_number).await { + match imported_block_info(sender, env, block_hash, &block_header, finalized_number) + .await + { Ok(i) => imported_blocks_and_info.push((block_hash, block_header, i)), Err(error) => { // It's possible that we've lost a race with finality. let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx)) + sender + .send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx)) .await; let lost_to_finality = match rx.await { @@ -450,17 +463,11 @@ pub(crate) async fn handle_new_head( force_approve, } = imported_block_info; - let session_info = match get_session_info( - session_info_provider, - ctx.sender(), - head, - session_index, - ) - .await - { - Some(session_info) => session_info, - None => return Ok(Vec::new()), - }; + let session_info = + match get_session_info(session_info_provider, sender, head, session_index).await { + Some(session_info) => session_info, + None => return Ok(Vec::new()), + }; let (block_tick, no_show_duration) = { let block_tick = slot_number_to_tick(state.slot_duration_millis, slot); @@ -510,7 +517,7 @@ pub(crate) async fn handle_new_head( }; // If all bits are already set, then send an approve message. if approved_bitfield.count_ones() == approved_bitfield.len() { - ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await; + sender.send_message(ChainSelectionMessage::Approved(block_hash)).await; } let block_entry = v3::BlockEntry { @@ -567,7 +574,7 @@ pub(crate) async fn handle_new_head( // Notify chain-selection of all approved hashes. for hash in approved_hashes { - ctx.send_message(ChainSelectionMessage::Approved(hash)).await; + sender.send_message(ChainSelectionMessage::Approved(hash)).await; } } @@ -603,7 +610,8 @@ pub(crate) async fn handle_new_head( "Informing distribution of newly imported chain", ); - ctx.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta)); + approval_voting_sender + .send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta)); Ok(imported_candidates) } @@ -620,7 +628,10 @@ pub(crate) mod tests { approval::v1::{VrfSignature, VrfTranscript}, DISPUTE_WINDOW, }; - use polkadot_node_subsystem::messages::{AllMessages, ApprovalVotingMessage}; + use polkadot_node_subsystem::{ + messages::{AllMessages, ApprovalVotingMessage}, + SubsystemContext, + }; use polkadot_node_subsystem_test_helpers::make_subsystem_context; use polkadot_node_subsystem_util::database::Database; use polkadot_primitives::{ @@ -662,7 +673,7 @@ pub(crate) mod tests { State { keystore: Arc::new(LocalKeystore::in_memory()), slot_duration_millis: 6_000, - clock: Box::new(MockClock::default()), + clock: Arc::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::default()), spans: HashMap::new(), per_block_assignments_gathering_times: LruMap::new(ByLength::new( @@ -806,8 +817,9 @@ pub(crate) mod tests { keystore: &LocalKeystore::in_memory(), }; - let info = - imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await.unwrap(); + let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(4)) + .await + .unwrap(); assert_eq!(info.included_candidates, included_candidates); assert_eq!(info.session_index, session); @@ -953,7 +965,7 @@ pub(crate) mod tests { keystore: &LocalKeystore::in_memory(), }; - let info = imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await; + let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(4)).await; assert_matches!(info, Err(ImportedBlockInfoError::VrfInfoUnavailable)); }) @@ -1092,7 +1104,7 @@ pub(crate) mod tests { keystore: &LocalKeystore::in_memory(), }; - let info = imported_block_info(&mut ctx, env, hash, &header, &Some(6)).await; + let info = imported_block_info(ctx.sender(), env, hash, &header, &Some(6)).await; assert_matches!(info, Err(ImportedBlockInfoError::BlockAlreadyFinalized)); }) @@ -1128,7 +1140,8 @@ pub(crate) mod tests { #[test] fn imported_block_info_extracts_force_approve() { let pool = TaskExecutor::new(); - let (mut ctx, mut handle) = make_subsystem_context(pool.clone()); + let (mut ctx, mut handle) = + make_subsystem_context::(pool.clone()); let session = 5; let session_info = dummy_session_info(session); @@ -1191,7 +1204,7 @@ pub(crate) mod tests { }; let info = - imported_block_info(&mut ctx, env, hash, &header, &Some(4)).await.unwrap(); + imported_block_info(ctx.sender(), env, hash, &header, &Some(4)).await.unwrap(); assert_eq!(info.included_candidates, included_candidates); assert_eq!(info.session_index, session); @@ -1384,8 +1397,11 @@ pub(crate) mod tests { let test_fut = { Box::pin(async move { let mut overlay_db = OverlayedBackend::new(&db); + + let mut approval_voting_sender = ctx.sender().clone(); let result = handle_new_head( - &mut ctx, + ctx.sender(), + &mut approval_voting_sender, &state, &mut overlay_db, &mut session_info_provider, diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 942922cba6df..2149ce81fa80 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -165,7 +165,8 @@ pub struct ApprovalVotingSubsystem { db: Arc, mode: Mode, metrics: Metrics, - clock: Box, + clock: Arc, + spawner: Arc, } #[derive(Clone)] @@ -484,6 +485,7 @@ impl ApprovalVotingSubsystem { keystore: Arc, sync_oracle: Box, metrics: Metrics, + spawner: Arc, ) -> Self { ApprovalVotingSubsystem::with_config_and_clock( config, @@ -491,7 +493,8 @@ impl ApprovalVotingSubsystem { keystore, sync_oracle, metrics, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), + spawner, ) } @@ -502,7 +505,8 @@ impl ApprovalVotingSubsystem { keystore: Arc, sync_oracle: Box, metrics: Metrics, - clock: Box, + clock: Arc, + spawner: Arc, ) -> Self { ApprovalVotingSubsystem { keystore, @@ -512,6 +516,7 @@ impl ApprovalVotingSubsystem { mode: Mode::Syncing(sync_oracle), metrics, clock, + spawner, } } @@ -551,12 +556,21 @@ fn db_sanity_check(db: Arc, config: DatabaseConfig) -> SubsystemRe #[overseer::subsystem(ApprovalVoting, error = SubsystemError, prefix = self::overseer)] impl ApprovalVotingSubsystem { - fn start(self, ctx: Context) -> SpawnedSubsystem { + fn start(self, mut ctx: Context) -> SpawnedSubsystem { let backend = DbBackend::new(self.db.clone(), self.db_config); - let future = - run::(ctx, self, Box::new(RealAssignmentCriteria), backend) - .map_err(|e| SubsystemError::with_origin("approval-voting", e)) - .boxed(); + let to_other_subsystems = ctx.sender().clone(); + let to_approval_distr = ctx.sender().clone(); + + let future = run::( + ctx, + to_other_subsystems, + to_approval_distr, + self, + Box::new(RealAssignmentCriteria), + backend, + ) + .map_err(|e| SubsystemError::with_origin("approval-voting", e)) + .boxed(); SpawnedSubsystem { name: "approval-voting-subsystem", future } } @@ -825,7 +839,7 @@ where struct State { keystore: Arc, slot_duration_millis: u64, - clock: Box, + clock: Arc, assignment_criteria: Box, spans: HashMap, // Per block, candidate records about how long we take until we gather enough @@ -961,20 +975,20 @@ impl State { } // Returns the approval voting params from the RuntimeApi. - #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] - async fn get_approval_voting_params_or_default( + async fn get_approval_voting_params_or_default>( &self, - ctx: &mut Context, + sender: &mut Sender, session_index: SessionIndex, block_hash: Hash, ) -> Option { let (s_tx, s_rx) = oneshot::channel(); - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::ApprovalVotingParams(session_index, s_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::ApprovalVotingParams(session_index, s_tx), + )) + .await; match s_rx.await { Ok(Ok(params)) => { @@ -1143,9 +1157,36 @@ enum Action { Conclude, } +/// Trait for providing approval voting subsystem with work. +#[async_trait::async_trait] +pub trait ApprovalVotingWorkProvider { + async fn recv(&mut self) -> SubsystemResult>; +} + +#[async_trait::async_trait] +#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] +impl ApprovalVotingWorkProvider for Context { + async fn recv(&mut self) -> SubsystemResult> { + self.recv().await + } +} + #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn run( - mut ctx: Context, +async fn run< + B, + WorkProvider: ApprovalVotingWorkProvider, + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + Clone, + ADSender: SubsystemSender, +>( + mut work_provider: WorkProvider, + mut to_other_subsystems: Sender, + mut to_approval_distr: ADSender, mut subsystem: ApprovalVotingSubsystem, assignment_criteria: Box, mut backend: B, @@ -1169,19 +1210,11 @@ where no_show_stats: NoShowStats::default(), }; - // `None` on start-up. Gets initialized/updated on leaf update - let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { - keystore: None, - session_cache_lru_size: DISPUTE_WINDOW.get(), - }); - let mut wakeups = Wakeups::default(); - let mut currently_checking_set = CurrentlyCheckingSet::default(); - let mut delayed_approvals_timers = DelayedApprovalTimer::default(); - let mut approvals_cache = LruMap::new(ByLength::new(APPROVAL_CACHE_SIZE)); - let mut last_finalized_height: Option = { let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await; + to_other_subsystems + .send_message(ChainApiMessage::FinalizedBlockNumber(tx)) + .await; match rx.await? { Ok(number) => Some(number), Err(err) => { @@ -1191,13 +1224,24 @@ where } }; + // `None` on start-up. Gets initialized/updated on leaf update + let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: DISPUTE_WINDOW.get(), + }); + + let mut wakeups = Wakeups::default(); + let mut currently_checking_set = CurrentlyCheckingSet::default(); + let mut delayed_approvals_timers = DelayedApprovalTimer::default(); + let mut approvals_cache = LruMap::new(ByLength::new(APPROVAL_CACHE_SIZE)); + loop { let mut overlayed_db = OverlayedBackend::new(&backend); let actions = futures::select! { (_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => { subsystem.metrics.on_wakeup(); process_wakeup( - &mut ctx, + &mut to_other_subsystems, &mut state, &mut overlayed_db, &mut session_info_provider, @@ -1207,9 +1251,11 @@ where &wakeups, ).await? } - next_msg = ctx.recv().fuse() => { + next_msg = work_provider.recv().fuse() => { let mut actions = handle_from_overseer( - &mut ctx, + &mut to_other_subsystems, + &mut to_approval_distr, + &subsystem.spawner, &mut state, &mut overlayed_db, &mut session_info_provider, @@ -1269,7 +1315,8 @@ where &mut overlayed_db, &mut session_info_provider, &state, - &mut ctx, + &mut to_other_subsystems, + &mut to_approval_distr, block_hash, validator_index, &subsystem.metrics, @@ -1291,7 +1338,9 @@ where }; if handle_actions( - &mut ctx, + &mut to_other_subsystems, + &mut to_approval_distr, + &subsystem.spawner, &mut state, &mut overlayed_db, &mut session_info_provider, @@ -1318,6 +1367,63 @@ where Ok(()) } +// Starts a worker thread that runs the approval voting subsystem. +pub async fn start_approval_worker< + WorkProvider: ApprovalVotingWorkProvider + Send + 'static, + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + Clone, + ADSender: SubsystemSender, +>( + work_provider: WorkProvider, + to_other_subsystems: Sender, + to_approval_distr: ADSender, + config: Config, + db: Arc, + keystore: Arc, + sync_oracle: Box, + metrics: Metrics, + spawner: Arc, + task_name: &'static str, + group_name: &'static str, + clock: Arc, +) -> SubsystemResult<()> { + let approval_voting = ApprovalVotingSubsystem::with_config_and_clock( + config, + db.clone(), + keystore, + sync_oracle, + metrics, + clock, + spawner, + ); + let backend = DbBackend::new(db.clone(), approval_voting.db_config); + let spawner = approval_voting.spawner.clone(); + spawner.spawn_blocking( + task_name, + Some(group_name), + Box::pin(async move { + if let Err(err) = run( + work_provider, + to_other_subsystems, + to_approval_distr, + approval_voting, + Box::new(RealAssignmentCriteria), + backend, + ) + .await + { + gum::error!(target: LOG_TARGET, ?err, "Approval voting worker stopped processing messages"); + }; + }), + ); + Ok(()) +} + // Handle actions is a function that accepts a set of instructions // and subsequently updates the underlying approvals_db in accordance // with the linear set of instructions passed in. Therefore, actions @@ -1338,8 +1444,19 @@ where // // returns `true` if any of the actions was a `Conclude` command. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn handle_actions( - ctx: &mut Context, +async fn handle_actions< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + Clone, + ADSender: SubsystemSender, +>( + sender: &mut Sender, + approval_voting_sender: &mut ADSender, + spawn_handle: &Arc, state: &mut State, overlayed_db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, @@ -1371,7 +1488,8 @@ async fn handle_actions( // Note that chaining these iterators is O(n) as we must consume // the prior iterator. let next_actions: Vec = issue_approval( - ctx, + sender, + approval_voting_sender, state, overlayed_db, session_info_provider, @@ -1422,10 +1540,12 @@ async fn handle_actions( let validator_index = indirect_cert.validator; if distribute_assignment { - ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment( - indirect_cert, - claimed_candidate_indices, - )); + approval_voting_sender.send_unbounded_message( + ApprovalDistributionMessage::DistributeAssignment( + indirect_cert, + claimed_candidate_indices, + ), + ); } match approvals_cache.get(&candidate_hash) { @@ -1440,7 +1560,8 @@ async fn handle_actions( actions_iter = new_actions.into_iter(); }, None => { - let ctx = &mut *ctx; + let sender = sender.clone(); + let spawn_handle = spawn_handle.clone(); currently_checking_set .insert_relay_block_hash( @@ -1449,7 +1570,8 @@ async fn handle_actions( relay_block_hash, async move { launch_approval( - ctx, + sender, + spawn_handle, metrics.clone(), session, candidate, @@ -1478,13 +1600,13 @@ async fn handle_actions( }) .with_string_tag("block-hash", format!("{:?}", block_hash)) .with_stage(jaeger::Stage::ApprovalChecking); - ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await; + sender.send_message(ChainSelectionMessage::Approved(block_hash)).await; }, Action::BecomeActive => { *mode = Mode::Active; let (messages, next_actions) = distribution_messages_for_activation( - ctx, + sender, overlayed_db, state, delayed_approvals_timers, @@ -1492,7 +1614,7 @@ async fn handle_actions( ) .await?; - ctx.send_messages(messages.into_iter()).await; + approval_voting_sender.send_messages(messages.into_iter()).await; let next_actions: Vec = next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect(); @@ -1566,8 +1688,8 @@ fn get_assignment_core_indices( } #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn distribution_messages_for_activation( - ctx: &mut Context, +async fn distribution_messages_for_activation>( + sender: &mut Sender, db: &OverlayedBackend<'_, impl Backend>, state: &State, delayed_approvals_timers: &mut DelayedApprovalTimer, @@ -1693,7 +1815,7 @@ async fn distribution_messages_for_activation( let ExtendedSessionInfo { ref executor_params, .. } = match get_extended_session_info( session_info_provider, - ctx.sender(), + sender, block_entry.block_hash(), block_entry.session(), ) @@ -1791,9 +1913,16 @@ async fn distribution_messages_for_activation( } // Handle an incoming signal from the overseer. Returns true if execution should conclude. -#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn handle_from_overseer( - ctx: &mut Context, +async fn handle_from_overseer< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + Clone, + ADSender: SubsystemSender, +>( + sender: &mut Sender, + approval_voting_sender: &mut ADSender, + spawn_handle: &Arc, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, @@ -1811,7 +1940,8 @@ async fn handle_from_overseer( jaeger::PerLeafSpan::new(activated.span, "approval-voting"); state.spans.insert(head, approval_voting_span); match import::handle_new_head( - ctx, + sender, + approval_voting_sender, state, db, session_info_provider, @@ -1894,14 +2024,9 @@ async fn handle_from_overseer( }, FromOrchestra::Communication { msg } => match msg { ApprovalVotingMessage::ImportAssignment(checked_assignment, tx) => { - let (check_outcome, actions) = import_assignment( - ctx.sender(), - state, - db, - session_info_provider, - checked_assignment, - ) - .await?; + let (check_outcome, actions) = + import_assignment(sender, state, db, session_info_provider, checked_assignment) + .await?; // approval-distribution makes sure this assignment is valid and expected, // so this import should never fail, if it does it might mean one of two things, // there is a bug in the code or the two subsystems got out of sync. @@ -1912,16 +2037,9 @@ async fn handle_from_overseer( actions }, ApprovalVotingMessage::ImportApproval(a, tx) => { - let result = import_approval( - ctx.sender(), - state, - db, - session_info_provider, - metrics, - a, - &wakeups, - ) - .await?; + let result = + import_approval(sender, state, db, session_info_provider, metrics, a, &wakeups) + .await?; // approval-distribution makes sure this vote is valid and expected, // so this import should never fail, if it does it might mean one of two things, // there is a bug in the code or the two subsystems got out of sync. @@ -1941,7 +2059,7 @@ async fn handle_from_overseer( .with_stage(jaeger::Stage::ApprovalChecking) .with_string_tag("leaf", format!("{:?}", target)); match handle_approved_ancestor( - ctx, + sender, db, target, lower_bound, @@ -1964,7 +2082,14 @@ async fn handle_from_overseer( }, ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx) => { metrics.on_candidate_signatures_request(); - get_approval_signatures_for_candidate(ctx, db, candidate_hash, tx).await?; + get_approval_signatures_for_candidate( + approval_voting_sender.clone(), + spawn_handle, + db, + candidate_hash, + tx, + ) + .await?; Vec::new() }, }, @@ -1978,8 +2103,11 @@ async fn handle_from_overseer( /// This involves an unbounded message send to approval-distribution, the caller has to ensure that /// calls to this function are infrequent and bounded. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn get_approval_signatures_for_candidate( - ctx: &mut Context, +async fn get_approval_signatures_for_candidate< + Sender: SubsystemSender, +>( + mut sender: Sender, + spawn_handle: &Arc, db: &OverlayedBackend<'_, impl Backend>, candidate_hash: CandidateHash, tx: oneshot::Sender, ValidatorSignature)>>, @@ -2038,7 +2166,6 @@ async fn get_approval_signatures_for_candidate( } } - let mut sender = ctx.sender().clone(); let get_approvals = async move { let (tx_distribution, rx_distribution) = oneshot::channel(); sender.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures( @@ -2118,12 +2245,17 @@ async fn get_approval_signatures_for_candidate( ?candidate_hash, "Spawning task for fetching signatures from approval-distribution" ); - ctx.spawn("get-approval-signatures", Box::pin(get_approvals)) + spawn_handle.spawn( + "get-approval-signatures", + Some("approval-voting-subsystem"), + Box::pin(get_approvals), + ); + Ok(()) } #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn handle_approved_ancestor( - ctx: &mut Context, +async fn handle_approved_ancestor>( + sender: &mut Sender, db: &OverlayedBackend<'_, impl Backend>, target: Hash, lower_bound: BlockNumber, @@ -2143,7 +2275,7 @@ async fn handle_approved_ancestor( let target_number = { let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::BlockNumber(target, tx)).await; + sender.send_message(ChainApiMessage::BlockNumber(target, tx)).await; match rx.await { Ok(Ok(Some(n))) => n, @@ -2164,12 +2296,13 @@ async fn handle_approved_ancestor( let ancestry = if target_number > lower_bound + 1 { let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::Ancestors { - hash: target, - k: (target_number - (lower_bound + 1)) as usize, - response_channel: tx, - }) - .await; + sender + .send_message(ChainApiMessage::Ancestors { + hash: target, + k: (target_number - (lower_bound + 1)) as usize, + response_channel: tx, + }) + .await; match rx.await { Ok(Ok(a)) => a, @@ -3107,9 +3240,8 @@ fn should_trigger_assignment( } } -#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn process_wakeup( - ctx: &mut Context, +async fn process_wakeup>( + sender: &mut Sender, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, @@ -3140,7 +3272,7 @@ async fn process_wakeup( let ExtendedSessionInfo { ref session_info, ref executor_params, .. } = match get_extended_session_info( session_info_provider, - ctx.sender(), + sender, block_entry.block_hash(), block_entry.session(), ) @@ -3282,7 +3414,7 @@ async fn process_wakeup( // Note that this function also schedules a wakeup as necessary. actions.extend( advance_approval_state( - ctx.sender(), + sender, state, db, session_info_provider, @@ -3303,8 +3435,14 @@ async fn process_wakeup( // spawned. When the background work is no longer needed, the `AbortHandle` should be dropped // to cancel the background work and any requests it has spawned. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn launch_approval( - ctx: &mut Context, +async fn launch_approval< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender, +>( + mut sender: Sender, + spawn_handle: Arc, metrics: Metrics, session_index: SessionIndex, candidate: CandidateReceipt, @@ -3355,14 +3493,15 @@ async fn launch_approval( .with_stage(jaeger::Stage::ApprovalChecking); let timer = metrics.time_recover_and_approve(); - ctx.send_message(AvailabilityRecoveryMessage::RecoverAvailableData( - candidate.clone(), - session_index, - Some(backing_group), - core_index, - a_tx, - )) - .await; + sender + .send_message(AvailabilityRecoveryMessage::RecoverAvailableData( + candidate.clone(), + session_index, + Some(backing_group), + core_index, + a_tx, + )) + .await; let request_validation_result_span = span .child("request-validation-result") @@ -3371,15 +3510,18 @@ async fn launch_approval( .with_string_tag("block-hash", format!("{:?}", block_hash)) .with_stage(jaeger::Stage::ApprovalChecking); - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::ValidationCodeByHash(candidate.descriptor.validation_code_hash, code_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::ValidationCodeByHash( + candidate.descriptor.validation_code_hash, + code_tx, + ), + )) + .await; let candidate = candidate.clone(); let metrics_guard = StaleGuard(Some(metrics)); - let mut sender = ctx.sender().clone(); let background = async move { // Force the move of the timer into the background task. let _timer = timer; @@ -3509,14 +3651,19 @@ async fn launch_approval( } }; let (background, remote_handle) = background.remote_handle(); - ctx.spawn("approval-checks", Box::pin(background)).map(move |()| remote_handle) + spawn_handle.spawn("approval-checks", Some("approval-voting-subsystem"), Box::pin(background)); + Ok(remote_handle) } // Issue and import a local approval vote. Should only be invoked after approval checks // have been done. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn issue_approval( - ctx: &mut Context, +async fn issue_approval< + Sender: SubsystemSender, + ADSender: SubsystemSender, +>( + sender: &mut Sender, + approval_voting_sender: &mut ADSender, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, @@ -3595,7 +3742,7 @@ async fn issue_approval( let session_info = match get_session_info( session_info_provider, - ctx.sender(), + sender, block_entry.parent_hash(), block_entry.session(), ) @@ -3637,7 +3784,7 @@ async fn issue_approval( ); let actions = advance_approval_state( - ctx.sender(), + sender, state, db, session_info_provider, @@ -3654,7 +3801,8 @@ async fn issue_approval( db, session_info_provider, state, - ctx, + sender, + approval_voting_sender, block_hash, validator_index, metrics, @@ -3673,11 +3821,15 @@ async fn issue_approval( // Create signature for the approved candidates pending signatures #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn maybe_create_signature( +async fn maybe_create_signature< + Sender: SubsystemSender, + ADSender: SubsystemSender, +>( db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, state: &State, - ctx: &mut Context, + sender: &mut Sender, + approval_voting_sender: &mut ADSender, block_hash: Hash, validator_index: ValidatorIndex, metrics: &Metrics, @@ -3696,7 +3848,7 @@ async fn maybe_create_signature( }; let approval_params = state - .get_approval_voting_params_or_default(ctx, block_entry.session(), block_hash) + .get_approval_voting_params_or_default(sender, block_entry.session(), block_hash) .await .unwrap_or_default(); @@ -3716,7 +3868,7 @@ async fn maybe_create_signature( let session_info = match get_session_info( session_info_provider, - ctx.sender(), + sender, block_entry.parent_hash(), block_entry.session(), ) @@ -3797,7 +3949,7 @@ async fn maybe_create_signature( metrics.on_approval_produced(); - ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeApproval( + approval_voting_sender.send_unbounded_message(ApprovalDistributionMessage::DistributeApproval( IndirectSignedApprovalVoteV2 { block_hash: block_entry.block_hash(), candidate_indices: candidates_indices, @@ -3838,7 +3990,7 @@ fn issue_local_invalid_statement( candidate_hash: CandidateHash, candidate: CandidateReceipt, ) where - Sender: overseer::ApprovalVotingSenderTrait, + Sender: SubsystemSender, { // We need to send an unbounded message here to break a cycle: // DisputeCoordinatorMessage::IssueLocalStatement -> diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index 7126f209a94f..65aa4f894c23 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -35,11 +35,11 @@ use polkadot_node_subsystem::{ messages::{ AllMessages, ApprovalVotingMessage, AssignmentCheckResult, AvailabilityRecoveryMessage, }, - ActiveLeavesUpdate, + ActiveLeavesUpdate, SubsystemContext, }; use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt; -use polkadot_overseer::HeadSupportsParachains; +use polkadot_overseer::{HeadSupportsParachains, SpawnGlue}; use polkadot_primitives::{ ApprovalVote, CandidateCommitments, CandidateEvent, CoreIndex, DisputeStatement, GroupIndex, Header, Id as ParaId, IndexedVec, NodeFeatures, ValidDisputeStatementKind, ValidationCode, @@ -536,7 +536,7 @@ impl Default for HarnessConfig { struct TestHarness { virtual_overseer: VirtualOverseer, - clock: Box, + clock: Arc, sync_oracle_handle: TestSyncOracleHandle, } @@ -550,8 +550,8 @@ fn test_harness>( config; let pool = sp_core::testing::TaskExecutor::new(); - let (context, virtual_overseer) = - polkadot_node_subsystem_test_helpers::make_subsystem_context(pool); + let (mut context, virtual_overseer) = + polkadot_node_subsystem_test_helpers::make_subsystem_context(pool.clone()); let keystore = LocalKeystore::in_memory(); let _ = keystore.sr25519_generate_new( @@ -559,12 +559,14 @@ fn test_harness>( Some(&Sr25519Keyring::Alice.to_seed()), ); - let clock = Box::new(clock); + let clock = Arc::new(clock); let db = kvdb_memorydb::create(test_constants::NUM_COLUMNS); let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]); - + let sender = context.sender().clone(); let subsystem = run( context, + sender.clone(), + sender.clone(), ApprovalVotingSubsystem::with_config_and_clock( Config { col_approval_data: test_constants::TEST_CONFIG.col_approval_data, @@ -575,6 +577,7 @@ fn test_harness>( sync_oracle, Metrics::default(), clock.clone(), + Arc::new(SpawnGlue(pool)), ), assignment_criteria, backend, @@ -4114,7 +4117,7 @@ async fn handle_approval_on_max_coalesce_count( async fn handle_approval_on_max_wait_time( virtual_overseer: &mut VirtualOverseer, candidate_indices: Vec, - clock: Box, + clock: Arc, ) { const TICK_NOW_BEGIN: u64 = 1; const MAX_COALESCE_COUNT: u32 = 3; @@ -4412,7 +4415,7 @@ async fn build_chain_with_two_blocks_with_one_candidate_each( async fn setup_overseer_with_two_blocks_each_with_one_assignment_triggered( virtual_overseer: &mut VirtualOverseer, store: TestStore, - clock: &Box, + clock: &Arc, sync_oracle_handle: TestSyncOracleHandle, ) { assert_matches!( @@ -4926,7 +4929,7 @@ fn test_gathering_assignments_statements() { let mut state = State { keystore: Arc::new(LocalKeystore::in_memory()), slot_duration_millis: 6_000, - clock: Box::new(MockClock::default()), + clock: Arc::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))), spans: HashMap::new(), per_block_assignments_gathering_times: LruMap::new(ByLength::new( @@ -5021,7 +5024,7 @@ fn test_observe_assignment_gathering_status() { let mut state = State { keystore: Arc::new(LocalKeystore::in_memory()), slot_duration_millis: 6_000, - clock: Box::new(MockClock::default()), + clock: Arc::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))), spans: HashMap::new(), per_block_assignments_gathering_times: LruMap::new(ByLength::new( diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index a8e7fc16eb4d..fe96d29c1ceb 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -63,6 +63,7 @@ use { }; use polkadot_node_subsystem_util::database::Database; +use polkadot_overseer::SpawnGlue; #[cfg(feature = "full-node")] pub use { @@ -83,7 +84,7 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; use prometheus_endpoint::Registry; #[cfg(feature = "full-node")] use sc_service::KeystoreContainer; -use sc_service::RpcHandlers; +use sc_service::{RpcHandlers, SpawnTaskHandle}; use sc_telemetry::TelemetryWorker; #[cfg(feature = "full-node")] use sc_telemetry::{Telemetry, TelemetryWorkerHandle}; @@ -1500,6 +1501,7 @@ pub fn revert_backend( backend: Arc, blocks: BlockNumber, config: Configuration, + task_handle: SpawnTaskHandle, ) -> Result<(), Error> { let best_number = client.info().best_number; let finalized = client.info().finalized_number; @@ -1520,7 +1522,7 @@ pub fn revert_backend( let parachains_db = open_database(&config.database) .map_err(|err| sp_blockchain::Error::Backend(err.to_string()))?; - revert_approval_voting(parachains_db.clone(), hash)?; + revert_approval_voting(parachains_db.clone(), hash, task_handle)?; revert_chain_selection(parachains_db, hash)?; // Revert Substrate consensus related components sc_consensus_babe::revert(client.clone(), backend, blocks)?; @@ -1543,7 +1545,11 @@ fn revert_chain_selection(db: Arc, hash: Hash) -> sp_blockchain::R .map_err(|err| sp_blockchain::Error::Backend(err.to_string())) } -fn revert_approval_voting(db: Arc, hash: Hash) -> sp_blockchain::Result<()> { +fn revert_approval_voting( + db: Arc, + hash: Hash, + task_handle: SpawnTaskHandle, +) -> sp_blockchain::Result<()> { let config = approval_voting_subsystem::Config { col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data, slot_duration_millis: Default::default(), @@ -1555,6 +1561,7 @@ fn revert_approval_voting(db: Arc, hash: Hash) -> sp_blockchain::R Arc::new(sc_keystore::LocalKeystore::in_memory()), Box::new(sp_consensus::NoNetwork), approval_voting_subsystem::Metrics::default(), + Arc::new(SpawnGlue(task_handle)), ); approval_voting diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs index 0b57ff6e395f..3c071e34fe11 100644 --- a/polkadot/node/service/src/overseer.rs +++ b/polkadot/node/service/src/overseer.rs @@ -320,6 +320,7 @@ where keystore.clone(), Box::new(sync_service.clone()), Metrics::register(registry)?, + Arc::new(spawner.clone()), )) .gossip_support(GossipSupportSubsystem::new( keystore.clone(), diff --git a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs index f05d061f3fde..9d85039b8880 100644 --- a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs @@ -814,7 +814,8 @@ fn build_overseer( Arc::new(keystore), Box::new(TestSyncOracle {}), state.approval_voting_metrics.clone(), - Box::new(system_clock.clone()), + Arc::new(system_clock.clone()), + Arc::new(SpawnGlue(spawn_task_handle.clone())), ); let approval_distribution = ApprovalDistribution::new_with_clock( diff --git a/prdoc/pr_4846.prdoc b/prdoc/pr_4846.prdoc new file mode 100644 index 000000000000..eb18301b1010 --- /dev/null +++ b/prdoc/pr_4846.prdoc @@ -0,0 +1,13 @@ +title: "Make approval-voting runnable on a worker thread" + +doc: + - audience: Node Dev + description: | + Make approval-voting subsystem runnable on a separate worker thread without having to + to always pass to it an orchestra context. It achieves that by refactoring existing functions + to require only the minimal set of traits needed in the function instead of the general + `Context` + +crates: + - name: polkadot-node-core-approval-voting + bump: major