From 5dbe20b4807b1db18d0283d4b79ba9c20a549435 Mon Sep 17 00:00:00 2001
From: Nazar Mokrynskyi
Date: Tue, 19 Sep 2023 02:18:00 +0300
Subject: [PATCH 1/3] Leverage `SegmentHeadersStore` to find last archived
 block faster on node restart, improve error handling by removing incorrect
 `.expect()` calls

---
 crates/sc-consensus-subspace/src/archiver.rs | 204 ++++++++-----------
 crates/subspace-service/src/lib.rs           |  13 +-
 2 files changed, 91 insertions(+), 126 deletions(-)

diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs
index 4e74a6ac37..225c14aca3 100644
--- a/crates/sc-consensus-subspace/src/archiver.rs
+++ b/crates/sc-consensus-subspace/src/archiver.rs
@@ -25,7 +25,7 @@ use crate::{
 };
 use codec::{Decode, Encode};
 use futures::StreamExt;
-use log::{debug, error, info, warn};
+use log::{debug, info, warn};
 use parking_lot::Mutex;
 use rand::prelude::*;
 use rand_chacha::ChaCha8Rng;
@@ -78,7 +78,7 @@ where
     const INITIAL_CACHE_CAPACITY: usize = 1_000;
 
     /// Create new instance
-    pub fn new(aux_store: Arc<AS>) -> Result<Self, sp_blockchain::Error> {
+    pub fn new(aux_store: Arc<AS>) -> sp_blockchain::Result<Self> {
         let mut cache = Vec::with_capacity(Self::INITIAL_CACHE_CAPACITY);
         let mut next_key_index = 0;
 
@@ -121,7 +121,7 @@
     pub fn add_segment_headers(
         &self,
         segment_headers: &[SegmentHeader],
-    ) -> Result<(), sp_blockchain::Error> {
+    ) -> sp_blockchain::Result<()> {
         let mut maybe_last_segment_index = self.max_segment_index();
         let mut segment_headers_to_store = Vec::with_capacity(segment_headers.len());
         for segment_header in segment_headers {
@@ -201,88 +201,63 @@
 /// https://github.com/paritytech/substrate/discussions/14359
 pub(crate) const FINALIZATION_DEPTH_IN_SEGMENTS: usize = 5;
 
-fn find_last_archived_block<Block, Client>(
+fn find_last_archived_block<Block, Client, AS>(
     client: &Client,
-    best_block_hash: Block::Hash,
-) -> Option<(SegmentHeader, Block, BlockObjectMapping)>
+    segment_headers_store: &SegmentHeadersStore<AS>,
+) -> sp_blockchain::Result<Option<(SegmentHeader, Block, BlockObjectMapping)>>
 where
     Block: BlockT,
     Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + HeaderBackend<Block>,
     Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
+    AS: AuxStore,
 {
-    let mut block_to_check = best_block_hash;
-    let last_segment_header = 'outer: loop {
-        let block = client
-            .block(block_to_check)
-            .expect("Older blocks should always exist")
-            .expect("Older blocks should always exist");
-
-        for extrinsic in block.block.extrinsics() {
-            match client
-                .runtime_api()
-                .extract_segment_headers(block_to_check, extrinsic)
-            {
-                Ok(Some(segment_headers)) => {
-                    break 'outer segment_headers.into_iter().last()?;
-                }
-                Ok(None) => {
-                    // Some other extrinsic, ignore
-                }
-                Err(error) => {
-                    // TODO: Probably light client, can this even happen?
-                    panic!(
-                        "Failed to make runtime API call during last archived block search: \
-                        {error:?}"
-                    );
-                }
-            }
-        }
-
-        let parent_block_hash = *block.block.header().parent_hash();
-
-        if parent_block_hash == Block::Hash::default() {
-            // Genesis block, nothing else to check
-            return None;
-        }
-
-        block_to_check = parent_block_hash;
+    let Some(max_segment_index) = segment_headers_store.max_segment_index() else {
+        return Ok(None);
     };
 
-    let last_archived_block_number = last_segment_header.last_archived_block().number;
+    if max_segment_index == SegmentIndex::ZERO {
+        // Just genesis, nothing else to check
+        return Ok(None);
+    }
 
-    let last_archived_block = loop {
-        let block = client
-            .block(block_to_check)
-            .expect("Older blocks must always exist")
-            .expect("Older blocks must always exist")
-            .block;
+    for segment_header in (SegmentIndex::ZERO..=max_segment_index)
+        .rev()
+        .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index))
+    {
+        let last_archived_block_number = segment_header.last_archived_block().number;
+        let Some(last_archived_block_hash) = client.hash(last_archived_block_number.into())? else {
+            // This block number is not in our chain yet (segment headers store may know about more
+            // blocks in existence than are currently imported)
+            continue;
+        };
 
-        if *block.header().number() == last_archived_block_number.into() {
-            break block;
-        }
+        let last_segment_header = segment_header;
 
-        block_to_check = *block.header().parent_hash();
-    };
+        let last_archived_block = client
+            .block(last_archived_block_hash)?
+            .expect("Last archived block must always be retrievable; qed")
+            .block;
 
-    let last_archived_block_hash = block_to_check;
+        let block_object_mappings = client
+            .runtime_api()
+            .validated_object_call_hashes(last_archived_block_hash)
+            .and_then(|calls| {
+                client.runtime_api().extract_block_object_mapping(
+                    *last_archived_block.header().parent_hash(),
+                    last_archived_block.clone(),
+                    calls,
+                )
+            })
+            .unwrap_or_default();
 
-    let block_object_mappings = client
-        .runtime_api()
-        .validated_object_call_hashes(last_archived_block_hash)
-        .and_then(|calls| {
-            client.runtime_api().extract_block_object_mapping(
-                *last_archived_block.header().parent_hash(),
-                last_archived_block.clone(),
-                calls,
-            )
-        })
-        .unwrap_or_default();
+        return Ok(Some((
+            last_segment_header,
+            last_archived_block,
+            block_object_mappings,
+        )));
+    }
 
-    Some((
-        last_segment_header,
-        last_archived_block,
-        block_object_mappings,
-    ))
+    Ok(None)
 }
 
 struct BlockHashesToArchive<Block>
 where
     Block: BlockT,
 {
     block_hashes: Vec<Block::Hash>,
     best_archived: Option<(Block::Hash, NumberFor<Block>)>,
 }
 
 fn block_hashes_to_archive<Block, Client>(
     client: &Client,
     best_block_hash: Block::Hash,
     blocks_to_archive_from: NumberFor<Block>,
     blocks_to_archive_to: NumberFor<Block>,
-) -> BlockHashesToArchive<Block>
+) -> sp_blockchain::Result<BlockHashesToArchive<Block>>
 where
     Block: BlockT,
     Client: HeaderBackend<Block>,
@@ -311,8 +286,7 @@ where
     loop {
         // TODO: `Error` here must be handled instead
        let header = client
-            .header(block_hash_to_check)
-            .expect("Parent block must exist; qed")
+            .header(block_hash_to_check)?
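+            // A failed backend read is now propagated via `?` rather than panicking; the
+            // `expect` below only covers the invariant that a parent header exists for
+            // every block in the range.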
.expect("Parent block must exist; qed"); if block_range.contains(header.number()) { @@ -330,10 +304,10 @@ where block_hash_to_check = *header.parent_hash(); } - BlockHashesToArchive { + Ok(BlockHashesToArchive { block_hashes, best_archived, - } + }) } /// Derive genesis segment on demand, returns `Ok(None)` in case genesis block was already pruned @@ -419,7 +393,7 @@ fn initialize_archiver( segment_headers_store: &SegmentHeadersStore, subspace_link: &SubspaceLink, client: &Client, -) -> InitializedArchiver +) -> sp_blockchain::Result> where Block: BlockT, Client: ProvideRuntimeApi + BlockBackend + HeaderBackend + AuxStore, @@ -430,7 +404,7 @@ where .expect("Must always be able to get chain constants") .confirmation_depth_k(); - let maybe_last_archived_block = find_last_archived_block(client, best_block_hash); + let maybe_last_archived_block = find_last_archived_block(client, segment_headers_store)?; let have_last_segment_header = maybe_last_archived_block.is_some(); let mut best_archived_block = None; @@ -510,14 +484,13 @@ where best_block_hash, blocks_to_archive_from.into(), blocks_to_archive_to.into(), - ); + )?; best_archived_block = block_hashes_to_archive.best_archived; let block_hashes_to_archive = block_hashes_to_archive.block_hashes; for block_hash_to_archive in block_hashes_to_archive.into_iter().rev() { let block = client - .block(block_hash_to_archive) - .expect("Older block by number must always exist") + .block(block_hash_to_archive)? .expect("Older block by number must always exist") .block; let block_number_to_archive = *block.header().number(); @@ -556,11 +529,7 @@ where older_archived_segments.extend(archived_segments); if !new_segment_headers.is_empty() { - if let Err(error) = - segment_headers_store.add_segment_headers(&new_segment_headers) - { - panic!("Failed to store segment headers: {error}"); - } + segment_headers_store.add_segment_headers(&new_segment_headers)?; // Set list of expected segment headers for the block where we expect segment // header extrinsic to be included subspace_link.segment_headers.lock().put( @@ -578,13 +547,13 @@ where } } - InitializedArchiver { + Ok(InitializedArchiver { confirmation_depth_k, archiver, older_archived_segments, best_archived_block: best_archived_block .expect("Must always set if there is no logical error; qed"), - } + }) } fn finalize_block( @@ -603,7 +572,7 @@ fn finalize_block( } // We don't have anything useful to do with this result yet, the only source of errors was // logged already inside - let _result: Result<_, sp_blockchain::Error> = client.lock_import_and_run(|import_op| { + let _result: sp_blockchain::Result<_> = client.lock_import_and_run(|import_op| { // Ideally some handle to a synchronization oracle would be used to avoid unconditionally // notifying. client @@ -637,7 +606,7 @@ pub fn create_subspace_archiver( client: Arc, sync_oracle: SubspaceSyncOracle, telemetry: Option, -) -> impl Future + Send + 'static +) -> sp_blockchain::Result> + Send + 'static> where Block: BlockT, Backend: BackendT, @@ -669,7 +638,7 @@ where &segment_headers_store, subspace_link, client.as_ref(), - ); + )?; let mut block_importing_notification_stream = subspace_link .block_importing_notification_stream @@ -678,7 +647,7 @@ where subspace_link.archived_segment_notification_sender.clone(); let segment_headers = Arc::clone(&subspace_link.segment_headers); - async move { + Ok(async move { // Farmers may have not received all previous segments, send them now. 
         for archived_segment in older_archived_segments {
             send_archived_segment_notification(
                 &archived_segment_notification_sender,
                 archived_segment,
             )
             .await;
         }
 
@@ -714,11 +683,9 @@ where
             let block = client
                 .block(
                     client
-                        .hash(block_number_to_archive)
-                        .expect("Older block by number must always exist")
+                        .hash(block_number_to_archive)?
                         .expect("Older block by number must always exist"),
-                )
-                .expect("Older block by number must always exist")
+                )?
                 .expect("Older block by number must always exist")
                 .block;
 
@@ -733,19 +700,19 @@ where
             );
 
             if parent_block_hash != best_archived_block_hash {
-                error!(
-                    target: "subspace",
+                let error = format!(
                     "Attempt to switch to a different fork beyond archiving depth, \
                     can't do it: parent block hash {}, best archived block hash {}",
-                    parent_block_hash,
-                    best_archived_block_hash
+                    parent_block_hash, best_archived_block_hash
                 );
-                return;
+                return Err(sp_blockchain::Error::Consensus(sp_consensus::Error::Other(
+                    error.into(),
+                )));
             }
 
             best_archived_block_hash = block_hash_to_archive;
 
-            let block_object_mappings = match client
+            let block_object_mappings = client
                 .runtime_api()
                 .validated_object_call_hashes(block_hash_to_archive)
                 .and_then(|calls| {
@@ -754,16 +721,12 @@ where
                         block.clone(),
                         calls,
                     )
-                }) {
-                Ok(block_object_mappings) => block_object_mappings,
-                Err(error) => {
-                    error!(
-                        target: "subspace",
-                        "Failed to retrieve block object mappings: {error}"
-                    );
-                    return;
-                }
-            };
+                })
+                .map_err(|error| {
+                    sp_blockchain::Error::Application(
+                        format!("Failed to retrieve block object mappings: {error}").into(),
+                    )
+                })?;
 
             let encoded_block = block.encode();
             debug!(
@@ -781,15 +744,7 @@ where
             ) {
                 let segment_header = archived_segment.segment_header;
 
-                if let Err(error) =
-                    segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))
-                {
-                    error!(
-                        target: "subspace",
-                        "Failed to store segment headers: {error}"
-                    );
-                    return;
-                }
+                segment_headers_store.add_segment_headers(slice::from_ref(&segment_header))?;
 
                 send_archived_segment_notification(
                     &archived_segment_notification_sender,
@@ -815,8 +770,7 @@ where
 
             if let Some(block_number_to_finalize) = maybe_block_number_to_finalize {
                 let block_hash_to_finalize = client
-                    .hash(block_number_to_finalize.into())
-                    .expect("Block about to be finalized must always exist")
+                    .hash(block_number_to_finalize.into())?
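+                    // Only the database error is propagated via `?`; a block that is about
+                    // to be finalized has to be in the database already.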
.expect("Block about to be finalized must always exist"); finalize_block( client.as_ref(), @@ -827,7 +781,9 @@ where } } } - } + + Ok(()) + }) } async fn send_archived_segment_notification( diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index 819f24832e..a62065b49c 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -698,11 +698,20 @@ where client.clone(), subspace_sync_oracle.clone(), telemetry.as_ref().map(|telemetry| telemetry.handle()), - ); + ) + .map_err(ServiceError::Client)?; task_manager .spawn_essential_handle() - .spawn_essential_blocking("subspace-archiver", None, Box::pin(subspace_archiver)); + .spawn_essential_blocking( + "subspace-archiver", + None, + Box::pin(async move { + if let Err(error) = subspace_archiver.await { + error!(%error, "Archiver exited with error"); + } + }), + ); if config.enable_subspace_block_relay { network_wrapper.set(network_service.clone()); From 1d1030c710830f9ed66f4926bd78603e653f538b Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 19 Sep 2023 02:45:31 +0300 Subject: [PATCH 2/3] Simplify and accelerate mapping of block numbers to hashes during archiver restart --- Cargo.lock | 1 + crates/sc-consensus-subspace/Cargo.toml | 1 + crates/sc-consensus-subspace/src/archiver.rs | 75 ++++---------------- 3 files changed, 16 insertions(+), 61 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2afa8ab125..2e89f52022 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8868,6 +8868,7 @@ dependencies = [ "parking_lot 0.12.1", "rand 0.8.5", "rand_chacha 0.3.1", + "rayon", "sc-client-api", "sc-consensus", "sc-consensus-slots", diff --git a/crates/sc-consensus-subspace/Cargo.toml b/crates/sc-consensus-subspace/Cargo.toml index f21cf0ab53..e1e5eba289 100644 --- a/crates/sc-consensus-subspace/Cargo.toml +++ b/crates/sc-consensus-subspace/Cargo.toml @@ -25,6 +25,7 @@ parking_lot = "0.12.1" prometheus-endpoint = { package = "substrate-prometheus-endpoint", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71", version = "0.10.0-dev" } rand = "0.8.5" rand_chacha = "0.3.1" +rayon = "1.7.0" schnorrkel = "0.9.1" sc-client-api = { version = "4.0.0-dev", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } sc-consensus = { version = "0.10.0-dev", git = "https://github.com/subspace/substrate", rev = "55c157cff49b638a59d81a9f971f0f9a66829c71" } diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs index 225c14aca3..bf0427ecd2 100644 --- a/crates/sc-consensus-subspace/src/archiver.rs +++ b/crates/sc-consensus-subspace/src/archiver.rs @@ -29,6 +29,7 @@ use log::{debug, info, warn}; use parking_lot::Mutex; use rand::prelude::*; use rand_chacha::ChaCha8Rng; +use rayon::prelude::*; use sc_client_api::{AuxStore, Backend as BackendT, BlockBackend, Finalizer, LockImportRun}; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO}; use sc_utils::mpsc::tracing_unbounded; @@ -260,56 +261,6 @@ where Ok(None) } -struct BlockHashesToArchive -where - Block: BlockT, -{ - block_hashes: Vec, - best_archived: Option<(Block::Hash, NumberFor)>, -} - -fn block_hashes_to_archive( - client: &Client, - best_block_hash: Block::Hash, - blocks_to_archive_from: NumberFor, - blocks_to_archive_to: NumberFor, -) -> sp_blockchain::Result> -where - Block: BlockT, - Client: HeaderBackend, -{ - let block_range = blocks_to_archive_from..=blocks_to_archive_to; - let mut block_hashes = 
-    let mut block_hash_to_check = best_block_hash;
-    let mut best_archived = None;
-
-    loop {
-        // TODO: `Error` here must be handled instead
-        let header = client
-            .header(block_hash_to_check)?
-            .expect("Parent block must exist; qed");
-
-        if block_range.contains(header.number()) {
-            block_hashes.push(block_hash_to_check);
-
-            if best_archived.is_none() {
-                best_archived.replace((block_hash_to_check, *header.number()));
-            }
-        }
-
-        if *header.number() == blocks_to_archive_from {
-            break;
-        }
-
-        block_hash_to_check = *header.parent_hash();
-    }
-
-    Ok(BlockHashesToArchive {
-        block_hashes,
-        best_archived,
-    })
-}
-
 /// Derive genesis segment on demand, returns `Ok(None)` in case genesis block was already pruned
 pub fn recreate_genesis_segment<Block, Client>(
     client: &Client,
@@ -388,7 +339,6 @@ where
 }
 
 fn initialize_archiver<Block, Client, AS>(
-    best_block_hash: Block::Hash,
     best_block_number: NumberFor<Block>,
     segment_headers_store: &SegmentHeadersStore<AS>,
     subspace_link: &SubspaceLink<Block>,
     client: &Client,
@@ -479,16 +429,21 @@ where
             blocks_to_archive_to,
         );
 
-        let block_hashes_to_archive = block_hashes_to_archive(
-            client,
-            best_block_hash,
-            blocks_to_archive_from.into(),
+        let block_hashes_to_archive = (blocks_to_archive_from..=blocks_to_archive_to)
+            .into_par_iter()
+            .map(|block_number| {
+                Ok(client
+                    .hash(block_number.into())?
+                    .expect("All blocks since last archived must be present; qed"))
+            })
+            .collect::<sp_blockchain::Result<Vec<_>>>()?;
+
+        best_archived_block.replace((
+            *block_hashes_to_archive.last().expect("Not empty; qed"),
             blocks_to_archive_to.into(),
-        )?;
-        best_archived_block = block_hashes_to_archive.best_archived;
-        let block_hashes_to_archive = block_hashes_to_archive.block_hashes;
+        ));
 
-        for block_hash_to_archive in block_hashes_to_archive.into_iter().rev() {
+        for block_hash_to_archive in block_hashes_to_archive {
             let block = client
                 .block(block_hash_to_archive)?
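+                // `?` propagates backend errors; the body must exist because its hash was
+                // just resolved from the block number in the parallel lookup above.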
.expect("Older block by number must always exist") @@ -624,7 +579,6 @@ where SO: SyncOracle + Send + Sync + 'static, { let client_info = client.info(); - let best_block_hash = client_info.best_hash; let best_block_number = client_info.best_number; let InitializedArchiver { @@ -633,7 +587,6 @@ where older_archived_segments, best_archived_block: (mut best_archived_block_hash, mut best_archived_block_number), } = initialize_archiver( - best_block_hash, best_block_number, &segment_headers_store, subspace_link, From 02a95e0ccfc9711bb8edc1915d39da469b5fd535 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 19 Sep 2023 03:18:16 +0300 Subject: [PATCH 3/3] Parallelize retrieval of blocks from database as well as object mapping creation --- crates/sc-consensus-subspace/src/archiver.rs | 78 ++++++++++++-------- 1 file changed, 49 insertions(+), 29 deletions(-) diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs index bf0427ecd2..815c0c8472 100644 --- a/crates/sc-consensus-subspace/src/archiver.rs +++ b/crates/sc-consensus-subspace/src/archiver.rs @@ -30,6 +30,7 @@ use parking_lot::Mutex; use rand::prelude::*; use rand_chacha::ChaCha8Rng; use rayon::prelude::*; +use rayon::ThreadPoolBuilder; use sc_client_api::{AuxStore, Backend as BackendT, BlockBackend, Finalizer, LockImportRun}; use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_INFO}; use sc_utils::mpsc::tracing_unbounded; @@ -49,6 +50,9 @@ use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::objects::BlockObjectMapping; use subspace_core_primitives::{BlockNumber, RecordedHistorySegment, SegmentHeader, SegmentIndex}; +/// This corresponds to default value of `--max-runtime-instances` in Substrate +const BLOCKS_TO_ARCHIVE_CONCURRENCY: usize = 8; + #[derive(Debug)] struct SegmentHeadersStoreInner { aux_store: Arc, @@ -429,44 +433,60 @@ where blocks_to_archive_to, ); - let block_hashes_to_archive = (blocks_to_archive_from..=blocks_to_archive_to) - .into_par_iter() - .map(|block_number| { - Ok(client - .hash(block_number.into())? - .expect("All blocks since last archived must be present; qed")) - }) - .collect::>>()?; + let thread_pool = ThreadPoolBuilder::new() + .num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY) + .build() + .map_err(|error| { + sp_blockchain::Error::Backend(format!( + "Failed to create thread pool for archiver initialization: {error}" + )) + })?; + // We need to limit number of threads to avoid running out of WASM instances + let blocks_to_archive = thread_pool.install(|| { + (blocks_to_archive_from..=blocks_to_archive_to) + .into_par_iter() + .map_init( + || client.runtime_api(), + |runtime_api, block_number| { + let block_hash = client + .hash(block_number.into())? + .expect("All blocks since last archived must be present; qed"); + + let block = client + .block(block_hash)? 
+ .expect("All blocks since last archived must be present; qed") + .block; + + let block_object_mappings = runtime_api + .validated_object_call_hashes(block_hash) + .and_then(|calls| { + client.runtime_api().extract_block_object_mapping( + *block.header().parent_hash(), + block.clone(), + calls, + ) + }) + .unwrap_or_default(); + + Ok((block, block_object_mappings)) + }, + ) + .collect::>>() + })?; - best_archived_block.replace(( - *block_hashes_to_archive.last().expect("Not empty; qed"), - blocks_to_archive_to.into(), - )); + best_archived_block = blocks_to_archive + .last() + .map(|(block, _block_object_mappings)| (block.hash(), *block.header().number())); - for block_hash_to_archive in block_hashes_to_archive { - let block = client - .block(block_hash_to_archive)? - .expect("Older block by number must always exist") - .block; + for (block, block_object_mappings) in blocks_to_archive { let block_number_to_archive = *block.header().number(); - let block_object_mappings = client - .runtime_api() - .validated_object_call_hashes(block_hash_to_archive) - .and_then(|calls| { - client.runtime_api().extract_block_object_mapping( - *block.header().parent_hash(), - block.clone(), - calls, - ) - }) - .unwrap_or_default(); - let encoded_block = if block_number_to_archive.is_zero() { encode_genesis_block(&block) } else { block.encode() }; + debug!( target: "subspace", "Encoded block {} has size of {:.2} kiB",