Refactor snap-sync. (#2916)
* Refactor snap-sync.

- add optional target block
- add conditional block import

* Update crates/subspace-service/src/sync_from_dsn/snap_sync.rs

Co-authored-by: Nazar Mokrynskyi <[email protected]>

* Change condition for target_block (snap-sync).

* Refactor snap-sync.

* Remove import_state_block_only variable

---------

Co-authored-by: Nazar Mokrynskyi <[email protected]>
shamil-gadelshin and nazar-pc authored Jul 23, 2024
1 parent 18dd43a commit fb2311b
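
In short, the refactor threads an optional target block through snap sync: the block-fetching helper now takes target_block: Option&lt;BlockNumber&gt; and returns Ok(None) when snap sync should be skipped, and the sync driver consumes that with a let-else. A minimal sketch of the new shape, using simplified stand-in types rather than the crate's real signatures:

use std::collections::VecDeque;

// Illustrative stand-ins for the real subspace types.
type BlockNumber = u32;
type SegmentIndex = u64;
type Error = String;

// Mirrors the new helper's return shape: Ok(None) means "too little chain
// history, skip snap sync"; Ok(Some(..)) carries the chosen segment index
// and the encoded blocks to import.
fn get_blocks_from_target_segment(
    target_block: Option<BlockNumber>,
) -> Result<Option<(SegmentIndex, VecDeque<(BlockNumber, Vec<u8>)>)>, Error> {
    // `None` keeps the old behaviour of syncing to the last archived segment.
    let _ = target_block;
    Ok(Some((5, VecDeque::from([(1_000, vec![0u8; 32])]))))
}

fn sync(target_block: Option<BlockNumber>) -> Result<(), Error> {
    let Some((segment_index, blocks)) = get_blocks_from_target_segment(target_block)? else {
        // Snap sync skipped; regular sync takes over.
        return Ok(());
    };
    println!("importing {} block(s) from segment {segment_index}", blocks.len());
    Ok(())
}

fn main() -> Result<(), Error> {
    // Passing None matches the call site added in this commit.
    sync(None)
}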
Showing 1 changed file with 117 additions and 38 deletions.
crates/subspace-service/src/sync_from_dsn/snap_sync.rs
@@ -22,7 +22,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use subspace_archiving::reconstructor::Reconstructor;
use subspace_core_primitives::SegmentIndex;
use subspace_core_primitives::{BlockNumber, SegmentIndex};
use subspace_networking::Node;
use tokio::time::sleep;
use tracing::{debug, error};
@@ -69,6 +69,7 @@ pub(crate) async fn snap_sync<Backend, Block, AS, Client, PG, NR>(
import_queue_service.as_mut(),
&network_request,
&sync_service,
None,
);

match snap_sync_fut.await {
@@ -95,55 +96,83 @@ pub(crate) async fn snap_sync<Backend, Block, AS, Client, PG, NR>(
.await;
}

#[allow(clippy::too_many_arguments)]
async fn sync<PG, AS, Block, Client, IQS, B, NR>(
// Get blocks from the last segment or from the segment containing the target block.
// Returns the collection of encoded blocks and the segment index that was used.
pub(crate) async fn get_blocks_from_target_segment<AS, PG>(
segment_headers_store: &SegmentHeadersStore<AS>,
node: &Node,
piece_getter: &PG,
fork_id: Option<&str>,
client: &Arc<Client>,
import_queue_service: &mut IQS,
network_request: &NR,
sync_service: &SyncingService<Block>,
) -> Result<(), Error>
target_block: Option<BlockNumber>,
) -> Result<Option<(SegmentIndex, VecDeque<(BlockNumber, Vec<u8>)>)>, Error>
where
B: sc_client_api::Backend<Block>,
PG: DsnSyncPieceGetter,
AS: AuxStore,
Block: BlockT,
Client: HeaderBackend<Block>
+ ClientExt<Block, B>
+ ProvideRuntimeApi<Block>
+ ProofProvider<Block>
+ LockImportRun<Block, B>
+ Send
+ Sync
+ 'static,
Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
IQS: ImportQueueService<Block> + ?Sized,
NR: NetworkRequest,
PG: DsnSyncPieceGetter,
{
debug!("Starting snap sync...");

sync_segment_headers(segment_headers_store, node)
.await
.map_err(|error| format!("Failed to sync segment headers: {}", error))?;

let last_segment_index = segment_headers_store
.max_segment_index()
.expect("Successfully synced above; qed");
let target_segment_index = {
let last_segment_index = segment_headers_store
.max_segment_index()
.expect("Successfully synced above; qed");

if let Some(target_block) = target_block {
let mut segment_header = segment_headers_store
.get_segment_header(last_segment_index)
.ok_or(format!(
"Can't get segment header from the store: {last_segment_index}"
))?;

if target_block > segment_header.last_archived_block().number {
return Err(format!(
"Target block is greater than the last archived block. \
Last segment index = {last_segment_index}, target block = {target_block}, \
last block from the segment = {}",
segment_header.last_archived_block().number
)
.into());
}

let mut current_segment_index = last_segment_index;

loop {
if current_segment_index <= SegmentIndex::ONE {
break;
}

if target_block > segment_header.last_archived_block().number {
current_segment_index += SegmentIndex::ONE;
break;
}

current_segment_index -= SegmentIndex::ONE;

segment_header = segment_headers_store
.get_segment_header(current_segment_index)
.ok_or(format!(
"Can't get segment header from the store: {last_segment_index}"
))?;
}

current_segment_index
} else {
last_segment_index
}
};

// Skip the snap sync if there is just one segment header built on top of genesis; it is
// more efficient to sync it regularly
if last_segment_index <= SegmentIndex::ONE {
if target_segment_index <= SegmentIndex::ONE {
debug!("Snap sync was skipped due to too early chain history");

return Ok(());
return Ok(None);
}

// Identify all segment headers that would need to be reconstructed in order to get the
// first block of the target segment
let mut segments_to_reconstruct = VecDeque::from([last_segment_index]);
let mut segments_to_reconstruct = VecDeque::from([target_segment_index]);
{
let mut last_segment_first_block_number = None;

@@ -204,6 +233,56 @@ where
blocks = VecDeque::from(blocks_fut.await?);
}
}

Ok(Some((target_segment_index, blocks)))
}

/// Synchronize the blockchain up to target_block (an approximate value based on the
/// containing segment) or to the last archived block.
#[allow(clippy::too_many_arguments)]
async fn sync<PG, AS, Block, Client, IQS, B, NR>(
segment_headers_store: &SegmentHeadersStore<AS>,
node: &Node,
piece_getter: &PG,
fork_id: Option<&str>,
client: &Arc<Client>,
import_queue_service: &mut IQS,
network_request: &NR,
sync_service: &SyncingService<Block>,
target_block: Option<BlockNumber>,
) -> Result<(), Error>
where
B: sc_client_api::Backend<Block>,
PG: DsnSyncPieceGetter,
AS: AuxStore,
Block: BlockT,
Client: HeaderBackend<Block>
+ ClientExt<Block, B>
+ ProvideRuntimeApi<Block>
+ ProofProvider<Block>
+ LockImportRun<Block, B>
+ Send
+ Sync
+ 'static,
Client::Api: SubspaceApi<Block, FarmerPublicKey> + ObjectsApi<Block>,
IQS: ImportQueueService<Block> + ?Sized,
NR: NetworkRequest,
{
debug!("Starting snap sync...");

let Some((target_segment_index, mut blocks)) =
get_blocks_from_target_segment(segment_headers_store, node, piece_getter, target_block)
.await?
else {
// Snap-sync skipped
return Ok(());
};

debug!(
"Segments data received. Target segment index: {:?}",
target_segment_index
);

let mut blocks_to_import = Vec::with_capacity(blocks.len());
let last_block_number;

@@ -221,10 +300,10 @@ where
});

debug!(
%last_segment_index,
%target_segment_index,
%first_block_number,
%last_block_number,
"Blocks from last segment downloaded"
"Blocks from target segment downloaded"
);

let signed_block = decode_block::<Block>(&first_block_bytes)
@@ -236,10 +315,10 @@ where
let state = download_state(&header, client, fork_id, network_request, sync_service)
.await
.map_err(|error| {
format!("Failed to download state for the first block of last segment: {error}")
format!("Failed to download state for the first block of target segment: {error}")
})?;

debug!("Downloaded state of the first block of the last segment");
debug!("Downloaded state of the first block of the target segment");

blocks_to_import.push(IncomingBlock {
hash: header.hash(),
@@ -257,7 +336,7 @@ where

debug!(
blocks_count = %blocks.len(),
"Queuing importing remaining blocks from last segment"
"Queuing importing remaining blocks from target segment"
);

for (_block_number, block_bytes) in blocks {
@@ -289,8 +368,8 @@ where
if let Some(last_block_to_import) = maybe_last_block_to_import {
debug!(
%last_block_number,
%last_segment_index,
"Importing the last block from the last segment"
%target_segment_index,
"Importing the last block from the target segment"
);

import_queue_service
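
The heart of the new lookup in get_blocks_from_target_segment is a backward walk over segment headers until the segment containing the target block is found. A self-contained sketch of that search, assuming only that each segment records the number of the last block it archived (plain integers stand in for SegmentIndex and SegmentHeadersStore):

// Find the segment whose archived range contains `target_block` by walking
// backwards from the newest segment, mirroring the loop in the diff above.
// Returns None when the target lies beyond the archived history (the real
// code reports an error in that case).
fn find_target_segment(last_archived_per_segment: &[u64], target_block: u64) -> Option<usize> {
    let last_segment_index = last_archived_per_segment.len().checked_sub(1)?;
    if target_block > last_archived_per_segment[last_segment_index] {
        return None;
    }
    let mut current = last_segment_index;
    loop {
        // Never walk below segment one, mirroring the SegmentIndex::ONE guard.
        if current <= 1 {
            break;
        }
        // Walked one segment past the target: step forward and stop.
        if target_block > last_archived_per_segment[current] {
            current += 1;
            break;
        }
        current -= 1;
    }
    Some(current)
}

fn main() {
    // Segments 0..=4 archive blocks up to 100, 250, 400, 700 and 1_000.
    let last_blocks = [100u64, 250, 400, 700, 1_000];
    // Block 450 falls in segment 3, whose archived range is 401..=700.
    assert_eq!(find_target_segment(&last_blocks, 450), Some(3));
    // Targets beyond the archived history are rejected.
    assert_eq!(find_target_segment(&last_blocks, 2_000), None);
}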
