From 8d10c456730134e577ee9b01846dbb15f11dfd8b Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 6 May 2024 14:03:40 +0300 Subject: [PATCH 1/2] Remove `PlotSectorOptions.sector_metadata_output` field as redundant --- crates/pallet-subspace/src/mock.rs | 2 -- .../benches/auditing.rs | 2 -- .../benches/plotting.rs | 2 -- .../benches/proving.rs | 2 -- .../benches/reading.rs | 2 -- .../src/plotting.rs | 29 ------------------- crates/subspace-farmer/src/plotter.rs | 2 -- crates/subspace-farmer/src/plotter/cpu.rs | 7 ++--- .../subspace-farmer/src/single_disk_farm.rs | 3 -- .../src/single_disk_farm/plotting.rs | 24 +++++++-------- test/subspace-test-client/src/lib.rs | 2 -- 11 files changed, 14 insertions(+), 63 deletions(-) diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index 29f97b6cfe..8d5c119c5c 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -416,7 +416,6 @@ pub fn create_signed_vote( for sector_index in iter::from_fn(|| Some(rand::random())) { let mut plotted_sector_bytes = Vec::new(); - let mut plotted_sector_metadata_bytes = Vec::new(); let plotted_sector = block_on(plot_sector::(PlotSectorOptions { public_key: &public_key, @@ -427,7 +426,6 @@ pub fn create_signed_vote( erasure_coding, pieces_in_sector, sector_output: &mut plotted_sector_bytes, - sector_metadata_output: &mut plotted_sector_metadata_bytes, downloading_semaphore: None, encoding_semaphore: None, table_generators: slice::from_mut(&mut table_generator), diff --git a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index 0ec8aa1fab..095c3b79dc 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -114,7 +114,6 @@ pub fn criterion_benchmark(c: &mut Criterion) { println!("Plotting one sector..."); let mut plotted_sector_bytes = Vec::new(); - let mut plotted_sector_metadata_bytes = Vec::new(); let plotted_sector = block_on(plot_sector::(PlotSectorOptions { public_key, @@ -125,7 +124,6 @@ pub fn criterion_benchmark(c: &mut Criterion) { erasure_coding: &erasure_coding, pieces_in_sector, sector_output: &mut plotted_sector_bytes, - sector_metadata_output: &mut plotted_sector_metadata_bytes, downloading_semaphore: black_box(None), encoding_semaphore: black_box(None), table_generators: slice::from_mut(&mut table_generator), diff --git a/crates/subspace-farmer-components/benches/plotting.rs b/crates/subspace-farmer-components/benches/plotting.rs index a48e3c2f08..fd602b0ad8 100644 --- a/crates/subspace-farmer-components/benches/plotting.rs +++ b/crates/subspace-farmer-components/benches/plotting.rs @@ -69,7 +69,6 @@ fn criterion_benchmark(c: &mut Criterion) { let sector_size = sector_size(pieces_in_sector); let mut sector_bytes = Vec::new(); - let mut sector_metadata_bytes = Vec::new(); let mut group = c.benchmark_group("plotting"); group.throughput(Throughput::Bytes(sector_size as u64)); @@ -84,7 +83,6 @@ fn criterion_benchmark(c: &mut Criterion) { erasure_coding: black_box(&erasure_coding), pieces_in_sector: black_box(pieces_in_sector), sector_output: black_box(&mut sector_bytes), - sector_metadata_output: black_box(&mut sector_metadata_bytes), downloading_semaphore: black_box(None), encoding_semaphore: black_box(None), table_generators: black_box(&mut table_generators), diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index 718846df96..b3edcd8f20 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -122,7 +122,6 @@ pub fn criterion_benchmark(c: &mut Criterion) { println!("Plotting one sector..."); let mut plotted_sector_bytes = Vec::new(); - let mut plotted_sector_metadata_bytes = Vec::new(); let plotted_sector = block_on(plot_sector::(PlotSectorOptions { public_key, @@ -133,7 +132,6 @@ pub fn criterion_benchmark(c: &mut Criterion) { erasure_coding, pieces_in_sector, sector_output: &mut plotted_sector_bytes, - sector_metadata_output: &mut plotted_sector_metadata_bytes, downloading_semaphore: black_box(None), encoding_semaphore: black_box(None), table_generators: slice::from_mut(&mut table_generator), diff --git a/crates/subspace-farmer-components/benches/reading.rs b/crates/subspace-farmer-components/benches/reading.rs index 40538d5e2f..c7199e7a3d 100644 --- a/crates/subspace-farmer-components/benches/reading.rs +++ b/crates/subspace-farmer-components/benches/reading.rs @@ -113,7 +113,6 @@ pub fn criterion_benchmark(c: &mut Criterion) { println!("Plotting one sector..."); let mut plotted_sector_bytes = Vec::new(); - let mut plotted_sector_metadata_bytes = Vec::new(); let plotted_sector = block_on(plot_sector::(PlotSectorOptions { public_key: &public_key, @@ -124,7 +123,6 @@ pub fn criterion_benchmark(c: &mut Criterion) { erasure_coding: &erasure_coding, pieces_in_sector, sector_output: &mut plotted_sector_bytes, - sector_metadata_output: &mut plotted_sector_metadata_bytes, downloading_semaphore: black_box(None), encoding_semaphore: black_box(None), table_generators: slice::from_mut(&mut table_generator), diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index 9aa588b7d6..13c25ff7dc 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -70,14 +70,6 @@ pub enum PlottingError { /// Expected size expected: usize, }, - /// Bad sector metadata output size - #[error("Bad sector metadata output size: provided {provided}, expected {expected}")] - BadSectorMetadataOutputSize { - /// Actual size - provided: usize, - /// Expected size - expected: usize, - }, /// Piece not found, can't create sector, this should never happen #[error("Piece {piece_index} not found, can't create sector, this should never happen")] PieceNotFound { @@ -136,9 +128,6 @@ where /// Where plotted sector should be written, vector must either be empty (in which case it'll be /// resized to correct size automatically) or correctly sized from the beginning pub sector_output: &'a mut Vec, - /// Where plotted sector metadata should be written, vector must either be empty (in which case - /// it'll be resized to correct size automatically) or correctly sized from the beginning - pub sector_metadata_output: &'a mut Vec, /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory /// usage of the plotting process, permit will be held until the end of the plotting process pub downloading_semaphore: Option>, @@ -173,7 +162,6 @@ where erasure_coding, pieces_in_sector, sector_output, - sector_metadata_output, downloading_semaphore, encoding_semaphore, table_generators, @@ -206,7 +194,6 @@ where erasure_coding, pieces_in_sector, sector_output, - sector_metadata_output, table_generators, abort_early, global_mutex: &Default::default(), @@ -339,9 +326,6 @@ where /// Where plotted sector should be written, vector must either be empty (in which case it'll be /// resized to correct size automatically) or correctly sized from the beginning pub sector_output: &'a mut Vec, - /// Where plotted sector metadata should be written, vector must either be empty (in which case - /// it'll be resized to correct size automatically) or correctly sized from the beginning - pub sector_metadata_output: &'a mut Vec, /// Proof of space table generators pub table_generators: &'a mut [PosTable::Generator], /// Whether encoding should be aborted early @@ -370,7 +354,6 @@ where erasure_coding, pieces_in_sector, sector_output, - sector_metadata_output, table_generators, abort_early, global_mutex, @@ -393,15 +376,6 @@ where }); } - if !sector_metadata_output.is_empty() - && sector_metadata_output.len() != SectorMetadataChecksummed::encoded_size() - { - return Err(PlottingError::BadSectorMetadataOutputSize { - provided: sector_metadata_output.len(), - expected: SectorMetadataChecksummed::encoded_size(), - }); - } - let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); { let iter = Mutex::new( @@ -454,7 +428,6 @@ where } sector_output.resize(sector_size, 0); - sector_metadata_output.resize(SectorMetadataChecksummed::encoded_size(), 0); // Write sector to disk in form of following regions: // * sector contents map @@ -526,8 +499,6 @@ where history_size: farmer_protocol_info.history_size, }); - sector_metadata_output.copy_from_slice(§or_metadata.encode()); - Ok(PlottedSector { sector_id, sector_index, diff --git a/crates/subspace-farmer/src/plotter.rs b/crates/subspace-farmer/src/plotter.rs index 1984ea690b..4728b0b282 100644 --- a/crates/subspace-farmer/src/plotter.rs +++ b/crates/subspace-farmer/src/plotter.rs @@ -33,8 +33,6 @@ pub enum SectorPlottingProgress { time: Duration, /// All plotted sector bytes sector: Vec, - /// All plotted sector metadata bytes - sector_metadata: Vec, }, /// Plotting failed Error { diff --git a/crates/subspace-farmer/src/plotter/cpu.rs b/crates/subspace-farmer/src/plotter/cpu.rs index c9689127ae..75f2d92992 100644 --- a/crates/subspace-farmer/src/plotter/cpu.rs +++ b/crates/subspace-farmer/src/plotter/cpu.rs @@ -175,13 +175,12 @@ where }; // Plotting - let (sector, sector_metadata, plotted_sector) = { + let (sector, plotted_sector) = { let thread_pools = plotting_thread_pool_manager.get_thread_pools().await; let plotting_fn = || { tokio::task::block_in_place(|| { let mut sector = Vec::new(); - let mut sector_metadata = Vec::new(); let plotted_sector = encode_sector::( downloaded_sector, @@ -190,7 +189,6 @@ where erasure_coding: &erasure_coding, pieces_in_sector, sector_output: &mut sector, - sector_metadata_output: &mut sector_metadata, table_generators: &mut (0..record_encoding_concurrency.get()) .map(|_| PosTable::generator()) .collect::>(), @@ -199,7 +197,7 @@ where }, )?; - Ok((sector, sector_metadata, plotted_sector)) + Ok((sector, plotted_sector)) }) }; @@ -265,7 +263,6 @@ where plotted_sector, time: start.elapsed(), sector, - sector_metadata, }, ) .await; diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 74830f8c25..350664e094 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -702,7 +702,6 @@ impl SingleDiskFarm { let public_key = *single_disk_farm_info.public_key(); let pieces_in_sector = single_disk_farm_info.pieces_in_sector(); let sector_size = sector_size(pieces_in_sector); - let sector_metadata_size = SectorMetadataChecksummed::encoded_size(); let SingleDiskFarmOptions { directory, @@ -824,8 +823,6 @@ impl SingleDiskFarm { public_key, node_client: &node_client, pieces_in_sector, - sector_size, - sector_metadata_size, plot_file: &plot_file, metadata_file, sectors_metadata: §ors_metadata, diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 6564da9d78..9b707b0269 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -88,8 +88,6 @@ pub(super) struct SectorPlottingOptions<'a, NC, P> { pub(super) public_key: PublicKey, pub(super) node_client: &'a NC, pub(super) pieces_in_sector: u16, - pub(super) sector_size: usize, - pub(super) sector_metadata_size: usize, #[cfg(not(windows))] pub(super) plot_file: &'a File, #[cfg(windows)] @@ -233,8 +231,6 @@ where public_key, node_client, pieces_in_sector, - sector_size, - sector_metadata_size, plot_file, metadata_file, sectors_metadata, @@ -350,9 +346,8 @@ where plotted_sector, time: _, sector, - sector_metadata, } => { - return Ok((plotted_sector, sector, sector_metadata)); + return Ok((plotted_sector, sector)); } SectorPlottingProgress::Error { error } => { handlers.sector_update.call_simple(&( @@ -367,7 +362,7 @@ where Err("Plotting progress stream ended before plotting finished".to_string()) }; - let (plotted_sector, sector, sector_metadata) = progress_processor_fut + let (plotted_sector, sector) = progress_processor_fut .await .map_err(PlottingError::LowLevel)?; @@ -385,11 +380,16 @@ where let start = Instant::now(); - plot_file.write_all_at(§or, (sector_index as usize * sector_size) as u64)?; - metadata_file.write_all_at( - §or_metadata, - RESERVED_PLOT_METADATA + (u64::from(sector_index) * *sector_metadata_size as u64), - )?; + plot_file.write_all_at(§or, (sector_index as usize * sector.len()) as u64)?; + drop(sector); + { + let encoded_sector_metadata = plotted_sector.sector_metadata.encode(); + metadata_file.write_all_at( + &encoded_sector_metadata, + RESERVED_PLOT_METADATA + + (u64::from(sector_index) * encoded_sector_metadata.len() as u64), + )?; + } handlers.sector_update.call_simple(&( sector_index, diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index 36ce2ec7cd..0059f148f0 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -224,7 +224,6 @@ where .expect("First block is always producing one segment; qed"); let history_size = HistorySize::from(SegmentIndex::ZERO); let mut sector = Vec::new(); - let mut sector_metadata = Vec::new(); let sector_index = 0; let public_key = PublicKey::from(keypair.public.to_bytes()); let farmer_protocol_info = FarmerProtocolInfo { @@ -247,7 +246,6 @@ where erasure_coding, pieces_in_sector, sector_output: &mut sector, - sector_metadata_output: &mut sector_metadata, downloading_semaphore: None, encoding_semaphore: None, table_generators: slice::from_mut(&mut table_generator), From 5200a174cc104302f19ad5ce49c377bdaa775bed Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Mon, 6 May 2024 14:46:06 +0300 Subject: [PATCH 2/2] Refactor plotted sector contents to be a stream --- crates/subspace-farmer/src/lib.rs | 1 + crates/subspace-farmer/src/plotter.rs | 44 +++++++++++++++---- crates/subspace-farmer/src/plotter/cpu.rs | 4 +- .../subspace-farmer/src/single_disk_farm.rs | 1 + .../src/single_disk_farm/plotting.rs | 17 +++++-- 5 files changed, 53 insertions(+), 14 deletions(-) diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index d67247d5f6..5197f43bf7 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -5,6 +5,7 @@ const_option, duration_constructors, exact_size_is_empty, + fmt_helpers_for_derive, hash_extract_if, impl_trait_in_assoc_type, int_roundings, diff --git a/crates/subspace-farmer/src/plotter.rs b/crates/subspace-farmer/src/plotter.rs index 4728b0b282..ae4cf760ab 100644 --- a/crates/subspace-farmer/src/plotter.rs +++ b/crates/subspace-farmer/src/plotter.rs @@ -1,21 +1,17 @@ pub mod cpu; use async_trait::async_trait; -use futures::Sink; -use parity_scale_codec::{Decode, Encode}; +use futures::{Sink, Stream}; use std::error::Error; +use std::fmt; +use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use subspace_core_primitives::{PublicKey, SectorIndex}; use subspace_farmer_components::plotting::PlottedSector; use subspace_farmer_components::FarmerProtocolInfo; -// TODO: It is a bit awkward that this mimics `SectorPlottingDetails` with slight differences, maybe -// `SectorPlottingDetails` should be a bit generic and support customization of -// `Starting`/`Finished` contents /// Sector plotting progress -#[derive(Debug, Clone, Encode, Decode)] -#[allow(clippy::large_enum_variant)] pub enum SectorPlottingProgress { /// Downloading sector pieces Downloading, @@ -31,8 +27,8 @@ pub enum SectorPlottingProgress { plotted_sector: PlottedSector, /// How much time it took to plot a sector time: Duration, - /// All plotted sector bytes - sector: Vec, + /// Stream of all plotted sector bytes + sector: Pin, String>> + Send + Sync>>, }, /// Plotting failed Error { @@ -41,6 +37,36 @@ pub enum SectorPlottingProgress { }, } +impl fmt::Debug for SectorPlottingProgress { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + SectorPlottingProgress::Downloading => fmt::Formatter::write_str(f, "Downloading"), + SectorPlottingProgress::Downloaded(time) => { + f.debug_tuple_field1_finish("Downloaded", &time) + } + SectorPlottingProgress::Encoding => fmt::Formatter::write_str(f, "Encoding"), + SectorPlottingProgress::Encoded(time) => f.debug_tuple_field1_finish("Encoded", &time), + SectorPlottingProgress::Finished { + plotted_sector, + time, + sector: _, + } => f.debug_struct_field3_finish( + "Finished", + "plotted_sector", + plotted_sector, + "time", + time, + "sector", + &"", + ), + SectorPlottingProgress::Error { error } => { + f.debug_struct_field1_finish("Error", "error", &error) + } + } + } +} + /// Abstract plotter implementation #[async_trait] pub trait Plotter { diff --git a/crates/subspace-farmer/src/plotter/cpu.rs b/crates/subspace-farmer/src/plotter/cpu.rs index 75f2d92992..1ef8343df5 100644 --- a/crates/subspace-farmer/src/plotter/cpu.rs +++ b/crates/subspace-farmer/src/plotter/cpu.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::mpsc; use futures::stream::FuturesUnordered; -use futures::{select, FutureExt, Sink, SinkExt, StreamExt}; +use futures::{select, stream, FutureExt, Sink, SinkExt, StreamExt}; use std::error::Error; use std::future::pending; use std::marker::PhantomData; @@ -262,7 +262,7 @@ where SectorPlottingProgress::Finished { plotted_sector, time: start.elapsed(), - sector, + sector: Box::pin(stream::once(async move { Ok(sector) })), }, ) .await; diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 350664e094..ab2cb0aaf1 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -823,6 +823,7 @@ impl SingleDiskFarm { public_key, node_client: &node_client, pieces_in_sector, + sector_size, plot_file: &plot_file, metadata_file, sectors_metadata: §ors_metadata, diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 9b707b0269..4eb1cdf3c9 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -88,6 +88,7 @@ pub(super) struct SectorPlottingOptions<'a, NC, P> { pub(super) public_key: PublicKey, pub(super) node_client: &'a NC, pub(super) pieces_in_sector: u16, + pub(super) sector_size: usize, #[cfg(not(windows))] pub(super) plot_file: &'a File, #[cfg(windows)] @@ -231,6 +232,7 @@ where public_key, node_client, pieces_in_sector, + sector_size, plot_file, metadata_file, sectors_metadata, @@ -362,7 +364,7 @@ where Err("Plotting progress stream ended before plotting finished".to_string()) }; - let (plotted_sector, sector) = progress_processor_fut + let (plotted_sector, mut sector) = progress_processor_fut .await .map_err(PlottingError::LowLevel)?; @@ -380,8 +382,17 @@ where let start = Instant::now(); - plot_file.write_all_at(§or, (sector_index as usize * sector.len()) as u64)?; - drop(sector); + { + let mut sector_write_offset = u64::from(sector_index) * *sector_size as u64; + while let Some(maybe_sector_chunk) = sector.next().await { + let sector_chunk = maybe_sector_chunk.map_err(|error| { + PlottingError::LowLevel(format!("Sector chunk receive error: {error}")) + })?; + plot_file.write_all_at(§or_chunk, sector_write_offset)?; + sector_write_offset += sector_chunk.len() as u64; + } + drop(sector); + } { let encoded_sector_metadata = plotted_sector.sector_metadata.encode(); metadata_file.write_all_at(