
Commit

Merge pull request #2368 from subspace/gemini-3g-backport-piece-cache-acknowledge-segment-headers-faster

Gemini 3g backport: Acknowledge segment headers in piece cache much faster
nazar-pc authored Dec 22, 2023
2 parents 1b8b5f3 + 2c3f26c commit d504fed
Showing 4 changed files with 99 additions and 43 deletions.
99 changes: 69 additions & 30 deletions crates/subspace-farmer/src/piece_cache.rs
@@ -432,49 +432,90 @@ where
        debug!(%segment_index, "Starting to process newly archived segment");

        if worker_state.last_segment_index < segment_index {
-            // TODO: Can probably do concurrency here
-            for piece_index in segment_index.segment_piece_indexes() {
-                if !worker_state
-                    .heap
-                    .should_include_key(KeyWrapper(piece_index))
-                {
-                    trace!(%piece_index, "Piece doesn't need to be cached #1");
-
-                    continue;
-                }
-
-                trace!(%piece_index, "Piece needs to be cached #1");
-
-                let maybe_piece = match self.node_client.piece(piece_index).await {
-                    Ok(maybe_piece) => maybe_piece,
-                    Err(error) => {
-                        error!(
-                            %error,
-                            %segment_index,
-                            %piece_index,
-                            "Failed to retrieve piece from node right after archiving, this \
-                            should never happen and is an implementation bug"
-                        );
-                        continue;
-                    }
-                };
-
-                let Some(piece) = maybe_piece else {
-                    error!(
-                        %segment_index,
-                        %piece_index,
-                        "Failed to retrieve piece from node right after archiving, this should \
-                        never happen and is an implementation bug"
-                    );
-                    continue;
-                };
+            debug!(%segment_index, "Downloading potentially useful pieces");
+
+            // We do not insert pieces into cache/heap yet, so we don't know if all of these pieces
+            // will be included, but there is a good chance they will be and we want to acknowledge
+            // new segment header as soon as possible
+            let pieces_to_maybe_include = segment_index
+                .segment_piece_indexes()
+                .into_iter()
+                .filter(|&piece_index| {
+                    let maybe_include = worker_state
+                        .heap
+                        .should_include_key(KeyWrapper(piece_index));
+                    if !maybe_include {
+                        trace!(%piece_index, "Piece doesn't need to be cached #1");
+                    }
+
+                    maybe_include
+                })
+                .map(|piece_index| async move {
+                    let maybe_piece = match self.node_client.piece(piece_index).await {
+                        Ok(maybe_piece) => maybe_piece,
+                        Err(error) => {
+                            error!(
+                                %error,
+                                %segment_index,
+                                %piece_index,
+                                "Failed to retrieve piece from node right after archiving, this \
+                                should never happen and is an implementation bug"
+                            );
+
+                            return None;
+                        }
+                    };
+
+                    let Some(piece) = maybe_piece else {
+                        error!(
+                            %segment_index,
+                            %piece_index,
+                            "Failed to retrieve piece from node right after archiving, this should \
+                            never happen and is an implementation bug"
+                        );
+
+                        return None;
+                    };
+
+                    Some((piece_index, piece))
+                })
+                .collect::<FuturesUnordered<_>>()
+                .filter_map(|maybe_piece| async move { maybe_piece })
+                .collect::<Vec<_>>()
+                .await;
+
+            debug!(%segment_index, "Downloaded potentially useful pieces");
+
+            self.acknowledge_archived_segment_processing(segment_index)
+                .await;
+
+            // Go through potentially matching pieces again now that segment was acknowledged and
+            // try to persist them if necessary
+            for (piece_index, piece) in pieces_to_maybe_include {
+                if !worker_state
+                    .heap
+                    .should_include_key(KeyWrapper(piece_index))
+                {
+                    trace!(%piece_index, "Piece doesn't need to be cached #2");
+
+                    continue;
+                }
+
+                trace!(%piece_index, "Piece needs to be cached #1");

                self.persist_piece_in_cache(piece_index, piece, worker_state);
            }

            worker_state.last_segment_index = segment_index;
+        } else {
+            self.acknowledge_archived_segment_processing(segment_index)
+                .await;
        }

+        debug!(%segment_index, "Finished processing newly archived segment");
+    }
+
+    async fn acknowledge_archived_segment_processing(&self, segment_index: SegmentIndex) {
        match self
            .node_client
            .acknowledge_archived_segment_header(segment_index)
@@ -487,8 +528,6 @@ where
                error!(%segment_index, ?error, "Failed to acknowledge archived segment");
            }
        };
-
-        debug!(%segment_index, "Finished processing newly archived segment");
    }

    async fn keep_up_after_initial_sync<PG>(
@@ -527,7 +566,7 @@ where
        for piece_index in piece_indices {
            let key = KeyWrapper(piece_index);
            if !worker_state.heap.should_include_key(key) {
-                trace!(%piece_index, "Piece doesn't need to be cached #2");
+                trace!(%piece_index, "Piece doesn't need to be cached #3");

                continue;
            }
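
The rewritten worker loop above follows a simple pattern: download every candidate piece concurrently, acknowledge the archived segment header as soon as the downloads finish, and only then do the slower persistence work. Below is a minimal, self-contained sketch of that pattern using only the futures and tokio crates; fetch_piece, acknowledge and persist are hypothetical stand-ins for illustration, not the farmer's real API.

use futures::stream::{FuturesUnordered, StreamExt};

// Stand-in for `node_client.piece(piece_index)`: returns `None` when the piece is
// missing or the request failed.
async fn fetch_piece(piece_index: u64) -> Option<(u64, Vec<u8>)> {
    Some((piece_index, vec![0u8; 32]))
}

// Stand-in for acknowledging the archived segment header on the node.
async fn acknowledge(segment_index: u64) {
    println!("acknowledged segment {segment_index}");
}

// Stand-in for persisting a piece into the on-disk cache.
fn persist(piece_index: u64, piece: &[u8]) {
    println!("persisted piece {piece_index} ({} bytes)", piece.len());
}

#[tokio::main]
async fn main() {
    let segment_index = 42u64;

    // Download all candidate pieces concurrently; results arrive in completion order
    // and failed downloads are filtered out.
    let downloaded = (0u64..8)
        .map(fetch_piece)
        .collect::<FuturesUnordered<_>>()
        .filter_map(|maybe_piece| async move { maybe_piece })
        .collect::<Vec<_>>()
        .await;

    // Acknowledge before persisting, so the node is unblocked as early as possible.
    acknowledge(segment_index).await;

    // The slower persistence work happens only after the acknowledgement.
    for (piece_index, piece) in downloaded {
        persist(piece_index, &piece);
    }
}
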
28 changes: 16 additions & 12 deletions crates/subspace-farmer/src/piece_cache/tests.rs
@@ -312,19 +312,23 @@ async fn basic() {
            },
        };

-        archived_segment_headers_sender
-            .send(segment_header)
-            .await
-            .unwrap();
+        // Send twice because acknowledgement arrives early, sending twice doesn't have side effects, but ensures
+        // things were processed fully
+        for _ in 0..=1 {
+            archived_segment_headers_sender
+                .send(segment_header)
+                .await
+                .unwrap();

-        // Wait for acknowledgement
-        assert_eq!(
-            acknowledge_archived_segment_header_receiver
-                .next()
-                .await
-                .unwrap(),
-            SegmentIndex::from(segment_index)
-        );
+            // Wait for acknowledgement
+            assert_eq!(
+                acknowledge_archived_segment_header_receiver
+                    .next()
+                    .await
+                    .unwrap(),
+                SegmentIndex::from(segment_index)
+            );
+        }

current_segment_index.store(segment_index, Ordering::Release);
}
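
The test change leans on the fact that the worker handles notifications one at a time: acknowledgement now arrives before pieces are persisted, so the test sends the same segment header twice and waits for each acknowledgement; the second acknowledgement can only arrive after the first iteration, including its post-acknowledgement work, has completed. A rough illustration of that idea, assuming futures channels and tokio, with a u64 standing in for the segment header:

use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

#[tokio::main]
async fn main() {
    let (mut header_sender, mut header_receiver) = mpsc::channel::<u64>(0);
    let (mut ack_sender, mut ack_receiver) = mpsc::channel::<u64>(0);

    // Worker: acknowledges each header as soon as it arrives, then finishes the rest
    // of its work before looping around for the next notification.
    tokio::spawn(async move {
        while let Some(segment_index) = header_receiver.next().await {
            ack_sender.send(segment_index).await.unwrap();
            // ... persistence work would happen here, after the acknowledgement ...
        }
    });

    let segment_index = 7u64;
    // Send twice and wait for each acknowledgement: because the worker processes
    // notifications strictly in order, the second acknowledgement implies the first
    // iteration ran to completion.
    for _ in 0..=1 {
        header_sender.send(segment_index).await.unwrap();
        assert_eq!(ack_receiver.next().await.unwrap(), segment_index);
    }
    println!("segment {segment_index} fully processed");
}
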
5 changes: 4 additions & 1 deletion crates/subspace-farmer/src/single_disk_farm.rs
@@ -40,6 +40,7 @@ use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
+use std::time::Duration;
use std::{fs, io, mem};
use subspace_core_primitives::crypto::blake3_hash;
use subspace_core_primitives::crypto::kzg::Kzg;
@@ -63,12 +64,13 @@ use ulid::Ulid;

// Refuse to compile on non-64-bit platforms, offsets may fail on those when converting from u64 to
// usize depending on chain parameters
-const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());
+const_assert!(mem::size_of::<usize>() >= mem::size_of::<u64>());

/// Reserve 1M of space for plot metadata (for potential future expansion)
const RESERVED_PLOT_METADATA: u64 = 1024 * 1024;
/// Reserve 1M of space for farm info (for potential future expansion)
const RESERVED_FARM_INFO: u64 = 1024 * 1024;
+const NEW_SEGMENT_PROCESSING_DELAY: Duration = Duration::from_secs(30);

/// An identifier for single disk farm, can be used for in logs, thread names, etc.
#[derive(
Expand Down Expand Up @@ -1036,6 +1038,7 @@ impl SingleDiskFarm {
sectors_metadata: Arc::clone(&sectors_metadata),
sectors_to_plot_sender,
initial_plotting_finished: farming_delay_sender,
+            new_segment_processing_delay: NEW_SEGMENT_PROCESSING_DELAY,
};
tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));

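
As a side note, the const_assert! guard touched above (presumably provided by the static_assertions crate, which defines this macro) turns the 64-bit requirement into a compile-time check: if usize is narrower than u64, the crate fails to build instead of risking truncated offsets at runtime. A minimal stand-alone sketch of the same guard, assuming static_assertions as a dependency:

use static_assertions::const_assert;
use std::mem;

// Fails to compile on any target where `usize` is narrower than `u64`, so u64 -> usize
// conversions for offsets can never silently truncate at runtime.
const_assert!(mem::size_of::<usize>() >= mem::size_of::<u64>());

fn main() {
    println!("usize is at least as wide as u64 on this target");
}
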
10 changes: 10 additions & 0 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
@@ -413,6 +413,9 @@ pub(super) struct PlottingSchedulerOptions<NC> {
pub(super) sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
pub(super) sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
pub(super) initial_plotting_finished: Option<oneshot::Sender<()>>,
+    // Delay between segment header being acknowledged by farmer and potentially triggering
+    // replotting
+    pub(super) new_segment_processing_delay: Duration,
}

pub(super) async fn plotting_scheduler<NC>(
@@ -431,6 +434,7 @@
sectors_metadata,
sectors_to_plot_sender,
initial_plotting_finished,
+        new_segment_processing_delay,
} = plotting_scheduler_options;

// Create a proxy channel with atomically updatable last archived segment that
@@ -457,6 +461,7 @@
&node_client,
&last_archived_segment,
archived_segments_sender,
+        new_segment_processing_delay,
);

let (sectors_to_plot_proxy_sender, sectors_to_plot_proxy_receiver) = mpsc::channel(0);
@@ -497,6 +502,7 @@ async fn read_archived_segments_notifications<NC>(
node_client: &NC,
last_archived_segment: &Atomic<SegmentHeader>,
mut archived_segments_sender: mpsc::Sender<()>,
+    new_segment_processing_delay: Duration,
) -> Result<(), BackgroundTaskError>
where
NC: NodeClient,
@@ -517,6 +523,10 @@ where
debug!(%error, "Failed to acknowledge segment header");
}

+        // There is no urgent need to rush replotting sectors immediately and this delay allows for
+        // newly archived pieces to be both cached locally and on other farmers on the network
+        tokio::time::sleep(new_segment_processing_delay).await;

last_archived_segment.store(segment_header, Ordering::SeqCst);
// Just a notification such that receiving side can read updated
// `last_archived_segment` (whatever it happens to be right now)
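
Taken together, the plotting changes make the scheduler wait new_segment_processing_delay (30 seconds via NEW_SEGMENT_PROCESSING_DELAY) between acknowledging a segment header and exposing it to the replotting side, so newly archived pieces have time to be cached locally and by other farmers. A simplified sketch of that flow, assuming only tokio; the watch channel stands in for the real scheduler's Atomic<SegmentHeader> plus unit-notification pair, and u64 stands in for SegmentHeader:

use std::time::Duration;
use tokio::sync::{mpsc, watch};
use tokio::time::sleep;

// Reads archived-segment notifications, acknowledges them (elided here), waits for the
// configured delay and only then publishes the segment to the replotting side.
async fn read_archived_segments_notifications(
    mut incoming: mpsc::Receiver<u64>,
    last_archived_segment: watch::Sender<u64>,
    new_segment_processing_delay: Duration,
) {
    while let Some(segment_header) = incoming.recv().await {
        // Acknowledge first (omitted), so the node is not blocked on the farmer.

        // No urgency to replot immediately: give the newly archived pieces time to be
        // cached locally and by other farmers on the network.
        sleep(new_segment_processing_delay).await;

        // Now make the new segment visible to the plotting scheduler.
        let _ = last_archived_segment.send(segment_header);
    }
}

#[tokio::main]
async fn main() {
    let (header_sender, header_receiver) = mpsc::channel(8);
    let (latest_sender, mut latest_receiver) = watch::channel(0u64);

    tokio::spawn(read_archived_segments_notifications(
        header_receiver,
        latest_sender,
        Duration::from_millis(100), // shortened here; the real constant is 30 seconds
    ));

    header_sender.send(1).await.unwrap();
    latest_receiver.changed().await.unwrap();
    println!(
        "replotting may now consider segment {}",
        *latest_receiver.borrow()
    );
}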
