Skip to content

Commit

Permalink
Merge pull request #3071 from autonomys/reconstruct-blocking
Browse files Browse the repository at this point in the history
Spawn snap sync segment reconstruction into a blocking task
  • Loading branch information
teor2345 authored Sep 30, 2024
2 parents 2149a47 + 6c9b84d commit 249c97d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 13 deletions.
28 changes: 18 additions & 10 deletions crates/subspace-service/src/sync_from_dsn/import_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use sp_consensus::BlockOrigin;
use sp_runtime::generic::SignedBlock;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor, One};
use sp_runtime::Saturating;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use subspace_archiving::reconstructor::Reconstructor;
use subspace_core_primitives::{
Expand All @@ -36,6 +37,7 @@ use subspace_core_primitives::{
use subspace_erasure_coding::ErasureCoding;
use subspace_networking::utils::multihash::ToMultihash;
use tokio::sync::Semaphore;
use tokio::task::spawn_blocking;
use tracing::warn;

/// How many blocks to queue before pausing and waiting for blocks to be imported, this is
Expand Down Expand Up @@ -87,7 +89,7 @@ where
}

let mut imported_blocks = 0;
let mut reconstructor = Reconstructor::new(erasure_coding.clone());
let mut reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
// Start from the first unprocessed segment and process all segments known so far
let segment_indices_iter = (*last_processed_segment_index + SegmentIndex::ONE)
..=segment_headers_store
Expand Down Expand Up @@ -123,7 +125,7 @@ where
if last_archived_block_number <= *last_processed_block_number {
*last_processed_segment_index = segment_index;
// Reset reconstructor instance
reconstructor = Reconstructor::new(erasure_coding.clone());
reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
continue;
}
// Just one partial unprocessed block and this was the last segment available, so nothing to
Expand All @@ -133,13 +135,12 @@ where
&& segment_indices_iter.peek().is_none()
{
// Reset reconstructor instance
reconstructor = Reconstructor::new(erasure_coding.clone());
reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));
continue;
}

let blocks =
download_and_reconstruct_blocks(segment_index, piece_getter, &mut reconstructor)
.await?;
download_and_reconstruct_blocks(segment_index, piece_getter, &reconstructor).await?;

let mut blocks_to_import = Vec::with_capacity(QUEUED_BLOCKS_LIMIT as usize);

Expand Down Expand Up @@ -236,7 +237,7 @@ where
pub(super) async fn download_and_reconstruct_blocks<PG>(
segment_index: SegmentIndex,
piece_getter: &PG,
reconstructor: &mut Reconstructor,
reconstructor: &Arc<Mutex<Reconstructor>>,
) -> Result<Vec<(BlockNumber, Vec<u8>)>, Error>
where
PG: DsnSyncPieceGetter,
Expand Down Expand Up @@ -321,10 +322,17 @@ where
}
}

let reconstructed_contents = reconstructor
.add_segment(segment_pieces.as_ref())
.map_err(|error| error.to_string())?;
drop(segment_pieces);
// CPU-intensive piece and segment reconstruction code can block the async executor.
let reconstructor = reconstructor.clone();
let reconstructed_contents = spawn_blocking(move || {
reconstructor
.lock()
.expect("Panic if previous thread panicked when holding the mutex")
.add_segment(segment_pieces.as_ref())
})
.await
.expect("Panic if blocking task panicked")
.map_err(|error| error.to_string())?;

trace!(%segment_index, "Segment reconstructed successfully");

Expand Down
6 changes: 3 additions & 3 deletions crates/subspace-service/src/sync_from_dsn/snap_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use sp_objects::ObjectsApi;
use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
use std::collections::{HashSet, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use subspace_archiving::reconstructor::Reconstructor;
use subspace_core_primitives::{BlockNumber, PublicKey, SegmentIndex};
Expand Down Expand Up @@ -229,11 +229,11 @@ where
// Reconstruct blocks of the last segment
let mut blocks = VecDeque::new();
{
let mut reconstructor = Reconstructor::new(erasure_coding.clone());
let reconstructor = Arc::new(Mutex::new(Reconstructor::new(erasure_coding.clone())));

for segment_index in segments_to_reconstruct {
let blocks_fut =
download_and_reconstruct_blocks(segment_index, piece_getter, &mut reconstructor);
download_and_reconstruct_blocks(segment_index, piece_getter, &reconstructor);

blocks = VecDeque::from(blocks_fut.await?);
}
Expand Down

0 comments on commit 249c97d

Please sign in to comment.