diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs index c454c89855..d3348e3b4d 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs @@ -114,7 +114,7 @@ fn audit( NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) .expect("Not zero; qed"), ) - .map_err(|error| anyhow::anyhow!(error))?; + .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?; let table_generator = Mutex::new(PosTable::generator()); let sectors_metadata = SingleDiskFarm::read_all_sectors_metadata(&disk_farm) @@ -276,7 +276,7 @@ fn prove( NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) .expect("Not zero; qed"), ) - .map_err(|error| anyhow::anyhow!(error))?; + .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?; let table_generator = Mutex::new(PosTable::generator()); let mut sectors_metadata = SingleDiskFarm::read_all_sectors_metadata(&disk_farm) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index 120df1a038..674654bb04 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -90,8 +90,8 @@ pub(crate) struct FarmingArgs { #[arg(long)] dev: bool, /// Run temporary farmer with specified plot size in human-readable format (e.g. 10GB, 2TiB) or - /// just bytes (e.g. 4096), this will create a temporary directory for storing farmer data that - /// will be deleted at the end of the process. + /// just bytes (e.g. 4096), this will create a temporary directory that will be deleted at the + /// end of the process. #[arg(long, conflicts_with = "disk_farms")] tmp: Option, /// Maximum number of pieces in sector (can override protocol value to something lower). 
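Both benchmark hunks above replace a bare `anyhow::anyhow!(error)` conversion with a message naming the operation that failed, and the same pattern recurs in `farm.rs` below. A minimal sketch of why the extra context matters; `build_erasure_coding` and its `String` error are hypothetical stand-ins for illustration, not code from this diff:

```rust
use anyhow::anyhow;

// Hypothetical fallible constructor standing in for ErasureCoding::new().
fn build_erasure_coding(shards: usize) -> Result<Vec<u8>, String> {
    if shards == 0 {
        return Err("shard count must be non-zero".to_string());
    }
    Ok(vec![0; shards])
}

fn init() -> anyhow::Result<()> {
    // Before: only the inner error text survives, the failed operation is lost.
    // let _ec = build_erasure_coding(0).map_err(|error| anyhow!(error))?;

    // After: the caller sees *what* failed to instantiate as well as *why*.
    let _ec = build_erasure_coding(0)
        .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?;
    Ok(())
}

fn main() {
    if let Err(error) = init() {
        eprintln!("{error:#}");
    }
}
```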
@@ -257,7 +257,7 @@ where !cfg!(windows) || disk_farms .iter() - .map(|farm| farm.allocated_plotting_space) + .map(|farm| farm.allocated_space) .sum::() <= MAX_SPACE_PLEDGED_FOR_PLOT_CACHE_ON_WINDOWS }); @@ -272,7 +272,7 @@ where disk_farms = vec![DiskFarm { directory: tmp_directory.as_ref().to_path_buf(), - allocated_plotting_space: plot_size.as_u64(), + allocated_space: plot_size.as_u64(), read_sector_record_chunks_mode: None, }]; @@ -304,7 +304,7 @@ where let farmer_app_info = node_client .farmer_app_info() .await - .map_err(|error| anyhow!(error))?; + .map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?; let first_farm_directory = &disk_farms .first() @@ -370,7 +370,7 @@ where NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) .expect("Not zero; qed"), ) - .map_err(|error| anyhow!(error))?; + .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?; let validator = Some(SegmentCommitmentPieceValidator::new( node.clone(), node_client.clone(), @@ -542,7 +542,7 @@ where SingleDiskFarmOptions { directory: disk_farm.directory.clone(), farmer_app_info, - allocated_space: disk_farm.allocated_plotting_space, + allocated_space: disk_farm.allocated_space, max_pieces_in_sector, node_client, reward_address, @@ -679,7 +679,7 @@ where plotted_pieces.add_farm(farm_index, farm.piece_reader()); - let total_sector_count = farm.total_sectors_count(); + let total_sectors_count = farm.total_sectors_count(); let mut plotted_sectors_count = 0; let plotted_sectors = farm.plotted_sectors(); let mut plotted_sectors = plotted_sectors.get().await.map_err(|error| { @@ -698,7 +698,7 @@ where ) } - total_and_plotted_sectors.push((total_sector_count, plotted_sectors_count)); + total_and_plotted_sectors.push((total_sectors_count, plotted_sectors_count)); } info!("Finished collecting already plotted pieces successfully"); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs index 05eaf548fb..4d4cecf8ac 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs @@ -58,10 +58,10 @@ impl From for Option { #[derive(Debug, Clone)] pub(in super::super) struct DiskFarm { - /// Path to directory where data is stored. 
+ /// Path to directory where farm is stored pub(in super::super) directory: PathBuf, - /// How much space in bytes can farm use for plots (metadata space is not included) - pub(in super::super) allocated_plotting_space: u64, + /// How much space in bytes can farm use + pub(in super::super) allocated_space: u64, /// Which mode to use for reading of sector record chunks pub(in super::super) read_sector_record_chunks_mode: Option, } @@ -76,7 +76,7 @@ impl FromStr for DiskFarm { } let mut plot_directory = None; - let mut allocated_plotting_space = None; + let mut allocated_space = None; let mut read_sector_record_chunks_mode = None; for part in parts { @@ -93,7 +93,7 @@ impl FromStr for DiskFarm { plot_directory.replace(PathBuf::from(value)); } "size" => { - allocated_plotting_space.replace( + allocated_space.replace( value .parse::() .map_err(|error| { @@ -121,12 +121,10 @@ impl FromStr for DiskFarm { } Ok(DiskFarm { - directory: plot_directory.ok_or({ - "`path` key is required with path to directory where plots will be stored" - })?, - allocated_plotting_space: allocated_plotting_space.ok_or({ - "`size` key is required with path to directory where plots will be stored" - })?, + directory: plot_directory + .ok_or("`path` key is required with path to directory where farm will be stored")?, + allocated_space: allocated_space + .ok_or("`size` key is required with allocated amount of disk space")?, read_sector_record_chunks_mode, }) } diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs index 1a77b01705..1502d893dd 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/network.rs @@ -137,8 +137,7 @@ where let read_piece_fut = match weak_plotted_pieces.upgrade() { Some(plotted_pieces) => plotted_pieces - .read() - .await + .try_read()? .read_piece(piece_index)? .in_current_span(), None => { diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs index 43b30b06e5..d727ec8a29 100644 --- a/crates/subspace-farmer/src/farm.rs +++ b/crates/subspace-farmer/src/farm.rs @@ -42,7 +42,7 @@ pub struct PieceCacheOffset(pub(crate) u32); #[async_trait] pub trait PieceCache: Send + Sync + fmt::Debug { /// Max number of elements in this cache - fn max_num_elements(&self) -> usize; + fn max_num_elements(&self) -> u32; /// Contents of this piece cache. /// @@ -50,7 +50,15 @@ pub trait PieceCache: Send + Sync + fmt::Debug { /// doesn't happen for the same piece being accessed! async fn contents( &self, - ) -> Box)> + Unpin + Send + '_>; + ) -> Result< + Box< + dyn Stream), FarmError>> + + Unpin + + Send + + '_, + >, + FarmError, + >; /// Store piece in cache at specified offset, replacing existing piece if there is any. 
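The `PieceCache::contents()` signature above now fails on two levels: the outer `Result` covers getting the cache contents at all, and each streamed element carries its own `Result` (the `farmer_cache.rs` hunk later in this diff unwraps both). A simplified sketch of the consuming side; `MockCache`, `FarmError` as a boxed error, and the toy piece types are assumptions for illustration, not the real farm types:

```rust
use futures::{stream, Stream, StreamExt};

type FarmError = Box<dyn std::error::Error + Send + Sync>;

// Toy stand-ins for the real farm types.
#[derive(Debug)]
struct PieceCacheOffset(u32);
type PieceIndex = u64;

// Hypothetical cache producing the same double-Result shape as the new trait method.
struct MockCache;

impl MockCache {
    async fn contents(
        &self,
    ) -> Result<
        impl Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>,
        FarmError,
    > {
        let elements: Vec<Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>> = vec![
            Ok((PieceCacheOffset(0), Some(42))),
            Ok((PieceCacheOffset(1), None)),
            Err("unreadable element".into()),
        ];
        Ok(stream::iter(elements))
    }
}

#[tokio::main]
async fn main() {
    let cache = MockCache;

    // Outer Result: the cache contents may be unavailable entirely.
    let mut contents = match cache.contents().await {
        Ok(contents) => contents,
        Err(error) => {
            eprintln!("Failed to get cache contents: {error}");
            return;
        }
    };

    // Inner Result: each element may fail on its own; stop early like farmer_cache does.
    while let Some(element) = contents.next().await {
        match element {
            Ok((offset, Some(piece_index))) => println!("{offset:?} stores piece {piece_index}"),
            Ok((offset, None)) => println!("{offset:?} is free"),
            Err(error) => {
                eprintln!("Failed to get cache contents element details: {error}");
                break;
            }
        }
    }
}
```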
/// @@ -433,3 +441,55 @@ pub trait Farm { /// Run and wait for background threads to exit or return an error fn run(self: Box) -> Pin> + Send>>; } + +#[async_trait] +impl Farm for Box +where + T: Farm + ?Sized, +{ + fn id(&self) -> &FarmId { + self.as_ref().id() + } + + fn total_sectors_count(&self) -> SectorIndex { + self.as_ref().total_sectors_count() + } + + fn plotted_sectors(&self) -> Arc { + self.as_ref().plotted_sectors() + } + + fn piece_cache(&self) -> Arc { + self.as_ref().piece_cache() + } + + fn plot_cache(&self) -> Arc { + self.as_ref().plot_cache() + } + + fn piece_reader(&self) -> Arc { + self.as_ref().piece_reader() + } + + fn on_sector_update( + &self, + callback: HandlerFn<(SectorIndex, SectorUpdate)>, + ) -> Box { + self.as_ref().on_sector_update(callback) + } + + fn on_farming_notification( + &self, + callback: HandlerFn, + ) -> Box { + self.as_ref().on_farming_notification(callback) + } + + fn on_solution(&self, callback: HandlerFn) -> Box { + self.as_ref().on_solution(callback) + } + + fn run(self: Box) -> Pin> + Send>> { + (*self).run() + } +} diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs index b09163f959..930ed6873c 100644 --- a/crates/subspace-farmer/src/farmer_cache.rs +++ b/crates/subspace-farmer/src/farmer_cache.rs @@ -237,11 +237,42 @@ where .map( |(index, ((mut stored_pieces, mut free_offsets), new_cache))| { run_future_in_dedicated_thread( - move || async { - let mut contents = new_cache.contents().await; - stored_pieces.reserve(new_cache.max_num_elements()); - - while let Some((offset, maybe_piece_index)) = contents.next().await { + move || async move { + // Hack with first collecting into `Option` with `Option::take()` call + // later is to satisfy compiler that gets confused about ownership + // otherwise + let mut maybe_contents = match new_cache.contents().await { + Ok(contents) => Some(contents), + Err(error) => { + warn!(%index, %error, "Failed to get cache contents"); + + None + } + }; + let Some(mut contents) = maybe_contents.take() else { + drop(maybe_contents); + + return PieceCacheState { + stored_pieces, + free_offsets, + backend: new_cache, + }; + }; + + stored_pieces.reserve(new_cache.max_num_elements() as usize); + + while let Some(maybe_element_details) = contents.next().await { + let (offset, maybe_piece_index) = match maybe_element_details { + Ok(element_details) => element_details, + Err(error) => { + warn!( + %index, + %error, + "Failed to get cache contents element details" + ); + break; + } + }; match maybe_piece_index { Some(piece_index) => { stored_pieces.insert( @@ -258,6 +289,7 @@ where yield_now().await; } + drop(maybe_contents); drop(contents); PieceCacheState { diff --git a/crates/subspace-farmer/src/piece_cache.rs b/crates/subspace-farmer/src/piece_cache.rs index 3f005b8da5..40711409da 100644 --- a/crates/subspace-farmer/src/piece_cache.rs +++ b/crates/subspace-farmer/src/piece_cache.rs @@ -48,8 +48,8 @@ pub enum PieceCacheError { /// Max offset max: u32, }, - /// Cache size has zero capacity, this is not supported - #[error("Cache size has zero capacity, this is not supported")] + /// Cache size has zero capacity, this is not supported, cache size needs to be larger + #[error("Cache size has zero capacity, this is not supported, cache size needs to be larger")] ZeroCapacity, /// Checksum mismatch #[error("Checksum mismatch")] @@ -62,7 +62,7 @@ struct Inner { file: File, #[cfg(windows)] file: UnbufferedIoFileWindows, - num_elements: u32, + max_num_elements: u32, } /// 
Dedicated piece cache stored on one disk, is used both to accelerate DSN queries and to plot @@ -74,20 +74,28 @@ pub struct PieceCache { #[async_trait] impl farm::PieceCache for PieceCache { - fn max_num_elements(&self) -> usize { - self.inner.num_elements as usize + fn max_num_elements(&self) -> u32 { + self.inner.max_num_elements } async fn contents( &self, - ) -> Box)> + Unpin + Send + '_> { + ) -> Result< + Box< + dyn Stream), FarmError>> + + Unpin + + Send + + '_, + >, + FarmError, + > { let this = self.clone(); let (mut sender, receiver) = mpsc::channel(1); let read_contents = task::spawn_blocking(move || { let contents = this.contents(); for (piece_cache_offset, maybe_piece) in contents { if let Err(error) = - Handle::current().block_on(sender.send((piece_cache_offset, maybe_piece))) + Handle::current().block_on(sender.send(Ok((piece_cache_offset, maybe_piece)))) { debug!(%error, "Aborting contents iteration due to receiver dropping"); break; @@ -98,7 +106,7 @@ impl farm::PieceCache for PieceCache { // Change order such that in closure below `receiver` is dropped before `read_contents` let mut receiver = receiver; - Box::new(stream::poll_fn(move |ctx| { + Ok(Box::new(stream::poll_fn(move |ctx| { let poll_result = receiver.poll_next_unpin(ctx); if matches!(poll_result, Poll::Ready(None)) { @@ -106,7 +114,7 @@ impl farm::PieceCache for PieceCache { } poll_result - })) + }))) } async fn write_piece( @@ -133,7 +141,8 @@ impl farm::PieceCache for PieceCache { impl PieceCache { pub(crate) const FILE_NAME: &'static str = "piece_cache.bin"; - pub(crate) fn open(directory: &Path, capacity: u32) -> Result { + /// Open cache, capacity is measured in elements of [`PieceCache::element_size()`] size + pub fn open(directory: &Path, capacity: u32) -> Result { if capacity == 0 { return Err(PieceCacheError::ZeroCapacity); } @@ -168,12 +177,12 @@ impl PieceCache { Ok(Self { inner: Arc::new(Inner { file, - num_elements: capacity, + max_num_elements: capacity, }), }) } - pub(crate) const fn element_size() -> u32 { + pub const fn element_size() -> u32 { (PieceIndex::SIZE + Piece::SIZE + mem::size_of::()) as u32 } @@ -188,7 +197,7 @@ impl PieceCache { let mut current_skip = 0; // TODO: Parallelize or read in larger batches - (0..self.inner.num_elements).map(move |offset| { + (0..self.inner.max_num_elements).map(move |offset| { if current_skip > CONTENTS_READ_SKIP_LIMIT { return (PieceCacheOffset(offset), None); } @@ -225,10 +234,10 @@ impl PieceCache { piece: &Piece, ) -> Result<(), PieceCacheError> { let PieceCacheOffset(offset) = offset; - if offset >= self.inner.num_elements { + if offset >= self.inner.max_num_elements { return Err(PieceCacheError::OffsetOutsideOfRange { provided: offset, - max: self.inner.num_elements - 1, + max: self.inner.max_num_elements - 1, }); } @@ -260,11 +269,11 @@ impl PieceCache { offset: PieceCacheOffset, ) -> Result, PieceCacheError> { let PieceCacheOffset(offset) = offset; - if offset >= self.inner.num_elements { + if offset >= self.inner.max_num_elements { warn!(%offset, "Trying to read piece out of range, this must be an implementation bug"); return Err(PieceCacheError::OffsetOutsideOfRange { provided: offset, - max: self.inner.num_elements - 1, + max: self.inner.max_num_elements - 1, }); } @@ -282,11 +291,11 @@ impl PieceCache { offset: PieceCacheOffset, ) -> Result, PieceCacheError> { let PieceCacheOffset(offset) = offset; - if offset >= self.inner.num_elements { + if offset >= self.inner.max_num_elements { warn!(%offset, "Trying to read piece out of range, this must 
be an implementation bug"); return Err(PieceCacheError::OffsetOutsideOfRange { provided: offset, - max: self.inner.num_elements - 1, + max: self.inner.max_num_elements - 1, }); } diff --git a/crates/subspace-farmer/src/plotter.rs b/crates/subspace-farmer/src/plotter.rs index ae4cf760ab..dc8ac01bfd 100644 --- a/crates/subspace-farmer/src/plotter.rs +++ b/crates/subspace-farmer/src/plotter.rs @@ -70,7 +70,10 @@ impl fmt::Debug for SectorPlottingProgress { /// Abstract plotter implementation #[async_trait] pub trait Plotter { - /// Plot one sector, returns a stream of sector plotting events. + /// Whether plotter has free capacity to encode more sectors + async fn has_free_capacity(&self) -> Result; + + /// Plot one sector, sending sector plotting events via provided stream. /// /// Future returns once plotting is successfully scheduled (for backpressure purposes). async fn plot_sector( @@ -84,6 +87,23 @@ pub trait Plotter { ) where PS: Sink + Unpin + Send + 'static, PS::Error: Error; + + /// Try to plot one sector, sending sector plotting events via provided stream. + /// + /// Returns `true` if plotting started successfully and `false` if there is no capacity to start + /// plotting immediately. + async fn try_plot_sector( + &self, + public_key: PublicKey, + sector_index: SectorIndex, + farmer_protocol_info: FarmerProtocolInfo, + pieces_in_sector: u16, + replotting: bool, + progress_sender: PS, + ) -> bool + where + PS: Sink + Unpin + Send + 'static, + PS::Error: Error; } #[async_trait] @@ -91,6 +111,10 @@ impl

<P> Plotter for Arc<P>
where P: Plotter + Send + Sync, { + async fn has_free_capacity(&self) -> Result { + self.as_ref().has_free_capacity().await + } + async fn plot_sector( &self, public_key: PublicKey, @@ -114,4 +138,29 @@ where ) .await } + + async fn try_plot_sector( + &self, + public_key: PublicKey, + sector_index: SectorIndex, + farmer_protocol_info: FarmerProtocolInfo, + pieces_in_sector: u16, + replotting: bool, + progress_sender: PS, + ) -> bool + where + PS: Sink + Unpin + Send + 'static, + PS::Error: Error, + { + self.as_ref() + .try_plot_sector( + public_key, + sector_index, + farmer_protocol_info, + pieces_in_sector, + replotting, + progress_sender, + ) + .await + } } diff --git a/crates/subspace-farmer/src/plotter/cpu.rs b/crates/subspace-farmer/src/plotter/cpu.rs index 1ef8343df5..cdbfa213f0 100644 --- a/crates/subspace-farmer/src/plotter/cpu.rs +++ b/crates/subspace-farmer/src/plotter/cpu.rs @@ -23,7 +23,7 @@ use subspace_farmer_components::plotting::{ }; use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter}; use subspace_proof_of_space::Table; -use tokio::sync::Semaphore; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tokio::task::yield_now; use tracing::warn; @@ -64,6 +64,10 @@ where PG: PieceGetter + Clone + Send + Sync + 'static, PosTable: Table, { + async fn has_free_capacity(&self) -> Result { + Ok(self.downloading_semaphore.available_permits() > 0) + } + async fn plot_sector( &self, public_key: PublicKey, @@ -76,12 +80,6 @@ where PS: Sink + Unpin + Send + 'static, PS::Error: Error, { - let progress_updater = ProgressUpdater { - public_key, - sector_index, - handlers: Arc::clone(&self.handlers), - }; - let start = Instant::now(); // Done outside the future below as a backpressure, ensuring that it is not possible to @@ -94,6 +92,12 @@ where Err(error) => { warn!(%error, "Failed to acquire downloading permit"); + let progress_updater = ProgressUpdater { + public_key, + sector_index, + handlers: Arc::clone(&self.handlers), + }; + progress_updater .update_progress_and_events( &mut progress_sender, @@ -107,6 +111,145 @@ where } }; + self.plot_sector_internal( + start, + downloading_permit, + public_key, + sector_index, + farmer_protocol_info, + pieces_in_sector, + replotting, + progress_sender, + ) + .await + } + + async fn try_plot_sector( + &self, + public_key: PublicKey, + sector_index: SectorIndex, + farmer_protocol_info: FarmerProtocolInfo, + pieces_in_sector: u16, + replotting: bool, + progress_sender: PS, + ) -> bool + where + PS: Sink + Unpin + Send + 'static, + PS::Error: Error, + { + let start = Instant::now(); + + let Ok(downloading_permit) = Arc::clone(&self.downloading_semaphore).try_acquire_owned() + else { + return false; + }; + + self.plot_sector_internal( + start, + downloading_permit, + public_key, + sector_index, + farmer_protocol_info, + pieces_in_sector, + replotting, + progress_sender, + ) + .await; + + true + } +} + +impl CpuPlotter +where + PG: PieceGetter + Clone + Send + Sync + 'static, + PosTable: Table, +{ + /// Create new instance + pub fn new( + piece_getter: PG, + downloading_semaphore: Arc, + plotting_thread_pool_manager: PlottingThreadPoolManager, + record_encoding_concurrency: NonZeroUsize, + global_mutex: Arc>, + kzg: Kzg, + erasure_coding: ErasureCoding, + ) -> Self { + let (tasks_sender, mut tasks_receiver) = mpsc::channel(1); + + // Basically runs plotting tasks in the background and allows to abort on drop + let background_tasks = AsyncJoinOnDrop::new( + tokio::spawn(async move { + let background_tasks = FuturesUnordered::new(); + 
let mut background_tasks = pin!(background_tasks); + // Just so that `FuturesUnordered` will never end + background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true)); + + loop { + select! { + maybe_background_task = tasks_receiver.next().fuse() => { + let Some(background_task) = maybe_background_task else { + break; + }; + + background_tasks.push(background_task); + }, + _ = background_tasks.select_next_some() => { + // Nothing to do + } + } + } + }), + true, + ); + + let abort_early = Arc::new(AtomicBool::new(false)); + + Self { + piece_getter, + downloading_semaphore, + plotting_thread_pool_manager, + record_encoding_concurrency, + global_mutex, + kzg, + erasure_coding, + handlers: Arc::default(), + tasks_sender, + _background_tasks: background_tasks, + abort_early, + _phantom: PhantomData, + } + } + + /// Subscribe to plotting progress notifications + pub fn on_plotting_progress( + &self, + callback: HandlerFn3, + ) -> HandlerId { + self.handlers.plotting_progress.add(callback) + } + + #[allow(clippy::too_many_arguments)] + async fn plot_sector_internal( + &self, + start: Instant, + downloading_permit: OwnedSemaphorePermit, + public_key: PublicKey, + sector_index: SectorIndex, + farmer_protocol_info: FarmerProtocolInfo, + pieces_in_sector: u16, + replotting: bool, + mut progress_sender: PS, + ) where + PS: Sink + Unpin + Send + 'static, + PS::Error: Error, + { + let progress_updater = ProgressUpdater { + public_key, + sector_index, + handlers: Arc::clone(&self.handlers), + }; + let plotting_fut = { let piece_getter = self.piece_getter.clone(); let plotting_thread_pool_manager = self.plotting_thread_pool_manager.clone(); @@ -132,7 +275,7 @@ where // Take mutex briefly to make sure plotting is allowed right now global_mutex.lock().await; - let start = Instant::now(); + let downloading_start = Instant::now(); let downloaded_sector_fut = download_sector(DownloadSectorOptions { public_key: &public_key, @@ -164,7 +307,7 @@ where if !progress_updater .update_progress_and_events( &mut progress_sender, - SectorPlottingProgress::Downloaded(start.elapsed()), + SectorPlottingProgress::Downloaded(downloading_start.elapsed()), ) .await { @@ -220,7 +363,7 @@ where return; } - let start = Instant::now(); + let encoding_start = Instant::now(); let plotting_result = thread_pool.install(plotting_fn); @@ -229,7 +372,7 @@ where if !progress_updater .update_progress_and_events( &mut progress_sender, - SectorPlottingProgress::Encoded(start.elapsed()), + SectorPlottingProgress::Encoded(encoding_start.elapsed()), ) .await { @@ -287,76 +430,6 @@ where } } -impl CpuPlotter -where - PG: PieceGetter + Clone + Send + Sync + 'static, - PosTable: Table, -{ - /// Create new instance - pub fn new( - piece_getter: PG, - downloading_semaphore: Arc, - plotting_thread_pool_manager: PlottingThreadPoolManager, - record_encoding_concurrency: NonZeroUsize, - global_mutex: Arc>, - kzg: Kzg, - erasure_coding: ErasureCoding, - ) -> Self { - let (tasks_sender, mut tasks_receiver) = mpsc::channel(1); - - // Basically runs plotting tasks in the background and allows to abort on drop - let background_tasks = AsyncJoinOnDrop::new( - tokio::spawn(async move { - let background_tasks = FuturesUnordered::new(); - let mut background_tasks = pin!(background_tasks); - // Just so that `FuturesUnordered` will never end - background_tasks.push(AsyncJoinOnDrop::new(tokio::spawn(pending::<()>()), true)); - - loop { - select! 
{ - maybe_background_task = tasks_receiver.next().fuse() => { - if let Some(background_task) = maybe_background_task { - background_tasks.push(background_task); - } else { - break; - } - }, - _ = background_tasks.select_next_some() => { - // Nothing to do - } - } - } - }), - true, - ); - - let abort_early = Arc::new(AtomicBool::new(false)); - - Self { - piece_getter, - downloading_semaphore, - plotting_thread_pool_manager, - record_encoding_concurrency, - global_mutex, - kzg, - erasure_coding, - handlers: Arc::default(), - tasks_sender, - _background_tasks: background_tasks, - abort_early, - _phantom: PhantomData, - } - } - - /// Subscribe to plotting progress notifications - pub fn on_plotting_progress( - &self, - callback: HandlerFn3, - ) -> HandlerId { - self.handlers.plotting_progress.add(callback) - } -} - struct ProgressUpdater { public_key: PublicKey, sector_index: SectorIndex, diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs index 1e270e82d1..42ee9e8dd4 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_cache.rs @@ -14,7 +14,7 @@ pub struct DiskPieceCache { #[async_trait] impl farm::PieceCache for DiskPieceCache { - fn max_num_elements(&self) -> usize { + fn max_num_elements(&self) -> u32 { if let Some(piece_cache) = &self.maybe_piece_cache { piece_cache.max_num_elements() } else { @@ -24,11 +24,19 @@ impl farm::PieceCache for DiskPieceCache { async fn contents( &self, - ) -> Box)> + Unpin + Send + '_> { + ) -> Result< + Box< + dyn Stream), FarmError>> + + Unpin + + Send + + '_, + >, + FarmError, + > { if let Some(piece_cache) = &self.maybe_piece_cache { farm::PieceCache::contents(piece_cache).await } else { - Box::new(stream::empty()) + Ok(Box::new(stream::empty())) } } diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 4eb1cdf3c9..0ee1cce5f5 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -134,34 +134,35 @@ where loop { select! { maybe_sector_to_plot = sectors_to_plot_receiver.next() => { - if let Some(sector_to_plot) = maybe_sector_to_plot { - let sector_plotting_init_fut = plot_single_sector(sector_to_plot, §or_plotting_options).fuse(); - let mut sector_plotting_init_fut = pin!(sector_plotting_init_fut); - - // Wait for plotting of new sector to start (backpressure), while also waiting - // for sectors that already started plotting to finish plotting and then update - // metadata header - loop { - select! { - sector_plotting_init_result = sector_plotting_init_fut => { - sectors_being_plotted.push_back(sector_plotting_init_result?); - break; - } - maybe_sector_plotting_result = maybe_wait_futures_ordered(&mut sectors_being_plotted).fuse() => { - process_plotting_result( - maybe_sector_plotting_result?, - &mut metadata_header, - §or_plotting_options.metadata_file - )?; - } + let Some(sector_to_plot) = maybe_sector_to_plot else { + break; + }; + + let sector_plotting_init_fut = plot_single_sector(sector_to_plot, §or_plotting_options).fuse(); + let mut sector_plotting_init_fut = pin!(sector_plotting_init_fut); + + // Wait for plotting of new sector to start (backpressure), while also waiting + // for sectors that already started plotting to finish plotting and then update + // metadata header + loop { + select! 
{ + sector_plotting_init_result = sector_plotting_init_fut => { + sectors_being_plotted.push_back(sector_plotting_init_result?); + break; + } + maybe_sector_plotting_result = maybe_wait_futures_ordered(&mut sectors_being_plotted).fuse() => { + process_plotting_result( + maybe_sector_plotting_result?, + &mut metadata_header, + §or_plotting_options.metadata_file + )?; } } - } else { - break; } } maybe_sector_plotting_result = maybe_wait_futures_ordered(&mut sectors_being_plotted).fuse() => { process_plotting_result( + // TODO: Retry plotting on error instead of error out completely maybe_sector_plotting_result?, &mut metadata_header, §or_plotting_options.metadata_file diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index c649dc705a..181f74a19d 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -25,6 +25,7 @@ use tracing::{debug, warn}; const MAX_DEFAULT_FARMING_THREADS: usize = 32; /// Joins async join handle on drop +#[derive(Debug)] pub struct AsyncJoinOnDrop { handle: Option>, abort_on_drop: bool, diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs index deaa0529ac..210189458a 100644 --- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs @@ -212,7 +212,10 @@ where let inner = &self.inner; trace!(%piece_index, "Getting piece from local plot"); - let maybe_read_piece_fut = inner.plotted_pieces.read().await.read_piece(piece_index); + let maybe_read_piece_fut = inner + .plotted_pieces + .try_read() + .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index)); if let Some(read_piece_fut) = maybe_read_piece_fut { if let Some(piece) = read_piece_fut.await {
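The final hunk swaps `read().await` for `try_read()`, so a local-plot lookup is skipped rather than stalled when the `plotted_pieces` lock is held for writing elsewhere. A self-contained sketch of that fallback pattern; it assumes `tokio::sync::RwLock` and a toy `PlottedPieces` with a synchronous `read_piece`, which is a simplification of the real farmer types:

```rust
use std::sync::Arc;
use tokio::sync::RwLock;

// Toy stand-in for the plotted pieces collection.
struct PlottedPieces {
    pieces: Vec<u64>,
}

impl PlottedPieces {
    fn read_piece(&self, piece_index: u64) -> Option<u64> {
        self.pieces.iter().copied().find(|&index| index == piece_index)
    }
}

async fn get_piece_from_local_plot(
    plotted_pieces: &Arc<RwLock<PlottedPieces>>,
    piece_index: u64,
) -> Option<u64> {
    // Non-blocking: if the lock is currently held for writing (e.g. a farm is being
    // added or removed), fall through to other piece sources instead of waiting.
    plotted_pieces
        .try_read()
        .ok()
        .and_then(|plotted_pieces| plotted_pieces.read_piece(piece_index))
}

#[tokio::main]
async fn main() {
    let plotted_pieces = Arc::new(RwLock::new(PlottedPieces { pieces: vec![1, 2, 3] }));
    println!("{:?}", get_piece_from_local_plot(&plotted_pieces, 2).await);
}
```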