diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
index 6aea086c89..8c1e836f6e 100644
--- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
+++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -8,7 +8,6 @@ use anyhow::anyhow;
 use backoff::ExponentialBackoff;
 use bytesize::ByteSize;
 use clap::{Parser, ValueHint};
-use futures::channel::oneshot;
 use futures::stream::{FuturesOrdered, FuturesUnordered};
 use futures::{FutureExt, StreamExt};
 use parking_lot::Mutex;
@@ -627,13 +626,9 @@
         .map(|farming_thread_pool_size| farming_thread_pool_size.get())
         .unwrap_or_else(recommended_number_of_farming_threads);
 
-    let mut plotting_delay_senders = Vec::with_capacity(disk_farms.len());
-
     for (disk_farm_index, disk_farm) in disk_farms.into_iter().enumerate() {
         debug!(url = %node_rpc_url, %disk_farm_index, "Connecting to node RPC");
         let node_client = NodeRpcClient::new(&node_rpc_url).await?;
 
-        let (plotting_delay_sender, plotting_delay_receiver) = oneshot::channel();
-        plotting_delay_senders.push(plotting_delay_sender);
-
         let single_disk_farm_fut = SingleDiskFarm::new::<_, _, PosTable>(
             SingleDiskFarmOptions {
@@ -652,7 +647,6 @@
                 farm_during_initial_plotting,
                 farming_thread_pool_size,
                 plotting_thread_pool_manager: plotting_thread_pool_manager.clone(),
-                plotting_delay: Some(plotting_delay_receiver),
                 disable_farm_locking,
             },
             disk_farm_index,
@@ -696,30 +690,23 @@
         single_disk_farms.push(single_disk_farm);
     }
 
-    let cache_acknowledgement_receiver = farmer_cache
-        .replace_backing_caches(
-            single_disk_farms
-                .iter()
-                .map(|single_disk_farm| single_disk_farm.piece_cache())
-                .collect(),
-            single_disk_farms
-                .iter()
-                .map(|single_disk_farm| single_disk_farm.plot_cache())
-                .collect(),
-        )
-        .await;
+    // Acknowledgement is not necessary
+    drop(
+        farmer_cache
+            .replace_backing_caches(
+                single_disk_farms
+                    .iter()
+                    .map(|single_disk_farm| single_disk_farm.piece_cache())
+                    .collect(),
+                single_disk_farms
+                    .iter()
+                    .map(|single_disk_farm| single_disk_farm.plot_cache())
+                    .collect(),
+            )
+            .await,
+    );
     drop(farmer_cache);
 
-    // Wait for cache initialization before starting plotting
-    tokio::spawn(async move {
-        if cache_acknowledgement_receiver.await.is_ok() {
-            for plotting_delay_sender in plotting_delay_senders {
-                // Doesn't matter if receiver is gone
-                let _ = plotting_delay_sender.send(());
-            }
-        }
-    });
-
     // Store piece readers so we can reference them later
     let piece_readers = single_disk_farms
         .iter()
diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs
index 9574d45745..93b798bf0f 100644
--- a/crates/subspace-farmer/src/farmer_cache.rs
+++ b/crates/subspace-farmer/src/farmer_cache.rs
@@ -10,6 +10,7 @@ use futures::channel::oneshot;
 use futures::stream::{FuturesOrdered, FuturesUnordered};
 use futures::{select, FutureExt, StreamExt};
 use parking_lot::RwLock;
+use rayon::prelude::*;
 use std::collections::{HashMap, VecDeque};
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::Arc;
@@ -386,7 +387,11 @@
             "Identified piece indices that should be cached",
         );
 
-        let mut piece_indices_to_store = piece_indices_to_store.into_values();
+        let mut piece_indices_to_store = piece_indices_to_store.into_values().collect::<Vec<_>>();
+        // Sort pieces such that they are in ascending order and have a higher chance of download
+        // overlapping with other processes like node's sync from DSN
+        piece_indices_to_store.par_sort_unstable();
+        let mut piece_indices_to_store = piece_indices_to_store.into_iter();
 
         let download_piece = |piece_index| async move {
             trace!(%piece_index, "Downloading piece");
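The sort above relies on rayon's `par_sort_unstable` (available through `rayon::prelude::*` via the `ParallelSliceMut` trait), a parallel counterpart of `slice::sort_unstable` that spreads the work across rayon's thread pool. A minimal standalone sketch of the same call, with plain `u64` values standing in for `PieceIndex` (hypothetical example, not taken from the patch):

    use rayon::prelude::*;

    fn main() {
        // Stand-in for the piece indices collected from `into_values()` above
        let mut piece_indices: Vec<u64> = vec![42, 7, 1_000, 3];
        // Unstable sort is sufficient: equal indices have no meaningful order
        piece_indices.par_sort_unstable();
        assert_eq!(piece_indices, vec![3, 7, 42, 1_000]);
    }

Sorting into ascending order means the farmer cache requests low piece indices first, which is the order the node's DSN sync also tends to fetch them in, so the two download streams are more likely to overlap.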
diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs
index 9dac067c45..f828dee8d6 100644
--- a/crates/subspace-farmer/src/single_disk_farm.rs
+++ b/crates/subspace-farmer/src/single_disk_farm.rs
@@ -288,9 +288,6 @@
     pub farming_thread_pool_size: usize,
     /// Thread pool manager used for plotting
    pub plotting_thread_pool_manager: PlottingThreadPoolManager,
-    /// Notification for plotter to start, can be used to delay plotting until some initialization
-    /// has happened externally
-    pub plotting_delay: Option<oneshot::Receiver<()>>,
     /// Disable farm locking, for example if file system doesn't support it
     pub disable_farm_locking: bool,
 }
@@ -619,7 +616,6 @@
             record_encoding_concurrency,
             farming_thread_pool_size,
             plotting_thread_pool_manager,
-            plotting_delay,
             farm_during_initial_plotting,
             disable_farm_locking,
         } = options;
@@ -966,13 +962,6 @@
                 return Ok(());
             }
 
-            if let Some(plotting_delay) = plotting_delay {
-                if plotting_delay.await.is_err() {
-                    // Dropped before resolving
-                    return Ok(());
-                }
-            }
-
             plotting::<_, _, PosTable>(plotting_options).await
         };
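For context on what was removed: `plotting_delay` carried a `futures::channel::oneshot::Receiver<()>`, and plotting waited on it before starting, bailing out quietly if the sender was dropped. A minimal sketch of that gating pattern in isolation (hypothetical names, not taken from the patch):

    use futures::channel::oneshot;
    use futures::executor::block_on;

    fn main() {
        let (plotting_delay_sender, plotting_delay_receiver) = oneshot::channel::<()>();

        let plotting = async move {
            // Wait for the go-ahead before doing any work
            if plotting_delay_receiver.await.is_err() {
                // Sender dropped before resolving
                return;
            }
            println!("plotting started");
        };

        // Initialization done; it doesn't matter if the receiver is already gone
        let _ = plotting_delay_sender.send(());

        block_on(plotting);
    }

With the gate gone, plotting starts immediately and may race with cache population for the same pieces; the deduplication added to `FarmerPieceGetter` below is what makes that race cheap.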
diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs
index ad8e477b39..1a29bc1d6a 100644
--- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs
+++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs
@@ -1,11 +1,13 @@
 use crate::farmer_cache::FarmerCache;
 use crate::utils::plotted_pieces::PlottedPieces;
 use crate::NodeClient;
+use async_lock::Mutex as AsyncMutex;
 use async_trait::async_trait;
 use backoff::backoff::Backoff;
 use backoff::future::retry;
 use backoff::ExponentialBackoff;
 use parking_lot::Mutex;
+use std::collections::HashMap;
 use std::error::Error;
 use std::sync::atomic::{AtomicU32, Ordering};
 use std::sync::{Arc, Weak};
@@ -32,6 +34,7 @@ struct Inner<PV, NC> {
     node_client: NC,
     plotted_pieces: Arc<Mutex<Option<PlottedPieces>>>,
     dsn_cache_retry_policy: DsnCacheRetryPolicy,
+    in_progress_pieces: Mutex<HashMap<PieceIndex, Arc<AsyncMutex<Option<Piece>>>>>,
 }
 
 pub struct FarmerPieceGetter<PV, NC> {
@@ -65,12 +68,47 @@ where
                 node_client,
                 plotted_pieces,
                 dsn_cache_retry_policy,
+                in_progress_pieces: Mutex::default(),
             }),
         }
     }
 
     /// Fast way to get piece using various caches
     pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
+        let in_progress_piece_mutex = Arc::new(AsyncMutex::new(None));
+        // Take lock before anything else, set to `None` when another piece getting is already in
+        // progress
+        let mut local_in_progress_piece_guard = Some(in_progress_piece_mutex.lock().await);
+        let in_progress_piece_mutex = self
+            .inner
+            .in_progress_pieces
+            .lock()
+            .entry(piece_index)
+            .and_modify(|_| {
+                // Piece getting is already in progress elsewhere
+                local_in_progress_piece_guard.take();
+            })
+            .or_insert_with(|| Arc::clone(&in_progress_piece_mutex))
+            .clone();
+
+        // If piece is already in progress, just wait for it
+        if local_in_progress_piece_guard.is_none() {
+            trace!(%piece_index, "Piece is already in progress, waiting for result #1");
+            // Doesn't matter if it was successful or not here
+            return in_progress_piece_mutex.lock().await.clone();
+        }
+
+        // Otherwise try to get the piece without releasing lock to make sure successfully
+        // downloaded piece gets stored
+        let maybe_piece = self.get_piece_fast_internal(piece_index).await;
+        // Store the result for others to observe
+        if let Some(mut in_progress_piece) = local_in_progress_piece_guard {
+            *in_progress_piece = maybe_piece.clone();
+            self.inner.in_progress_pieces.lock().remove(&piece_index);
+        }
+        maybe_piece
+    }
+
+    async fn get_piece_fast_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
         let key = RecordKey::from(piece_index.to_multihash());
 
         let inner = &self.inner;
@@ -120,6 +158,46 @@
 
     /// Slow way to get piece using archival storage
     pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
+        let in_progress_piece_mutex = Arc::new(AsyncMutex::new(None));
+        // Take lock before anything else, set to `None` when another piece getting is already in
+        // progress
+        let mut local_in_progress_piece_guard = Some(in_progress_piece_mutex.lock().await);
+        let in_progress_piece_mutex = self
+            .inner
+            .in_progress_pieces
+            .lock()
+            .entry(piece_index)
+            .and_modify(|_| {
+                // Piece getting is already in progress elsewhere
+                local_in_progress_piece_guard.take();
+            })
+            .or_insert_with(|| Arc::clone(&in_progress_piece_mutex))
+            .clone();
+
+        // If piece is already in progress, wait for it to see if it was successful
+        if local_in_progress_piece_guard.is_none() {
+            trace!(%piece_index, "Piece is already in progress, waiting for result #2");
+            if let Some(piece) = in_progress_piece_mutex.lock().await.clone() {
+                trace!(
+                    %piece_index,
+                    "Piece was already in progress and downloaded successfully #1"
+                );
+                return Some(piece);
+            }
+        }
+
+        // Otherwise try to get the piece without releasing lock to make sure successfully
+        // downloaded piece gets stored
+        let maybe_piece = self.get_piece_slow_internal(piece_index).await;
+        // Store the result for others to observe
+        if let Some(mut in_progress_piece) = local_in_progress_piece_guard {
+            *in_progress_piece = maybe_piece.clone();
+            self.inner.in_progress_pieces.lock().remove(&piece_index);
+        }
+        maybe_piece
+    }
+
+    /// Slow way to get piece using archival storage
+    async fn get_piece_slow_internal(&self, piece_index: PieceIndex) -> Option<Piece> {
         let inner = &self.inner;
 
         trace!(%piece_index, "Getting piece from local plot");
@@ -179,6 +257,33 @@
         &self,
         piece_index: PieceIndex,
     ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
+        let in_progress_piece_mutex = Arc::new(AsyncMutex::new(None));
+        // Take lock before anything else, set to `None` when another piece getting is already in
+        // progress
+        let mut local_in_progress_piece_guard = Some(in_progress_piece_mutex.lock().await);
+        let in_progress_piece_mutex = self
+            .inner
+            .in_progress_pieces
+            .lock()
+            .entry(piece_index)
+            .and_modify(|_| {
+                // Piece getting is already in progress elsewhere
+                local_in_progress_piece_guard.take();
+            })
+            .or_insert_with(|| Arc::clone(&in_progress_piece_mutex))
+            .clone();
+
+        // If piece is already in progress, wait for it to see if it was successful
+        if local_in_progress_piece_guard.is_none() {
+            trace!(%piece_index, "Piece is already in progress, waiting for result #3");
+            if let Some(piece) = in_progress_piece_mutex.lock().await.clone() {
+                trace!(
+                    %piece_index,
+                    "Piece was already in progress and downloaded successfully #2"
+                );
+                return Ok(Some(piece));
+            }
+        }
+
         {
             let retries = AtomicU32::new(0);
             let max_retries = u32::from(self.inner.dsn_cache_retry_policy.max_retries);
@@ -188,7 +293,7 @@
             let maybe_piece_fut = retry(backoff, || async {
                 let current_attempt = retries.fetch_add(1, Ordering::Relaxed);
 
-                if let Some(piece) = self.get_piece_fast(piece_index).await {
+                if let Some(piece) = self.get_piece_fast_internal(piece_index).await {
                     trace!(%piece_index, current_attempt, "Got piece from DSN L2 cache");
                     return Ok(Some(piece));
                 }
@@ -211,11 +316,21 @@
             if let Ok(Some(piece)) = maybe_piece_fut.await {
trace!(%piece_index, "Got piece from DSN L2 cache successfully"); + // Store successfully downloaded piece for others to observe + if let Some(mut in_progress_piece) = local_in_progress_piece_guard { + in_progress_piece.replace(piece.clone()); + self.inner.in_progress_pieces.lock().remove(&piece_index); + } return Ok(Some(piece)); } }; - if let Some(piece) = self.get_piece_slow(piece_index).await { + if let Some(piece) = self.get_piece_slow_internal(piece_index).await { + // Store successfully downloaded piece for others to observe + if let Some(mut in_progress_piece) = local_in_progress_piece_guard { + in_progress_piece.replace(piece.clone()); + self.inner.in_progress_pieces.lock().remove(&piece_index); + } return Ok(Some(piece)); }