Skip to content

Commit

Permalink
Merge pull request #2930 from subspace/avoid-unnecessary-segment-down…
Browse files Browse the repository at this point in the history
…loading

Avoid unnecessary segment downloading
  • Loading branch information
nazar-pc authored Jul 18, 2024
2 parents f16986c + 07d2219 commit 47a7f34
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 24 deletions.
5 changes: 1 addition & 4 deletions crates/subspace-service/src/sync_from_dsn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use sp_api::ProvideRuntimeApi;
use sp_blockchain::HeaderBackend;
use sp_consensus_subspace::{FarmerPublicKey, SubspaceApi};
use sp_runtime::traits::{Block as BlockT, CheckedSub, NumberFor};
use sp_runtime::Saturating;
use std::error::Error;
use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -282,9 +281,7 @@ where
let mut last_processed_segment_index = SegmentIndex::ZERO;
// TODO: We'll be able to just take finalized block once we are able to decouple pruning from
// finality: https://github.com/paritytech/polkadot-sdk/issues/1570
let mut last_processed_block_number = info
.best_number
.saturating_sub(chain_constants.confirmation_depth_k().into());
let mut last_processed_block_number = info.best_number;
let segment_header_downloader = SegmentHeaderDownloader::new(node);

while let Some(reason) = notifications.next().await {
Expand Down
42 changes: 22 additions & 20 deletions crates/subspace-service/src/sync_from_dsn/import_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use sc_client_api::{AuxStore, BlockBackend, HeaderBackend};
use sc_consensus::import_queue::ImportQueueService;
use sc_consensus::IncomingBlock;
use sc_consensus_subspace::archiver::{decode_block, encode_block, SegmentHeadersStore};
use sc_service::Error;
use sc_tracing::tracing::{debug, trace};
use sp_consensus::BlockOrigin;
use sp_runtime::generic::SignedBlock;
Expand Down Expand Up @@ -53,7 +54,7 @@ pub(super) async fn import_blocks_from_dsn<Block, AS, Client, PG, IQS>(
import_queue_service: &mut IQS,
last_processed_segment_index: &mut SegmentIndex,
last_processed_block_number: &mut <Block::Header as Header>::Number,
) -> Result<u64, sc_service::Error>
) -> Result<u64, Error>
where
Block: BlockT,
AS: AuxStore + Send + Sync + 'static,
Expand All @@ -63,7 +64,7 @@ where
{
{
let last_segment_header = segment_headers_store.last_segment_header().ok_or_else(|| {
sc_service::Error::Other(
Error::Other(
"Archiver needs to be initialized before syncing from DSN to populate the very \
first segment"
.to_string(),
Expand All @@ -82,7 +83,7 @@ where
}
}

let mut downloaded_blocks = 0;
let mut imported_blocks = 0;
let mut reconstructor = Reconstructor::new().map_err(|error| error.to_string())?;
// Start from the first unprocessed segment and process all segments known so far
let segment_indices_iter = (*last_processed_segment_index + SegmentIndex::ONE)
Expand All @@ -98,32 +99,33 @@ where
.get_segment_header(segment_index)
.expect("Statically guaranteed to exist, see checks above; qed");

trace!(
%segment_index,
last_archived_block_number = %segment_header.last_archived_block().number,
last_archived_block_progress = ?segment_header.last_archived_block().archived_progress,
"Checking segment header"
);

let last_archived_block =
NumberFor::<Block>::from(segment_header.last_archived_block().number);
let last_archived_block_number = segment_header.last_archived_block().number;
let last_archived_block_partial = segment_header
.last_archived_block()
.archived_progress
.partial()
.is_some();

trace!(
%segment_index,
last_archived_block_number,
last_archived_block_partial,
"Checking segment header"
);

let last_archived_block_number = NumberFor::<Block>::from(last_archived_block_number);

let info = client.info();
// We have already processed this block, it can't change
if last_archived_block <= *last_processed_block_number {
if last_archived_block_number <= *last_processed_block_number {
*last_processed_segment_index = segment_index;
// Reset reconstructor instance
reconstructor = Reconstructor::new().map_err(|error| error.to_string())?;
continue;
}
// Just one partial unprocessed block and this was the last segment available, so nothing to
// import
if last_archived_block == *last_processed_block_number + One::one()
if last_archived_block_number == *last_processed_block_number + One::one()
&& last_archived_block_partial
&& segment_indices_iter.peek().is_none()
{
Expand Down Expand Up @@ -151,7 +153,7 @@ where
.expect("Block before best block number must always be found; qed");

if encode_block(signed_block) != block_bytes {
return Err(sc_service::Error::Other(
return Err(Error::Other(
"Wrong genesis block, block import failed".to_string(),
));
}
Expand Down Expand Up @@ -181,7 +183,7 @@ where
let signed_block =
decode_block::<Block>(&block_bytes).map_err(|error| error.to_string())?;

*last_processed_block_number = last_archived_block;
*last_processed_block_number = last_archived_block_number;

// No need to import blocks that are already present, if block is not present it might
// correspond to a short fork, so we need to import it even if we already have another
Expand Down Expand Up @@ -210,9 +212,9 @@ where
skip_execution: false,
});

downloaded_blocks += 1;
imported_blocks += 1;

if downloaded_blocks % 1000 == 0 {
if imported_blocks % 1000 == 0 {
debug!("Adding block {} from DSN to the import queue", block_number);
}
}
Expand All @@ -237,14 +239,14 @@ where
*last_processed_segment_index = segment_index;
}

Ok(downloaded_blocks)
Ok(imported_blocks)
}

pub(super) async fn download_and_reconstruct_blocks<PG>(
segment_index: SegmentIndex,
piece_getter: &PG,
reconstructor: &mut Reconstructor,
) -> Result<Vec<(BlockNumber, Vec<u8>)>, sc_service::Error>
) -> Result<Vec<(BlockNumber, Vec<u8>)>, Error>
where
PG: DsnSyncPieceGetter,
{
Expand Down

0 comments on commit 47a7f34

Please sign in to comment.