diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs index b89054b4dc321..276ee87516858 100644 --- a/polkadot/cli/src/command.rs +++ b/polkadot/cli/src/command.rs @@ -373,16 +373,16 @@ pub fn run() -> Result<()> { Ok(runner.async_run(|mut config| { let (client, backend, _, task_manager) = polkadot_service::new_chain_ops(&mut config, None)?; + let task_handle = task_manager.spawn_handle(); let aux_revert = Box::new(|client, backend, blocks| { - polkadot_service::revert_backend(client, backend, blocks, config).map_err( - |err| { + polkadot_service::revert_backend(client, backend, blocks, config, task_handle) + .map_err(|err| { match err { polkadot_service::Error::Blockchain(err) => err.into(), // Generic application-specific error. err => sc_cli::Error::Application(err.into()), } - }, - ) + }) }); Ok(( cmd.run(client, backend, Some(aux_revert)).map_err(Error::SubstrateCli), diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index 202fd7cdc4f5d..51341bae0932f 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -44,6 +44,7 @@ use polkadot_node_subsystem::{ overseer, RuntimeApiError, SubsystemError, SubsystemResult, }; use polkadot_node_subsystem_util::{determine_new_blocks, runtime::RuntimeInfo}; +use polkadot_overseer::SubsystemSender; use polkadot_primitives::{ node_features, BlockNumber, CandidateEvent, CandidateHash, CandidateReceipt, ConsensusLog, CoreIndex, GroupIndex, Hash, Header, SessionIndex, @@ -110,8 +111,8 @@ enum ImportedBlockInfoError { /// Computes information about the imported block. Returns an error if the info couldn't be /// extracted. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn imported_block_info( - ctx: &mut Context, +async fn imported_block_info>( + sender: &mut Sender, env: ImportedBlockInfoEnv<'_>, block_hash: Hash, block_header: &Header, @@ -123,11 +124,12 @@ async fn imported_block_info( // fetch candidates let included_candidates: Vec<_> = { let (c_tx, c_rx) = oneshot::channel(); - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::CandidateEvents(c_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::CandidateEvents(c_tx), + )) + .await; let events: Vec = match c_rx.await { Ok(Ok(events)) => events, @@ -150,11 +152,12 @@ async fn imported_block_info( // short, that shouldn't happen. let session_index = { let (s_tx, s_rx) = oneshot::channel(); - ctx.send_message(RuntimeApiMessage::Request( - block_header.parent_hash, - RuntimeApiRequest::SessionIndexForChild(s_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_header.parent_hash, + RuntimeApiRequest::SessionIndexForChild(s_tx), + )) + .await; let session_index = match s_rx.await { Ok(Ok(s)) => s, @@ -200,11 +203,12 @@ async fn imported_block_info( // by one block. This gives us the opposite invariant for sessions - the parent block's // post-state gives us the canonical information about the session index for any of its // children, regardless of which slot number they might be produced at. - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::CurrentBabeEpoch(s_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::CurrentBabeEpoch(s_tx), + )) + .await; match s_rx.await { Ok(Ok(s)) => s, @@ -215,7 +219,7 @@ async fn imported_block_info( }; let extended_session_info = - get_extended_session_info(env.runtime_info, ctx.sender(), block_hash, session_index).await; + get_extended_session_info(env.runtime_info, sender, block_hash, session_index).await; let enable_v2_assignments = extended_session_info.map_or(false, |extended_session_info| { *extended_session_info .node_features @@ -224,7 +228,7 @@ async fn imported_block_info( .unwrap_or(&false) }); - let session_info = get_session_info(env.runtime_info, ctx.sender(), block_hash, session_index) + let session_info = get_session_info(env.runtime_info, sender, block_hash, session_index) .await .ok_or(ImportedBlockInfoError::SessionInfoUnavailable)?; @@ -328,9 +332,15 @@ pub struct BlockImportedCandidates { /// * and return information about all candidates imported under each block. /// /// It is the responsibility of the caller to schedule wakeups for each block. -#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -pub(crate) async fn handle_new_head( - ctx: &mut Context, +pub(crate) async fn handle_new_head< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender, + AVSender: SubsystemSender, + B: Backend, +>( + sender: &mut Sender, + approval_voting_sender: &mut AVSender, state: &State, db: &mut OverlayedBackend<'_, B>, session_info_provider: &mut RuntimeInfo, @@ -348,7 +358,7 @@ pub(crate) async fn handle_new_head( let header = { let (h_tx, h_rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await; + sender.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await; match h_rx.await? { Err(e) => { gum::debug!( @@ -374,7 +384,7 @@ pub(crate) async fn handle_new_head( let lower_bound_number = finalized_number.unwrap_or(lower_bound_number).max(lower_bound_number); let new_blocks = determine_new_blocks( - ctx.sender(), + sender, |h| db.load_block_entry(h).map(|e| e.is_some()), head, &header, @@ -400,12 +410,15 @@ pub(crate) async fn handle_new_head( keystore: &state.keystore, }; - match imported_block_info(ctx, env, block_hash, &block_header, finalized_number).await { + match imported_block_info(sender, env, block_hash, &block_header, finalized_number) + .await + { Ok(i) => imported_blocks_and_info.push((block_hash, block_header, i)), Err(error) => { // It's possible that we've lost a race with finality. let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx)) + sender + .send_message(ChainApiMessage::FinalizedBlockHash(block_header.number, tx)) .await; let lost_to_finality = match rx.await { @@ -449,17 +462,11 @@ pub(crate) async fn handle_new_head( force_approve, } = imported_block_info; - let session_info = match get_session_info( - session_info_provider, - ctx.sender(), - head, - session_index, - ) - .await - { - Some(session_info) => session_info, - None => return Ok(Vec::new()), - }; + let session_info = + match get_session_info(session_info_provider, sender, head, session_index).await { + Some(session_info) => session_info, + None => return Ok(Vec::new()), + }; let (block_tick, no_show_duration) = { let block_tick = slot_number_to_tick(state.slot_duration_millis, slot); @@ -509,7 +516,7 @@ pub(crate) async fn handle_new_head( }; // If all bits are already set, then send an approve message. if approved_bitfield.count_ones() == approved_bitfield.len() { - ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await; + sender.send_message(ChainSelectionMessage::Approved(block_hash)).await; } let block_entry = v3::BlockEntry { @@ -566,7 +573,7 @@ pub(crate) async fn handle_new_head( // Notify chain-selection of all approved hashes. for hash in approved_hashes { - ctx.send_message(ChainSelectionMessage::Approved(hash)).await; + sender.send_message(ChainSelectionMessage::Approved(hash)).await; } } @@ -603,7 +610,8 @@ pub(crate) async fn handle_new_head( "Informing distribution of newly imported chain", ); - ctx.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta)); + approval_voting_sender + .send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta)); Ok(imported_candidates) } diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 20347eaf57e7a..d6208cbf6bac1 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -148,7 +148,7 @@ pub struct Config { // When transitioning from `Syncing` to `Active`, the node notifies the `ApprovalDistribution` // subsystem of all unfinalized blocks and the candidates included within them, as well as all // votes that the local node itself has cast on candidates within those blocks. -enum Mode { +pub enum Mode { Active, Syncing(Box), } @@ -164,7 +164,8 @@ pub struct ApprovalVotingSubsystem { db: Arc, mode: Mode, metrics: Metrics, - clock: Box, + clock: Arc, + spawner: Arc, } #[derive(Clone)] @@ -483,6 +484,7 @@ impl ApprovalVotingSubsystem { keystore: Arc, sync_oracle: Box, metrics: Metrics, + spawner: Arc, ) -> Self { ApprovalVotingSubsystem::with_config_and_clock( config, @@ -490,7 +492,8 @@ impl ApprovalVotingSubsystem { keystore, sync_oracle, metrics, - Box::new(SystemClock {}), + Arc::new(SystemClock {}), + spawner, ) } @@ -501,7 +504,8 @@ impl ApprovalVotingSubsystem { keystore: Arc, sync_oracle: Box, metrics: Metrics, - clock: Box, + clock: Arc, + spawner: Arc, ) -> Self { ApprovalVotingSubsystem { keystore, @@ -511,6 +515,7 @@ impl ApprovalVotingSubsystem { mode: Mode::Syncing(sync_oracle), metrics, clock, + spawner, } } @@ -824,7 +829,7 @@ where struct State { keystore: Arc, slot_duration_millis: u64, - clock: Box, + clock: Arc, assignment_criteria: Box, spans: HashMap, // Per block, candidate records about how long we take until we gather enough @@ -960,20 +965,20 @@ impl State { } // Returns the approval voting params from the RuntimeApi. - #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] - async fn get_approval_voting_params_or_default( + async fn get_approval_voting_params_or_default>( &self, - ctx: &mut Context, + sender: &mut Sender, session_index: SessionIndex, block_hash: Hash, ) -> Option { let (s_tx, s_rx) = oneshot::channel(); - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::ApprovalVotingParams(session_index, s_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::ApprovalVotingParams(session_index, s_tx), + )) + .await; match s_rx.await { Ok(Ok(params)) => { @@ -1168,19 +1173,203 @@ where no_show_stats: NoShowStats::default(), }; + let mut last_finalized_height: Option = { + let (tx, rx) = oneshot::channel(); + ctx.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await; + match rx.await? { + Ok(number) => Some(number), + Err(err) => { + gum::warn!(target: LOG_TARGET, ?err, "Failed fetching finalized number"); + None + }, + } + }; + // `None` on start-up. Gets initialized/updated on leaf update let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { keystore: None, session_cache_lru_size: DISPUTE_WINDOW.get(), }); + let mut wakeups = Wakeups::default(); let mut currently_checking_set = CurrentlyCheckingSet::default(); let mut delayed_approvals_timers = DelayedApprovalTimer::default(); let mut approvals_cache = LruMap::new(ByLength::new(APPROVAL_CACHE_SIZE)); + let mut approval_voting_sender = ctx.sender().clone(); + loop { + let mut overlayed_db = OverlayedBackend::new(&backend); + let actions = futures::select! { + (_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => { + subsystem.metrics.on_wakeup(); + process_wakeup( + ctx.sender(), + &mut state, + &mut overlayed_db, + &mut session_info_provider, + woken_block, + woken_candidate, + &subsystem.metrics, + &wakeups, + ).await? + } + next_msg = ctx.recv().fuse() => { + let mut actions = handle_from_overseer( + ctx.sender(), + &mut approval_voting_sender, + &subsystem.spawner, + &mut state, + &mut overlayed_db, + &mut session_info_provider, + &subsystem.metrics, + next_msg?, + &mut last_finalized_height, + &mut wakeups, + ).await?; + + if let Mode::Syncing(ref mut oracle) = subsystem.mode { + if !oracle.is_major_syncing() { + // note that we're active before processing other actions. + actions.insert(0, Action::BecomeActive) + } + } + + actions + } + approval_state = currently_checking_set.next(&mut approvals_cache).fuse() => { + let mut actions = Vec::new(); + let ( + relay_block_hashes, + ApprovalState { + validator_index, + candidate_hash, + approval_outcome, + } + ) = approval_state; + + if matches!(approval_outcome, ApprovalOutcome::Approved) { + let mut approvals: Vec = relay_block_hashes + .into_iter() + .map(|block_hash| + Action::IssueApproval( + candidate_hash, + ApprovalVoteRequest { + validator_index, + block_hash, + }, + ) + ) + .collect(); + actions.append(&mut approvals); + } + + actions + }, + (block_hash, validator_index) = delayed_approvals_timers.select_next_some() => { + gum::debug!( + target: LOG_TARGET, + ?block_hash, + ?validator_index, + "Sign approval for multiple candidates", + ); + + match maybe_create_signature( + &mut overlayed_db, + &mut session_info_provider, + &state, + ctx.sender(), + &mut approval_voting_sender, + block_hash, + validator_index, + &subsystem.metrics, + ).await { + Ok(Some(next_wakeup)) => { + delayed_approvals_timers.maybe_arm_timer(next_wakeup, state.clock.as_ref(), block_hash, validator_index); + }, + Ok(None) => {} + Err(err) => { + gum::error!( + target: LOG_TARGET, + ?err, + "Failed to create signature", + ); + } + } + vec![] + } + }; + + if handle_actions( + ctx.sender(), + &mut approval_voting_sender, + &subsystem.spawner, + &mut state, + &mut overlayed_db, + &mut session_info_provider, + &subsystem.metrics, + &mut wakeups, + &mut currently_checking_set, + &mut delayed_approvals_timers, + &mut approvals_cache, + &mut subsystem.mode, + actions, + ) + .await? + { + break + } + + if !overlayed_db.is_empty() { + let _timer = subsystem.metrics.time_db_transaction(); + let ops = overlayed_db.into_write_ops(); + backend.write(ops)?; + } + } + + Ok(()) +} + +// TODO: This is a copy of the `fn run` because I did not find a way to decouple the whole logic +// from `ctx.recv()`. +async fn run_approval_on_worker_thread< + B, + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + Clone, + AVSender: SubsystemSender, +>( + mut approval_work: futures::channel::mpsc::Receiver>, + mut sender: Sender, + mut approval_voting_sender: AVSender, + mut subsystem: ApprovalVotingSubsystem, + assignment_criteria: Box, + mut backend: B, +) -> SubsystemResult<()> +where + B: Backend, +{ + if let Err(err) = db_sanity_check(subsystem.db.clone(), subsystem.db_config) { + gum::warn!(target: LOG_TARGET, ?err, "Could not run approval vote DB sanity check"); + } + + let mut state = State { + keystore: subsystem.keystore, + slot_duration_millis: subsystem.slot_duration_millis, + clock: subsystem.clock, + assignment_criteria, + spans: HashMap::new(), + per_block_assignments_gathering_times: LruMap::new(ByLength::new( + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, + )), + no_show_stats: NoShowStats::default(), + }; let mut last_finalized_height: Option = { let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await; + sender.send_message(ChainApiMessage::FinalizedBlockNumber(tx)).await; match rx.await? { Ok(number) => Some(number), Err(err) => { @@ -1190,13 +1379,23 @@ where } }; + // `None` on start-up. Gets initialized/updated on leaf update + let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig { + keystore: None, + session_cache_lru_size: DISPUTE_WINDOW.get(), + }); + + let mut wakeups = Wakeups::default(); + let mut currently_checking_set = CurrentlyCheckingSet::default(); + let mut delayed_approvals_timers = DelayedApprovalTimer::default(); + let mut approvals_cache = LruMap::new(ByLength::new(APPROVAL_CACHE_SIZE)); loop { let mut overlayed_db = OverlayedBackend::new(&backend); let actions = futures::select! { (_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => { subsystem.metrics.on_wakeup(); process_wakeup( - &mut ctx, + &mut sender, &mut state, &mut overlayed_db, &mut session_info_provider, @@ -1206,14 +1405,16 @@ where &wakeups, ).await? } - next_msg = ctx.recv().fuse() => { + next_msg = approval_work.select_next_some().fuse() => { let mut actions = handle_from_overseer( - &mut ctx, + &mut sender, + &mut approval_voting_sender, + &subsystem.spawner, &mut state, &mut overlayed_db, &mut session_info_provider, &subsystem.metrics, - next_msg?, + next_msg, &mut last_finalized_height, &mut wakeups, ).await?; @@ -1268,7 +1469,8 @@ where &mut overlayed_db, &mut session_info_provider, &state, - &mut ctx, + &mut sender, + &mut approval_voting_sender, block_hash, validator_index, &subsystem.metrics, @@ -1290,7 +1492,9 @@ where }; if handle_actions( - &mut ctx, + &mut sender, + &mut approval_voting_sender, + &subsystem.spawner, &mut state, &mut overlayed_db, &mut session_info_provider, @@ -1317,6 +1521,61 @@ where Ok(()) } +pub async fn start_approval_worker< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + Clone, + AVSender: SubsystemSender, +>( + approval_work: futures::channel::mpsc::Receiver>, + sender: Sender, + approval_voting_sender: AVSender, + config: Config, + db: Arc, + keystore: Arc, + mode: Mode, + metrics: Metrics, + spawner: Arc, + clock: Arc, +) -> SubsystemResult<()> { + let sync_oracle = match mode { + Mode::Active => todo!(), + Mode::Syncing(oracle) => oracle, + }; + let approval_voting = ApprovalVotingSubsystem::with_config_and_clock( + config, + db.clone(), + keystore, + sync_oracle, + metrics, + clock, + spawner, + ); + let backend = DbBackend::new(db.clone(), approval_voting.db_config); + let spawner = approval_voting.spawner.clone(); + spawner.spawn_blocking( + "approval-voting-rewrite-db", + Some("approval-voting-rewrite-subsystem"), + Box::pin(async move { + run_approval_on_worker_thread( + approval_work, + sender, + approval_voting_sender, + approval_voting, + Box::new(RealAssignmentCriteria), + backend, + ) + .await + .unwrap(); + }), + ); + Ok(()) +} + // Handle actions is a function that accepts a set of instructions // and subsequently updates the underlying approvals_db in accordance // with the linear set of instructions passed in. Therefore, actions @@ -1337,8 +1596,19 @@ where // // returns `true` if any of the actions was a `Conclude` command. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn handle_actions( - ctx: &mut Context, +async fn handle_actions< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender + + Clone, + AVSender: SubsystemSender, +>( + sender: &mut Sender, + approval_voting_sender: &mut AVSender, + spawn_handle: &Arc, state: &mut State, overlayed_db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, @@ -1370,7 +1640,8 @@ async fn handle_actions( // Note that chaining these iterators is O(n) as we must consume // the prior iterator. let next_actions: Vec = issue_approval( - ctx, + sender, + approval_voting_sender, state, overlayed_db, session_info_provider, @@ -1421,10 +1692,12 @@ async fn handle_actions( let validator_index = indirect_cert.validator; if distribute_assignment { - ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment( - indirect_cert, - claimed_candidate_indices, - )); + approval_voting_sender.send_unbounded_message( + ApprovalDistributionMessage::DistributeAssignment( + indirect_cert, + claimed_candidate_indices, + ), + ); } match approvals_cache.get(&candidate_hash) { @@ -1439,7 +1712,8 @@ async fn handle_actions( actions_iter = new_actions.into_iter(); }, None => { - let ctx = &mut *ctx; + let sender = sender.clone(); + let spawn_handle = spawn_handle.clone(); currently_checking_set .insert_relay_block_hash( @@ -1448,7 +1722,8 @@ async fn handle_actions( relay_block_hash, async move { launch_approval( - ctx, + sender, + spawn_handle, metrics.clone(), session, candidate, @@ -1477,13 +1752,13 @@ async fn handle_actions( }) .with_string_tag("block-hash", format!("{:?}", block_hash)) .with_stage(jaeger::Stage::ApprovalChecking); - ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await; + sender.send_message(ChainSelectionMessage::Approved(block_hash)).await; }, Action::BecomeActive => { *mode = Mode::Active; let (messages, next_actions) = distribution_messages_for_activation( - ctx, + sender, overlayed_db, state, delayed_approvals_timers, @@ -1491,7 +1766,7 @@ async fn handle_actions( ) .await?; - ctx.send_messages(messages.into_iter()).await; + approval_voting_sender.send_messages(messages.into_iter()).await; let next_actions: Vec = next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect(); @@ -1565,8 +1840,8 @@ fn get_assignment_core_indices( } #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn distribution_messages_for_activation( - ctx: &mut Context, +async fn distribution_messages_for_activation>( + sender: &mut Sender, db: &OverlayedBackend<'_, impl Backend>, state: &State, delayed_approvals_timers: &mut DelayedApprovalTimer, @@ -1696,7 +1971,7 @@ async fn distribution_messages_for_activation( let ExtendedSessionInfo { ref executor_params, .. } = match get_extended_session_info( session_info_provider, - ctx.sender(), + sender, block_entry.block_hash(), block_entry.session(), ) @@ -1794,9 +2069,16 @@ async fn distribution_messages_for_activation( } // Handle an incoming signal from the overseer. Returns true if execution should conclude. -#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn handle_from_overseer( - ctx: &mut Context, +async fn handle_from_overseer< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + Clone, + AVSender: SubsystemSender, +>( + sender: &mut Sender, + approval_voting_sender: &mut AVSender, + spawn_handle: &Arc, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, @@ -1814,7 +2096,8 @@ async fn handle_from_overseer( jaeger::PerLeafSpan::new(activated.span, "approval-voting"); state.spans.insert(head, approval_voting_span); match import::handle_new_head( - ctx, + sender, + approval_voting_sender, state, db, session_info_provider, @@ -1898,7 +2181,7 @@ async fn handle_from_overseer( FromOrchestra::Communication { msg } => match msg { ApprovalVotingMessage::CheckAndImportAssignment(a, claimed_cores, tranche) => { let (check_outcome, actions) = check_and_import_assignment( - ctx.sender(), + sender, state, db, session_info_provider, @@ -1914,7 +2197,7 @@ async fn handle_from_overseer( }, ApprovalVotingMessage::CheckAndImportApproval(a) => { let result = check_and_import_approval( - ctx.sender(), + sender, state, db, session_info_provider, @@ -1937,7 +2220,7 @@ async fn handle_from_overseer( .with_stage(jaeger::Stage::ApprovalChecking) .with_string_tag("leaf", format!("{:?}", target)); match handle_approved_ancestor( - ctx, + sender, db, target, lower_bound, @@ -1960,7 +2243,14 @@ async fn handle_from_overseer( }, ApprovalVotingMessage::GetApprovalSignaturesForCandidate(candidate_hash, tx) => { metrics.on_candidate_signatures_request(); - get_approval_signatures_for_candidate(ctx, db, candidate_hash, tx).await?; + get_approval_signatures_for_candidate( + approval_voting_sender.clone(), + spawn_handle, + db, + candidate_hash, + tx, + ) + .await?; Vec::new() }, }, @@ -1974,8 +2264,11 @@ async fn handle_from_overseer( /// This involves an unbounded message send to approval-distribution, the caller has to ensure that /// calls to this function are infrequent and bounded. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn get_approval_signatures_for_candidate( - ctx: &mut Context, +async fn get_approval_signatures_for_candidate< + Sender: SubsystemSender, +>( + mut sender: Sender, + spawn_handle: &Arc, db: &OverlayedBackend<'_, impl Backend>, candidate_hash: CandidateHash, tx: oneshot::Sender, ValidatorSignature)>>, @@ -2034,7 +2327,6 @@ async fn get_approval_signatures_for_candidate( } } - let mut sender = ctx.sender().clone(); let get_approvals = async move { let (tx_distribution, rx_distribution) = oneshot::channel(); sender.send_unbounded_message(ApprovalDistributionMessage::GetApprovalSignatures( @@ -2114,12 +2406,17 @@ async fn get_approval_signatures_for_candidate( ?candidate_hash, "Spawning task for fetching signatures from approval-distribution" ); - ctx.spawn("get-approval-signatures", Box::pin(get_approvals)) + spawn_handle.spawn( + "get-approval-signatures", + Some("approval-voting-subsystem"), + Box::pin(get_approvals), + ); + Ok(()) } #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn handle_approved_ancestor( - ctx: &mut Context, +async fn handle_approved_ancestor>( + sender: &mut Sender, db: &OverlayedBackend<'_, impl Backend>, target: Hash, lower_bound: BlockNumber, @@ -2139,7 +2436,7 @@ async fn handle_approved_ancestor( let target_number = { let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::BlockNumber(target, tx)).await; + sender.send_message(ChainApiMessage::BlockNumber(target, tx)).await; match rx.await { Ok(Ok(Some(n))) => n, @@ -2160,12 +2457,13 @@ async fn handle_approved_ancestor( let ancestry = if target_number > lower_bound + 1 { let (tx, rx) = oneshot::channel(); - ctx.send_message(ChainApiMessage::Ancestors { - hash: target, - k: (target_number - (lower_bound + 1)) as usize, - response_channel: tx, - }) - .await; + sender + .send_message(ChainApiMessage::Ancestors { + hash: target, + k: (target_number - (lower_bound + 1)) as usize, + response_channel: tx, + }) + .await; match rx.await { Ok(Ok(a)) => a, @@ -3108,9 +3406,8 @@ fn should_trigger_assignment( } } -#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn process_wakeup( - ctx: &mut Context, +async fn process_wakeup>( + sender: &mut Sender, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, @@ -3141,7 +3438,7 @@ async fn process_wakeup( let ExtendedSessionInfo { ref session_info, ref executor_params, .. } = match get_extended_session_info( session_info_provider, - ctx.sender(), + sender, block_entry.block_hash(), block_entry.session(), ) @@ -3283,7 +3580,7 @@ async fn process_wakeup( // Note that this function also schedules a wakeup as necessary. actions.extend( advance_approval_state( - ctx.sender(), + sender, state, db, session_info_provider, @@ -3304,8 +3601,14 @@ async fn process_wakeup( // spawned. When the background work is no longer needed, the `AbortHandle` should be dropped // to cancel the background work and any requests it has spawned. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn launch_approval( - ctx: &mut Context, +async fn launch_approval< + Sender: SubsystemSender + + SubsystemSender + + SubsystemSender + + SubsystemSender, +>( + mut sender: Sender, + spawn_handle: Arc, metrics: Metrics, session_index: SessionIndex, candidate: CandidateReceipt, @@ -3356,14 +3659,15 @@ async fn launch_approval( .with_stage(jaeger::Stage::ApprovalChecking); let timer = metrics.time_recover_and_approve(); - ctx.send_message(AvailabilityRecoveryMessage::RecoverAvailableData( - candidate.clone(), - session_index, - Some(backing_group), - core_index, - a_tx, - )) - .await; + sender + .send_message(AvailabilityRecoveryMessage::RecoverAvailableData( + candidate.clone(), + session_index, + Some(backing_group), + core_index, + a_tx, + )) + .await; let request_validation_result_span = span .child("request-validation-result") @@ -3372,15 +3676,18 @@ async fn launch_approval( .with_string_tag("block-hash", format!("{:?}", block_hash)) .with_stage(jaeger::Stage::ApprovalChecking); - ctx.send_message(RuntimeApiMessage::Request( - block_hash, - RuntimeApiRequest::ValidationCodeByHash(candidate.descriptor.validation_code_hash, code_tx), - )) - .await; + sender + .send_message(RuntimeApiMessage::Request( + block_hash, + RuntimeApiRequest::ValidationCodeByHash( + candidate.descriptor.validation_code_hash, + code_tx, + ), + )) + .await; let candidate = candidate.clone(); let metrics_guard = StaleGuard(Some(metrics)); - let mut sender = ctx.sender().clone(); let background = async move { // Force the move of the timer into the background task. let _timer = timer; @@ -3510,14 +3817,19 @@ async fn launch_approval( } }; let (background, remote_handle) = background.remote_handle(); - ctx.spawn("approval-checks", Box::pin(background)).map(move |()| remote_handle) + spawn_handle.spawn("approval-checks", Some("approval-voting-subsystem"), Box::pin(background)); + Ok(remote_handle) } // Issue and import a local approval vote. Should only be invoked after approval checks // have been done. #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn issue_approval( - ctx: &mut Context, +async fn issue_approval< + Sender: SubsystemSender, + AVSender: SubsystemSender, +>( + sender: &mut Sender, + approval_voting_sender: &mut AVSender, state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, @@ -3596,7 +3908,7 @@ async fn issue_approval( let session_info = match get_session_info( session_info_provider, - ctx.sender(), + sender, block_entry.parent_hash(), block_entry.session(), ) @@ -3638,7 +3950,7 @@ async fn issue_approval( ); let actions = advance_approval_state( - ctx.sender(), + sender, state, db, session_info_provider, @@ -3655,7 +3967,8 @@ async fn issue_approval( db, session_info_provider, state, - ctx, + sender, + approval_voting_sender, block_hash, validator_index, metrics, @@ -3674,11 +3987,15 @@ async fn issue_approval( // Create signature for the approved candidates pending signatures #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] -async fn maybe_create_signature( +async fn maybe_create_signature< + Sender: SubsystemSender, + AVSender: SubsystemSender, +>( db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, state: &State, - ctx: &mut Context, + sender: &mut Sender, + approval_voting_sender: &mut AVSender, block_hash: Hash, validator_index: ValidatorIndex, metrics: &Metrics, @@ -3697,7 +4014,7 @@ async fn maybe_create_signature( }; let approval_params = state - .get_approval_voting_params_or_default(ctx, block_entry.session(), block_hash) + .get_approval_voting_params_or_default(sender, block_entry.session(), block_hash) .await .unwrap_or_default(); @@ -3717,7 +4034,7 @@ async fn maybe_create_signature( let session_info = match get_session_info( session_info_provider, - ctx.sender(), + sender, block_entry.parent_hash(), block_entry.session(), ) @@ -3798,7 +4115,7 @@ async fn maybe_create_signature( metrics.on_approval_produced(); - ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeApproval( + approval_voting_sender.send_unbounded_message(ApprovalDistributionMessage::DistributeApproval( IndirectSignedApprovalVoteV2 { block_hash: block_entry.block_hash(), candidate_indices: candidates_indices, @@ -3839,7 +4156,7 @@ fn issue_local_invalid_statement( candidate_hash: CandidateHash, candidate: CandidateReceipt, ) where - Sender: overseer::ApprovalVotingSenderTrait, + Sender: SubsystemSender, { // We need to send an unbounded message here to break a cycle: // DisputeCoordinatorMessage::IssueLocalStatement -> diff --git a/polkadot/node/service/src/lib.rs b/polkadot/node/service/src/lib.rs index b4f63bd2aa06b..b76f40dd31029 100644 --- a/polkadot/node/service/src/lib.rs +++ b/polkadot/node/service/src/lib.rs @@ -63,6 +63,7 @@ use { }; use polkadot_node_subsystem_util::database::Database; +use polkadot_overseer::SpawnGlue; #[cfg(feature = "full-node")] pub use { @@ -83,7 +84,7 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc, time::Duration}; use prometheus_endpoint::Registry; #[cfg(feature = "full-node")] use sc_service::KeystoreContainer; -use sc_service::RpcHandlers; +use sc_service::{RpcHandlers, SpawnTaskHandle}; use sc_telemetry::TelemetryWorker; #[cfg(feature = "full-node")] use sc_telemetry::{Telemetry, TelemetryWorkerHandle}; @@ -1481,6 +1482,7 @@ pub fn revert_backend( backend: Arc, blocks: BlockNumber, config: Configuration, + task_handle: SpawnTaskHandle, ) -> Result<(), Error> { let best_number = client.info().best_number; let finalized = client.info().finalized_number; @@ -1501,7 +1503,7 @@ pub fn revert_backend( let parachains_db = open_database(&config.database) .map_err(|err| sp_blockchain::Error::Backend(err.to_string()))?; - revert_approval_voting(parachains_db.clone(), hash)?; + revert_approval_voting(parachains_db.clone(), hash, task_handle)?; revert_chain_selection(parachains_db, hash)?; // Revert Substrate consensus related components sc_consensus_babe::revert(client.clone(), backend, blocks)?; @@ -1524,7 +1526,11 @@ fn revert_chain_selection(db: Arc, hash: Hash) -> sp_blockchain::R .map_err(|err| sp_blockchain::Error::Backend(err.to_string())) } -fn revert_approval_voting(db: Arc, hash: Hash) -> sp_blockchain::Result<()> { +fn revert_approval_voting( + db: Arc, + hash: Hash, + task_handle: SpawnTaskHandle, +) -> sp_blockchain::Result<()> { let config = approval_voting_subsystem::Config { col_approval_data: parachains_db::REAL_COLUMNS.col_approval_data, slot_duration_millis: Default::default(), @@ -1536,6 +1542,7 @@ fn revert_approval_voting(db: Arc, hash: Hash) -> sp_blockchain::R Arc::new(sc_keystore::LocalKeystore::in_memory()), Box::new(sp_consensus::NoNetwork), approval_voting_subsystem::Metrics::default(), + Arc::new(SpawnGlue(task_handle)), ); approval_voting diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs index eedc92572e08c..1a14b3f85cb19 100644 --- a/polkadot/node/service/src/overseer.rs +++ b/polkadot/node/service/src/overseer.rs @@ -318,6 +318,7 @@ where keystore.clone(), Box::new(sync_service.clone()), Metrics::register(registry)?, + Arc::new(spawner.clone()), )) .gossip_support(GossipSupportSubsystem::new( keystore.clone(),