From 4003ea7303be4787315cf2b3af8f30a4e36570b0 Mon Sep 17 00:00:00 2001 From: teor Date: Wed, 30 Oct 2024 16:40:56 +1000 Subject: [PATCH] Accept a start piece index for --create-object-mappings --- crates/sc-consensus-subspace/src/archiver.rs | 259 +++++++++++++----- .../src/bin/subspace-malicious-operator.rs | 3 +- .../src/commands/run/consensus.rs | 13 +- crates/subspace-service/src/config.rs | 5 +- 4 files changed, 207 insertions(+), 73 deletions(-) diff --git a/crates/sc-consensus-subspace/src/archiver.rs b/crates/sc-consensus-subspace/src/archiver.rs index 9749e01323..740b846c3b 100644 --- a/crates/sc-consensus-subspace/src/archiver.rs +++ b/crates/sc-consensus-subspace/src/archiver.rs @@ -71,12 +71,14 @@ use sp_runtime::traits::{Block as BlockT, CheckedSub, Header, NumberFor, One, Ze use sp_runtime::Justifications; use std::error::Error; use std::future::Future; -use std::slice; +use std::str::FromStr; use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::Arc; use std::time::Duration; +use std::{fmt, slice}; use subspace_archiving::archiver::{Archiver, NewArchivedSegment}; use subspace_core_primitives::objects::{BlockObjectMapping, GlobalObject}; +use subspace_core_primitives::pieces::PieceIndex; use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentHeader, SegmentIndex}; use subspace_core_primitives::{BlockNumber, PublicKey}; use subspace_erasure_coding::ErasureCoding; @@ -356,12 +358,97 @@ pub struct ObjectMappingNotification { // TODO: add an acknowledgement_sender for backpressure if needed } +/// When to start creating object mappings. +/// +/// There is no setting for starting from the last archived block, because we can't guarantee that +/// all the mappings from that block were received by clients before the node shut down. Instead, +/// clients should tell the node to create mappings from the latest piece index they know about. +#[derive(Copy, Clone, Debug, Default, Eq, PartialEq)] +pub enum CreateObjectMappings { + /// Start creating object mappings from this segment index. + Segment(SegmentIndex), + + /// Don't create object mappings. + #[default] + Disabled, +} + +impl FromStr for CreateObjectMappings { + type Err = String; + + fn from_str(input: &str) -> Result { + match input { + "disabled" => Ok(Self::Disabled), + piece_index => piece_index + .parse::() + .map(|index| Self::from_piece_index(index.into())) + .map_err(|_| { + "Unsupported create object mappings setting: \ + use a piece index number, or 'disabled'" + .to_string() + }), + } + } +} + +impl fmt::Display for CreateObjectMappings { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Segment(segment_index) => write!(f, "{}", segment_index), + Self::Disabled => f.write_str("disabled"), + } + } +} + +impl CreateObjectMappings { + /// Create object mappings from the start of the chain. + pub fn from_genesis() -> Self { + Self::from_piece_index(PieceIndex::ZERO) + } + + /// Create object mappings from the segment containing the given piece index. + /// All mappings in the segment will be created, including those before this piece. + pub fn from_piece_index(piece_index: PieceIndex) -> Self { + Self::Segment(piece_index.segment_index()) + } + + /// Don't create object mappings. + pub fn disabled() -> Self { + Self::Disabled + } + + /// The segment index to start creating object mappings from. + pub fn segment_index(&self) -> Option { + match self { + CreateObjectMappings::Segment(segment_index) => Some(*segment_index), + CreateObjectMappings::Disabled => None, + } + } + + /// Does the supplied segment index need object mappings? + pub fn for_segment(&self, segment_index: SegmentIndex) -> bool { + let Some(target_index) = self.segment_index() else { + return false; + }; + + segment_index >= target_index + } +} + +#[expect(clippy::type_complexity)] fn find_last_archived_block( client: &Client, segment_headers_store: &SegmentHeadersStore, best_block_to_archive: NumberFor, - create_object_mappings: bool, -) -> sp_blockchain::Result, BlockObjectMapping)>> + create_object_mappings: CreateObjectMappings, +) -> sp_blockchain::Result< + Option<( + SegmentHeader, + SegmentIndex, + SignedBlock, + BlockObjectMapping, + )>, +> where Block: BlockT, Client: ProvideRuntimeApi + BlockBackend + HeaderBackend, @@ -377,15 +464,21 @@ where return Ok(None); } - for segment_header in (SegmentIndex::ZERO..=max_segment_index) + for (segment_header, segment_index) in (SegmentIndex::ZERO..=max_segment_index) .rev() - .filter_map(|segment_index| segment_headers_store.get_segment_header(segment_index)) + .filter_map(|segment_index| { + segment_headers_store + .get_segment_header(segment_index) + .map(|header| (header, segment_index)) + }) { // If we're re-creating mappings for existing segments, ignore those segments. This // archives them again, and creates their mappings. // TODO: create historic mappings without doing expensive re-archiving operations - if create_object_mappings { - continue; + if let Some(target_segment_index) = create_object_mappings.segment_index() { + if segment_index >= target_segment_index { + continue; + } } let last_archived_block_number = segment_header.last_archived_block().number; @@ -401,13 +494,11 @@ where continue; }; - let last_segment_header = segment_header; - let last_archived_block = client .block(last_archived_block_hash)? .expect("Last archived block must always be retrievable; qed"); - let block_object_mappings = if create_object_mappings { + let block_object_mappings = if create_object_mappings.for_segment(segment_index) { client .runtime_api() .extract_block_object_mapping( @@ -420,7 +511,8 @@ where }; return Ok(Some(( - last_segment_header, + segment_header, + segment_index, last_archived_block, block_object_mappings, ))); @@ -544,7 +636,7 @@ fn initialize_archiver( segment_headers_store: &SegmentHeadersStore, subspace_link: &SubspaceLink, client: &Client, - create_object_mappings: bool, + create_object_mappings: CreateObjectMappings, ) -> sp_blockchain::Result> where Block: BlockT, @@ -579,45 +671,51 @@ where let have_last_segment_header = maybe_last_archived_block.is_some(); let mut best_archived_block = None; + let mut first_new_segment_index = SegmentIndex::ZERO; - let mut archiver = - if let Some((last_segment_header, last_archived_block, block_object_mappings)) = - maybe_last_archived_block - { - // Continuing from existing initial state - let last_archived_block_number = last_segment_header.last_archived_block().number; - info!( - %last_archived_block_number, - "Resuming archiver from last archived block", - ); + let mut archiver = if let Some(( + last_segment_header, + last_segment_index, + last_archived_block, + block_object_mappings, + )) = maybe_last_archived_block + { + first_new_segment_index = last_segment_index + SegmentIndex::ONE; - // Set initial value, this is needed in case only genesis block was archived and there - // is nothing else available - best_archived_block.replace(( - last_archived_block.block.hash(), - *last_archived_block.block.header().number(), - )); - - let last_archived_block_encoded = encode_block(last_archived_block); - - let archiver = Archiver::with_initial_state( - subspace_link.kzg().clone(), - subspace_link.erasure_coding().clone(), - last_segment_header, - &last_archived_block_encoded, - block_object_mappings, - ) - .expect("Incorrect parameters for archiver"); + // Continuing from existing initial state + let last_archived_block_number = last_segment_header.last_archived_block().number; + info!( + %last_archived_block_number, + "Resuming archiver from last archived block", + ); - archiver - } else { - info!("Starting archiving from genesis"); + // Set initial value, this is needed in case only genesis block was archived and there + // is nothing else available + best_archived_block.replace(( + last_archived_block.block.hash(), + *last_archived_block.block.header().number(), + )); - Archiver::new( - subspace_link.kzg().clone(), - subspace_link.erasure_coding().clone(), - ) - }; + let last_archived_block_encoded = encode_block(last_archived_block); + + let archiver = Archiver::with_initial_state( + subspace_link.kzg().clone(), + subspace_link.erasure_coding().clone(), + last_segment_header, + &last_archived_block_encoded, + block_object_mappings, + ) + .expect("Incorrect parameters for archiver"); + + archiver + } else { + info!("Starting archiving from genesis"); + + Archiver::new( + subspace_link.kzg().clone(), + subspace_link.erasure_coding().clone(), + ) + }; // Process blocks since last fully archived block up to the current head minus K { @@ -643,6 +741,12 @@ where blocks_to_archive_from, blocks_to_archive_to, ); + // If we're sure we'll need mappings for the first new segment, and all the segments + // after it, get them in parallel now. (Missed mappings will be picked up in the + // archiver loop below.) + let create_all_object_mappings = + create_object_mappings.for_segment(first_new_segment_index); + let thread_pool = ThreadPoolBuilder::new() .num_threads(BLOCKS_TO_ARCHIVE_CONCURRENCY) .build() @@ -666,7 +770,7 @@ where .block(block_hash)? .expect("All blocks since last archived must be present; qed"); - let block_object_mappings = if create_object_mappings { + let block_object_mappings = if create_all_object_mappings { runtime_api .extract_block_object_mapping( *block.block.header().parent_hash(), @@ -690,9 +794,27 @@ where (block.block.hash(), *block.block.header().number()) }); - for (signed_block, block_object_mappings) in blocks_to_archive { - let block_number_to_archive = *signed_block.block.header().number(); + let mut new_segment_index = first_new_segment_index; + for (signed_block, mut block_object_mappings) in blocks_to_archive { + // If we skipped mappings for one or more segments, then started creating mappings + // in the middle of blocks_to_archive, pick up the missing mappings here. + let want_block_object_mappings = + create_object_mappings.for_segment(new_segment_index); + if want_block_object_mappings != create_all_object_mappings { + block_object_mappings = if want_block_object_mappings { + client + .runtime_api() + .extract_block_object_mapping( + *signed_block.block.header().parent_hash(), + signed_block.block.clone(), + ) + .unwrap_or_default() + } else { + BlockObjectMapping::default() + }; + } + let block_number_to_archive = *signed_block.block.header().number(); let encoded_block = encode_block(signed_block); debug!( @@ -715,6 +837,9 @@ where if !new_segment_headers.is_empty() { segment_headers_store.add_segment_headers(&new_segment_headers)?; } + if let Some(last_segment_header) = new_segment_headers.last() { + new_segment_index = last_segment_header.segment_index() + SegmentIndex::ONE; + } } } } @@ -787,8 +912,9 @@ fn finalize_block( /// processing, which is necessary for ensuring that when the next block is imported, inherents will /// contain segment header of newly archived block (must happen exactly in the next block). /// -/// If `create_object_mappings` is set, when a block with object mappings is archived, notification -/// ([`SubspaceLink::object_mapping_notification_stream`]) will be sent. +/// `create_object_mappings` controls when object mappings are created for archived blocks. When +/// these mappings are created, a ([`SubspaceLink::object_mapping_notification_stream`]) +/// notification will be sent. /// /// Once segment header is archived, notification ([`SubspaceLink::archived_segment_notification_stream`]) /// will be sent and archiver will be paused until all receivers have provided an acknowledgement @@ -803,7 +929,7 @@ pub fn create_subspace_archiver( client: Arc, sync_oracle: SubspaceSyncOracle, telemetry: Option, - create_object_mappings: bool, + create_object_mappings: CreateObjectMappings, ) -> sp_blockchain::Result> + Send + 'static> where Block: BlockT, @@ -868,13 +994,13 @@ where } }; - let last_archived_block_number = NumberFor::::from( - segment_headers_store - .last_segment_header() - .expect("Exists after archiver initialization; qed") - .last_archived_block() - .number, - ); + let last_archived_segment_header = segment_headers_store + .last_segment_header() + .expect("Exists after archiver initialization; qed"); + let mut next_segment_index = + last_archived_segment_header.segment_index() + SegmentIndex::ONE; + let last_archived_block_number = + NumberFor::::from(last_archived_segment_header.last_archived_block().number); trace!( %importing_block_number, %block_number_to_archive, @@ -884,8 +1010,8 @@ where ); // Skip archived blocks, unless we're producing object mappings for the full history - let skip_last_archived_blocks = - last_archived_block_number > block_number_to_archive && !create_object_mappings; + let skip_last_archived_blocks = last_archived_block_number > block_number_to_archive + && !create_object_mappings.for_segment(next_segment_index); if best_archived_block_number >= block_number_to_archive || skip_last_archived_blocks { // This block was already archived, skip debug!( @@ -914,7 +1040,12 @@ where )?; if best_archived_block_number + One::one() == block_number_to_archive { - // As expected, can continue now + // As expected, can archive this block + next_segment_index = segment_headers_store + .last_segment_header() + .expect("Exists after archiver initialization; qed") + .segment_index() + + SegmentIndex::ONE; } else if best_archived_block_number >= block_number_to_archive { // Special sync mode where verified blocks were inserted into blockchain // directly, archiving of this block will naturally happen later @@ -952,7 +1083,7 @@ where subspace_link.archived_segment_notification_sender.clone(), best_archived_block_hash, block_number_to_archive, - create_object_mappings, + create_object_mappings.for_segment(next_segment_index), ) .await?; } diff --git a/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs b/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs index 101067b5fe..31e2863821 100644 --- a/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs +++ b/crates/subspace-malicious-operator/src/bin/subspace-malicious-operator.rs @@ -23,6 +23,7 @@ use domain_client_operator::fetch_domain_bootstrap_info; use domain_runtime_primitives::opaque::Block as DomainBlock; use sc_cli::{ChainSpec, SubstrateCli}; use sc_consensus_slots::SlotProportion; +use sc_consensus_subspace::archiver::CreateObjectMappings; use sc_network::config::MultiaddrWithPeerId; use sc_transaction_pool_api::OffchainTransactionPoolFactory; use sc_utils::mpsc::tracing_unbounded; @@ -199,7 +200,7 @@ fn main() -> Result<(), Error> { base: consensus_chain_config, // Domain node needs slots notifications for bundle production. force_new_slot_notifications: true, - create_object_mappings: true, + create_object_mappings: CreateObjectMappings::from_genesis(), subspace_networking: SubspaceNetworking::Create { config: dsn_config }, dsn_piece_getter: None, sync: Default::default(), diff --git a/crates/subspace-node/src/commands/run/consensus.rs b/crates/subspace-node/src/commands/run/consensus.rs index bbe1722d06..401dffa5e6 100644 --- a/crates/subspace-node/src/commands/run/consensus.rs +++ b/crates/subspace-node/src/commands/run/consensus.rs @@ -7,6 +7,7 @@ use sc_cli::{ generate_node_name, Cors, NodeKeyParams, NodeKeyType, RpcMethods, TelemetryParams, TransactionPoolParams, RPC_DEFAULT_PORT, }; +use sc_consensus_subspace::archiver::CreateObjectMappings; use sc_network::config::{MultiaddrWithPeerId, NonReservedPeerMode, Role, SetConfig}; use sc_service::{BlocksPruning, Configuration, PruningMode}; use sc_storage_monitor::StorageMonitorParams; @@ -389,12 +390,12 @@ pub(super) struct ConsensusChainOptions { #[arg(long)] force_authoring: bool, - /// Create object mappings for new blocks, and blocks that have already been archived. - /// By default, mappings are not created for any blocks. + /// Create object mappings from the supplied piece index, or genesis if no piece index is + /// specified. By default, mappings are disabled. /// - /// --dev mode enables this option automatically. - #[arg(long)] - create_object_mappings: bool, + /// --dev mode enables mappings from genesis automatically. + #[arg(long, default_value_t, default_missing_value("0"))] + create_object_mappings: CreateObjectMappings, /// External entropy, used initially when PoT chain starts to derive the first seed #[arg(long)] @@ -474,7 +475,7 @@ pub(super) fn create_consensus_chain_configuration( tmp = true; force_synced = true; force_authoring = true; - create_object_mappings = true; + create_object_mappings = CreateObjectMappings::from_genesis(); network_options.allow_private_ips = true; timekeeper_options.timekeeper = true; diff --git a/crates/subspace-service/src/config.rs b/crates/subspace-service/src/config.rs index 80fe6bd3e7..6f6e895f3f 100644 --- a/crates/subspace-service/src/config.rs +++ b/crates/subspace-service/src/config.rs @@ -1,6 +1,7 @@ use crate::dsn::DsnConfig; use crate::sync_from_dsn::DsnSyncPieceGetter; use sc_chain_spec::ChainSpec; +use sc_consensus_subspace::archiver::CreateObjectMappings; use sc_network::config::{ MultiaddrWithPeerId, NetworkBackendType, NetworkConfiguration, NodeKeyConfig, SetConfig, SyncMode, TransportConfig, DEFAULT_KADEMLIA_REPLICATION_FACTOR, @@ -302,8 +303,8 @@ pub struct SubspaceConfiguration { /// Whether slot notifications need to be present even if node is not responsible for block /// authoring. pub force_new_slot_notifications: bool, - /// Create object mappings for new blocks, and blocks that have already been archived. - pub create_object_mappings: bool, + /// Create object mappings from a specified segment index, or disable object mapping creation. + pub create_object_mappings: CreateObjectMappings, /// Subspace networking (DSN). pub subspace_networking: SubspaceNetworking, /// DSN piece getter