Merge pull request #2565 from subspace/optimized-farmer-piece-getter
Optimized farmer piece getter
nazar-pc authored Feb 29, 2024
2 parents 4b690d9 + 639fc76 commit 3765fa4
Showing 4 changed files with 138 additions and 42 deletions.
43 changes: 15 additions & 28 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -8,7 +8,6 @@ use anyhow::anyhow;
use backoff::ExponentialBackoff;
use bytesize::ByteSize;
use clap::{Parser, ValueHint};
use futures::channel::oneshot;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{FutureExt, StreamExt};
use parking_lot::Mutex;
@@ -627,13 +626,9 @@ where
.map(|farming_thread_pool_size| farming_thread_pool_size.get())
.unwrap_or_else(recommended_number_of_farming_threads);

let mut plotting_delay_senders = Vec::with_capacity(disk_farms.len());

for (disk_farm_index, disk_farm) in disk_farms.into_iter().enumerate() {
debug!(url = %node_rpc_url, %disk_farm_index, "Connecting to node RPC");
let node_client = NodeRpcClient::new(&node_rpc_url).await?;
let (plotting_delay_sender, plotting_delay_receiver) = oneshot::channel();
plotting_delay_senders.push(plotting_delay_sender);

let single_disk_farm_fut = SingleDiskFarm::new::<_, _, PosTable>(
SingleDiskFarmOptions {
@@ -652,7 +647,6 @@
farm_during_initial_plotting,
farming_thread_pool_size,
plotting_thread_pool_manager: plotting_thread_pool_manager.clone(),
plotting_delay: Some(plotting_delay_receiver),
disable_farm_locking,
},
disk_farm_index,
@@ -696,30 +690,23 @@ where
single_disk_farms.push(single_disk_farm);
}

let cache_acknowledgement_receiver = farmer_cache
.replace_backing_caches(
single_disk_farms
.iter()
.map(|single_disk_farm| single_disk_farm.piece_cache())
.collect(),
single_disk_farms
.iter()
.map(|single_disk_farm| single_disk_farm.plot_cache())
.collect(),
)
.await;
// Acknowledgement is not necessary
drop(
farmer_cache
.replace_backing_caches(
single_disk_farms
.iter()
.map(|single_disk_farm| single_disk_farm.piece_cache())
.collect(),
single_disk_farms
.iter()
.map(|single_disk_farm| single_disk_farm.plot_cache())
.collect(),
)
.await,
);
drop(farmer_cache);

// Wait for cache initialization before starting plotting
tokio::spawn(async move {
if cache_acknowledgement_receiver.await.is_ok() {
for plotting_delay_sender in plotting_delay_senders {
// Doesn't matter if receiver is gone
let _ = plotting_delay_sender.send(());
}
}
});

// Store piece readers so we can reference them later
let piece_readers = single_disk_farms
.iter()
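The farm.rs changes above drop the oneshot-based plotting delay: plotting no longer waits for the farmer cache acknowledgement, and the acknowledgement receiver returned by replace_backing_caches is simply dropped. For reference, a minimal sketch of the removed gating pattern, assuming the futures and tokio crates; plotting_task and its body are illustrative, not the farmer's actual code:

use futures::channel::oneshot;

// Illustrative stand-in for the per-farm plotting gate that this commit removes.
async fn plotting_task(plotting_delay: Option<oneshot::Receiver<()>>) {
    if let Some(plotting_delay) = plotting_delay {
        // Sender dropped before firing meant plotting never started
        if plotting_delay.await.is_err() {
            return;
        }
    }
    println!("plotting would start here");
}

#[tokio::main]
async fn main() {
    let (plotting_delay_sender, plotting_delay_receiver) = oneshot::channel();
    let task = tokio::spawn(plotting_task(Some(plotting_delay_receiver)));
    // Fired once cache initialization finished; a gone receiver was ignored
    let _ = plotting_delay_sender.send(());
    task.await.expect("plotting task panicked");
}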
7 changes: 6 additions & 1 deletion crates/subspace-farmer/src/farmer_cache.rs
@@ -10,6 +10,7 @@ use futures::channel::oneshot;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, FutureExt, StreamExt};
use parking_lot::RwLock;
use rayon::prelude::*;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
@@ -386,7 +387,11 @@
"Identified piece indices that should be cached",
);

let mut piece_indices_to_store = piece_indices_to_store.into_values();
let mut piece_indices_to_store = piece_indices_to_store.into_values().collect::<Vec<_>>();
// Sort pieces such that they are in ascending order and have higher chance of download
// overlapping with other processes like node's sync from DSN
piece_indices_to_store.par_sort_unstable();
let mut piece_indices_to_store = piece_indices_to_store.into_iter();

let download_piece = |piece_index| async move {
trace!(%piece_index, "Downloading piece");
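The farmer_cache.rs hunk above switches from iterating the map values directly to collecting them into a Vec and sorting with rayon's par_sort_unstable, so pieces are downloaded in ascending index order and are more likely to overlap with other downloads such as the node's DSN sync. A small sketch of that ordering step, assuming only the rayon crate and using plain u64 values in place of PieceIndex:

use rayon::prelude::*;

fn main() {
    // Stand-in for the collected `piece_indices_to_store` values, which arrive
    // in arbitrary hash-map order.
    let mut piece_indices: Vec<u64> = vec![42, 7, 1_000, 3, 512];

    // Parallel unstable sort from rayon's `ParallelSliceMut`: ascending order
    // is all that matters, so the relative order of equal elements is irrelevant.
    piece_indices.par_sort_unstable();
    assert_eq!(piece_indices, vec![3, 7, 42, 512, 1_000]);

    // The cache worker then drains the sorted list one piece at a time.
    for piece_index in piece_indices {
        println!("would download piece {piece_index}");
    }
}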
11 changes: 0 additions & 11 deletions crates/subspace-farmer/src/single_disk_farm.rs
@@ -288,9 +288,6 @@ pub struct SingleDiskFarmOptions<NC, PG> {
pub farming_thread_pool_size: usize,
/// Thread pool manager used for plotting
pub plotting_thread_pool_manager: PlottingThreadPoolManager,
/// Notification for plotter to start, can be used to delay plotting until some initialization
/// has happened externally
pub plotting_delay: Option<oneshot::Receiver<()>>,
/// Disable farm locking, for example if file system doesn't support it
pub disable_farm_locking: bool,
}
@@ -619,7 +616,6 @@ impl SingleDiskFarm {
record_encoding_concurrency,
farming_thread_pool_size,
plotting_thread_pool_manager,
plotting_delay,
farm_during_initial_plotting,
disable_farm_locking,
} = options;
@@ -966,13 +962,6 @@ impl SingleDiskFarm {
return Ok(());
}

if let Some(plotting_delay) = plotting_delay {
if plotting_delay.await.is_err() {
// Dropped before resolving
return Ok(());
}
}

plotting::<_, _, PosTable>(plotting_options).await
};

119 changes: 117 additions & 2 deletions crates/subspace-farmer/src/utils/farmer_piece_getter.rs
@@ -1,11 +1,13 @@
use crate::farmer_cache::FarmerCache;
use crate::utils::plotted_pieces::PlottedPieces;
use crate::NodeClient;
use async_lock::Mutex as AsyncMutex;
use async_trait::async_trait;
use backoff::backoff::Backoff;
use backoff::future::retry;
use backoff::ExponentialBackoff;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::error::Error;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Weak};
@@ -32,6 +34,7 @@ struct Inner<PV, NC> {
node_client: NC,
plotted_pieces: Arc<Mutex<Option<PlottedPieces>>>,
dsn_cache_retry_policy: DsnCacheRetryPolicy,
in_progress_pieces: Mutex<HashMap<PieceIndex, Arc<AsyncMutex<Option<Piece>>>>>,
}

pub struct FarmerPieceGetter<PV, NC> {
@@ -65,12 +68,47 @@ where
node_client,
plotted_pieces,
dsn_cache_retry_policy,
in_progress_pieces: Mutex::default(),
}),
}
}

/// Fast way to get piece using various caches
pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
let in_progress_piece_mutex = Arc::new(AsyncMutex::new(None));
// Take lock before anything else, set to `None` when another piece getting is already in
// progress
let mut local_in_progress_piece_guard = Some(in_progress_piece_mutex.lock().await);
let in_progress_piece_mutex = self
.inner
.in_progress_pieces
.lock()
.entry(piece_index)
.and_modify(|_mutex| {
    local_in_progress_piece_guard.take();
})
.or_insert_with(|| Arc::clone(&in_progress_piece_mutex))
.clone();

// If piece is already in progress, just wait for it
if local_in_progress_piece_guard.is_none() {
trace!(%piece_index, "Piece is already in progress, waiting for result #1");
// Doesn't matter if it was successful or not here
return in_progress_piece_mutex.lock().await.clone();
}

// Otherwise try to get the piece without releasing lock to make sure successfully
// downloaded piece gets stored
let maybe_piece = self.get_piece_fast_internal(piece_index).await;
// Store the result for others to observe
if let Some(mut in_progress_piece) = local_in_progress_piece_guard {
*in_progress_piece = maybe_piece.clone();
self.inner.in_progress_pieces.lock().remove(&piece_index);
}
maybe_piece
}

async fn get_piece_fast_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
let key = RecordKey::from(piece_index.to_multihash());

let inner = &self.inner;
@@ -120,6 +158,46 @@ where

/// Slow way to get piece using archival storage
pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
let in_progress_piece_mutex = Arc::new(AsyncMutex::new(None));
// Take lock before anything else, set to `None` when another piece getting is already in
// progress
let mut local_in_progress_piece_guard = Some(in_progress_piece_mutex.lock().await);
let in_progress_piece_mutex = self
.inner
.in_progress_pieces
.lock()
.entry(piece_index)
.and_modify(|_mutex| {
    local_in_progress_piece_guard.take();
})
.or_insert_with(|| Arc::clone(&in_progress_piece_mutex))
.clone();

// If piece is already in progress, wait for it to see if it was successful
if local_in_progress_piece_guard.is_none() {
trace!(%piece_index, "Piece is already in progress, waiting for result #2");
if let Some(piece) = in_progress_piece_mutex.lock().await.clone() {
trace!(
%piece_index,
"Piece was already in progress and downloaded successfully #1"
);
return Some(piece);
}
}

// Otherwise try to get the piece without releasing lock to make sure successfully
// downloaded piece gets stored
let maybe_piece = self.get_piece_slow_internal(piece_index).await;
// Store the result for others to observe
if let Some(mut in_progress_piece) = local_in_progress_piece_guard {
*in_progress_piece = maybe_piece.clone();
self.inner.in_progress_pieces.lock().remove(&piece_index);
}
maybe_piece
}

/// Slow way to get piece using archival storage
async fn get_piece_slow_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
let inner = &self.inner;

trace!(%piece_index, "Getting piece from local plot");
@@ -179,6 +257,33 @@ where
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
let in_progress_piece_mutex = Arc::new(AsyncMutex::new(None));
// Take lock before anything else, set to `None` when another piece getting is already in
// progress
let mut local_in_progress_piece_guard = Some(in_progress_piece_mutex.lock().await);
let in_progress_piece_mutex = self
.inner
.in_progress_pieces
.lock()
.entry(piece_index)
.and_modify(|_mutex| {
    local_in_progress_piece_guard.take();
})
.or_insert_with(|| Arc::clone(&in_progress_piece_mutex))
.clone();

// If piece is already in progress, wait for it to see if it was successful
if local_in_progress_piece_guard.is_none() {
trace!(%piece_index, "Piece is already in progress, waiting for result #3");
if let Some(piece) = in_progress_piece_mutex.lock().await.clone() {
trace!(
%piece_index,
"Piece was already in progress and downloaded successfully #2"
);
return Ok(Some(piece));
}
}

{
let retries = AtomicU32::new(0);
let max_retries = u32::from(self.inner.dsn_cache_retry_policy.max_retries);
@@ -188,7 +293,7 @@
let maybe_piece_fut = retry(backoff, || async {
let current_attempt = retries.fetch_add(1, Ordering::Relaxed);

if let Some(piece) = self.get_piece_fast(piece_index).await {
if let Some(piece) = self.get_piece_fast_internal(piece_index).await {
trace!(%piece_index, current_attempt, "Got piece from DSN L2 cache");
return Ok(Some(piece));
}
@@ -211,11 +316,21 @@

if let Ok(Some(piece)) = maybe_piece_fut.await {
trace!(%piece_index, "Got piece from DSN L2 cache successfully");
// Store successfully downloaded piece for others to observe
if let Some(mut in_progress_piece) = local_in_progress_piece_guard {
in_progress_piece.replace(piece.clone());
self.inner.in_progress_pieces.lock().remove(&piece_index);
}
return Ok(Some(piece));
}
};

if let Some(piece) = self.get_piece_slow(piece_index).await {
if let Some(piece) = self.get_piece_slow_internal(piece_index).await {
// Store successfully downloaded piece for others to observe
if let Some(mut in_progress_piece) = local_in_progress_piece_guard {
in_progress_piece.replace(piece.clone());
self.inner.in_progress_pieces.lock().remove(&piece_index);
}
return Ok(Some(piece));
}

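The in-flight deduplication added to farmer_piece_getter.rs above follows one pattern in all three methods: register a per-piece async mutex in a shared map, keep its guard while downloading, and let concurrent callers wait on that mutex instead of fetching the same piece again. Below is a minimal, self-contained sketch of that pattern, not the crate's actual API: it assumes the async-lock, parking_lot, futures and tokio crates, uses u64 and Vec<u8> as stand-ins for PieceIndex and Piece, and stubs the download with fetch_piece.

use std::collections::HashMap;
use std::sync::Arc;

use async_lock::Mutex as AsyncMutex;
use parking_lot::Mutex;

// Simplified stand-ins for the real `PieceIndex` and `Piece` types.
type PieceIndex = u64;
type Piece = Vec<u8>;

#[derive(Default)]
struct DedupPieceGetter {
    // Pieces currently being fetched; the async mutex is held by the caller
    // that started the download and stores the outcome for any waiters.
    in_progress_pieces: Mutex<HashMap<PieceIndex, Arc<AsyncMutex<Option<Piece>>>>>,
}

impl DedupPieceGetter {
    async fn get_piece(&self, piece_index: PieceIndex) -> Option<Piece> {
        let in_progress_piece_mutex = Arc::new(AsyncMutex::new(None));
        // Lock our own slot first so waiters block until a result is published
        let mut local_guard = Some(in_progress_piece_mutex.lock().await);
        let in_progress_piece_mutex = self
            .in_progress_pieces
            .lock()
            .entry(piece_index)
            // Entry already exists: another task is downloading, drop our guard
            .and_modify(|_mutex| {
                local_guard.take();
            })
            // Entry is vacant: register our mutex while keeping the guard
            .or_insert_with(|| Arc::clone(&in_progress_piece_mutex))
            .clone();

        if local_guard.is_none() {
            // Wait for the task that owns the entry and reuse its result
            return in_progress_piece_mutex.lock().await.clone();
        }

        // We own the entry: download while still holding the async guard
        let maybe_piece = fetch_piece(piece_index).await;
        if let Some(mut in_progress_piece) = local_guard {
            *in_progress_piece = maybe_piece.clone();
            self.in_progress_pieces.lock().remove(&piece_index);
        }
        maybe_piece
    }
}

// Stub for illustration; the real getter consults caches, the local plot and DSN.
async fn fetch_piece(piece_index: PieceIndex) -> Option<Piece> {
    Some(piece_index.to_le_bytes().to_vec())
}

#[tokio::main]
async fn main() {
    let getter = DedupPieceGetter::default();
    // Concurrent requests for the same index reuse an in-flight download
    // whenever one is still pending.
    let (a, b) = futures::join!(getter.get_piece(1), getter.get_piece(1));
    assert_eq!(a, b);
}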
