From 6b73ba7349567bcb27bdc8773370b282ff1c9d22 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Tue, 3 Sep 2024 05:07:52 +0300 Subject: [PATCH] Refactor records encoding --- crates/pallet-subspace/src/mock.rs | 10 +- crates/sp-lightclient/src/tests.rs | 8 +- .../benches/auditing.rs | 12 +- .../benches/plotting.rs | 10 +- .../benches/proving.rs | 12 +- .../benches/reading.rs | 12 +- .../src/plotting.rs | 259 +++++++++++------- crates/subspace-farmer/src/plotter/cpu.rs | 18 +- test/subspace-test-client/src/lib.rs | 12 +- 9 files changed, 228 insertions(+), 125 deletions(-) diff --git a/crates/pallet-subspace/src/mock.rs b/crates/pallet-subspace/src/mock.rs index d69c197ead..9fdc930333 100644 --- a/crates/pallet-subspace/src/mock.rs +++ b/crates/pallet-subspace/src/mock.rs @@ -52,7 +52,7 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_sector_sync; -use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions}; +use subspace_farmer_components::plotting::{plot_sector, CpuRecordsEncoder, PlotSectorOptions}; use subspace_farmer_components::reading::ReadSectorRecordChunksMode; use subspace_farmer_components::FarmerProtocolInfo; use subspace_proof_of_space::shim::ShimTable; @@ -417,7 +417,7 @@ pub fn create_signed_vote( for sector_index in iter::from_fn(|| Some(rand::random())) { let mut plotted_sector_bytes = Vec::new(); - let plotted_sector = block_on(plot_sector::(PlotSectorOptions { + let plotted_sector = block_on(plot_sector(PlotSectorOptions { public_key: &public_key, sector_index, piece_getter: archived_history_segment, @@ -428,7 +428,11 @@ pub fn create_signed_vote( sector_output: &mut plotted_sector_bytes, downloading_semaphore: None, encoding_semaphore: None, - table_generators: slice::from_mut(&mut table_generator), + records_encoder: &mut CpuRecordsEncoder::::new( + slice::from_mut(&mut table_generator), + erasure_coding, + &Default::default(), + ), abort_early: &Default::default(), })) .unwrap(); diff --git a/crates/sp-lightclient/src/tests.rs b/crates/sp-lightclient/src/tests.rs index c886bda268..96da40490d 100644 --- a/crates/sp-lightclient/src/tests.rs +++ b/crates/sp-lightclient/src/tests.rs @@ -176,7 +176,7 @@ fn valid_header( let mut plotted_sector_bytes = Vec::new(); let mut plotted_sector_metadata_bytes = Vec::new(); - let plotted_sector = block_on(plot_sector::( + let plotted_sector = block_on(plot_sector( &public_key, sector_index, &archived_segment.pieces, @@ -186,7 +186,11 @@ fn valid_header( pieces_in_sector, &mut plotted_sector_bytes, &mut plotted_sector_metadata_bytes, - &mut table_generator, + records_encoder: &mut CpuRecordsEncoder::::new( + slice::from_mut(&mut table_generator), + &erasure_coding, + &Default::default(), + ), )) .unwrap(); diff --git a/crates/subspace-farmer-components/benches/auditing.rs b/crates/subspace-farmer-components/benches/auditing.rs index d5a8e0d45c..682e5cf1a9 100644 --- a/crates/subspace-farmer-components/benches/auditing.rs +++ b/crates/subspace-farmer-components/benches/auditing.rs @@ -15,7 +15,9 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_plot_sync; use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; -use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector}; +use subspace_farmer_components::plotting::{ + plot_sector, CpuRecordsEncoder, PlotSectorOptions, PlottedSector, +}; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, }; @@ -115,7 +117,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut plotted_sector_bytes = Vec::new(); - let plotted_sector = block_on(plot_sector::(PlotSectorOptions { + let plotted_sector = block_on(plot_sector(PlotSectorOptions { public_key, sector_index, piece_getter: &archived_history_segment, @@ -126,7 +128,11 @@ pub fn criterion_benchmark(c: &mut Criterion) { sector_output: &mut plotted_sector_bytes, downloading_semaphore: black_box(None), encoding_semaphore: black_box(None), - table_generators: slice::from_mut(&mut table_generator), + records_encoder: &mut CpuRecordsEncoder::::new( + slice::from_mut(&mut table_generator), + &erasure_coding, + &Default::default(), + ), abort_early: &Default::default(), })) .unwrap(); diff --git a/crates/subspace-farmer-components/benches/plotting.rs b/crates/subspace-farmer-components/benches/plotting.rs index 0c7aa6e2e6..8a6e55b235 100644 --- a/crates/subspace-farmer-components/benches/plotting.rs +++ b/crates/subspace-farmer-components/benches/plotting.rs @@ -8,7 +8,7 @@ use subspace_core_primitives::crypto::kzg; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{HistorySize, PublicKey, Record, RecordedHistorySegment}; use subspace_erasure_coding::ErasureCoding; -use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions}; +use subspace_farmer_components::plotting::{plot_sector, CpuRecordsEncoder, PlotSectorOptions}; use subspace_farmer_components::sector::sector_size; use subspace_farmer_components::FarmerProtocolInfo; use subspace_proof_of_space::chia::ChiaTable; @@ -74,7 +74,7 @@ fn criterion_benchmark(c: &mut Criterion) { group.throughput(Throughput::Bytes(sector_size as u64)); group.bench_function("in-memory", |b| { b.iter(|| { - block_on(plot_sector::(PlotSectorOptions { + block_on(plot_sector(PlotSectorOptions { public_key: black_box(&public_key), sector_index: black_box(sector_index), piece_getter: black_box(&archived_history_segment), @@ -85,7 +85,11 @@ fn criterion_benchmark(c: &mut Criterion) { sector_output: black_box(&mut sector_bytes), downloading_semaphore: black_box(None), encoding_semaphore: black_box(None), - table_generators: black_box(&mut table_generators), + records_encoder: black_box(&mut CpuRecordsEncoder::::new( + &mut table_generators, + &erasure_coding, + &Default::default(), + )), abort_early: &Default::default(), })) .unwrap(); diff --git a/crates/subspace-farmer-components/benches/proving.rs b/crates/subspace-farmer-components/benches/proving.rs index ce351e683e..f2a9517ac5 100644 --- a/crates/subspace-farmer-components/benches/proving.rs +++ b/crates/subspace-farmer-components/benches/proving.rs @@ -20,7 +20,9 @@ use subspace_core_primitives::{ use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_plot_sync; use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; -use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector}; +use subspace_farmer_components::plotting::{ + plot_sector, CpuRecordsEncoder, PlotSectorOptions, PlottedSector, +}; use subspace_farmer_components::reading::ReadSectorRecordChunksMode; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, @@ -123,7 +125,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut plotted_sector_bytes = Vec::new(); - let plotted_sector = block_on(plot_sector::(PlotSectorOptions { + let plotted_sector = block_on(plot_sector(PlotSectorOptions { public_key, sector_index, piece_getter: &archived_history_segment, @@ -134,7 +136,11 @@ pub fn criterion_benchmark(c: &mut Criterion) { sector_output: &mut plotted_sector_bytes, downloading_semaphore: black_box(None), encoding_semaphore: black_box(None), - table_generators: slice::from_mut(&mut table_generator), + records_encoder: &mut CpuRecordsEncoder::::new( + slice::from_mut(&mut table_generator), + erasure_coding, + &Default::default(), + ), abort_early: &Default::default(), })) .unwrap(); diff --git a/crates/subspace-farmer-components/benches/reading.rs b/crates/subspace-farmer-components/benches/reading.rs index 96f5536f7a..2f0d8c98fd 100644 --- a/crates/subspace-farmer-components/benches/reading.rs +++ b/crates/subspace-farmer-components/benches/reading.rs @@ -15,7 +15,9 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt}; -use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector}; +use subspace_farmer_components::plotting::{ + plot_sector, CpuRecordsEncoder, PlotSectorOptions, PlottedSector, +}; use subspace_farmer_components::reading::{read_piece, ReadSectorRecordChunksMode}; use subspace_farmer_components::sector::{ sector_size, SectorContentsMap, SectorMetadata, SectorMetadataChecksummed, @@ -114,7 +116,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { let mut plotted_sector_bytes = Vec::new(); - let plotted_sector = block_on(plot_sector::(PlotSectorOptions { + let plotted_sector = block_on(plot_sector(PlotSectorOptions { public_key: &public_key, sector_index, piece_getter: &archived_history_segment, @@ -125,7 +127,11 @@ pub fn criterion_benchmark(c: &mut Criterion) { sector_output: &mut plotted_sector_bytes, downloading_semaphore: black_box(None), encoding_semaphore: black_box(None), - table_generators: slice::from_mut(&mut table_generator), + records_encoder: &mut CpuRecordsEncoder::::new( + slice::from_mut(&mut table_generator), + &erasure_coding, + &Default::default(), + ), abort_early: &Default::default(), })) .unwrap(); diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index cd56a1a1c8..505e89f24a 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -28,7 +28,8 @@ use std::time::Duration; use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::crypto::{blake3_hash, blake3_hash_parallel, Scalar}; use subspace_core_primitives::{ - Blake3Hash, PieceIndex, PieceOffset, PosSeed, PublicKey, Record, SBucket, SectorId, SectorIndex, + Blake3Hash, HistorySize, PieceIndex, PieceOffset, PosSeed, PublicKey, Record, SBucket, + SectorId, SectorIndex, }; use subspace_erasure_coding::ErasureCoding; use subspace_proof_of_space::{Table, TableGenerator}; @@ -64,12 +65,12 @@ pub struct PlottedSector { /// Plotting status #[derive(Debug, Error)] pub enum PlottingError { - /// Invalid erasure coding instance - #[error("Invalid erasure coding instance")] - InvalidErasureCodingInstance, - /// No table generators - #[error("No table generators")] - NoTableGenerators, + /// Records encoder error + #[error("Records encoder error: {error}")] + RecordsEncoderError { + /// Lower-level error + error: Box, + }, /// Bad sector output size #[error("Bad sector output size: provided {provided}, expected {expected}")] BadSectorOutputSize { @@ -116,10 +117,7 @@ pub enum PlottingError { /// resized to correct size automatically) or correctly sized from the beginning or else error will /// be returned. #[derive(Debug)] -pub struct PlotSectorOptions<'a, PosTable, PG> -where - PosTable: Table, -{ +pub struct PlotSectorOptions<'a, RE, PG> { /// Public key corresponding to sector pub public_key: &'a PublicKey, /// Sector index @@ -144,7 +142,7 @@ where /// allow one permit at a time for efficient CPU utilization pub encoding_semaphore: Option<&'a Semaphore>, /// Proof of space table generators - pub table_generators: &'a mut [PosTable::Generator], + pub records_encoder: &'a mut RE, /// Whether encoding should be aborted early pub abort_early: &'a AtomicBool, } @@ -155,11 +153,11 @@ where /// /// NOTE: Even though this function is async, it has blocking code inside and must be running in a /// separate thread in order to prevent blocking an executor. -pub async fn plot_sector( - options: PlotSectorOptions<'_, PosTable, PG>, +pub async fn plot_sector( + options: PlotSectorOptions<'_, RE, PG>, ) -> Result where - PosTable: Table, + RE: RecordsEncoder, PG: PieceGetter, { let PlotSectorOptions { @@ -173,7 +171,7 @@ where sector_output, downloading_semaphore, encoding_semaphore, - table_generators, + records_encoder, abort_early, } = options; @@ -199,14 +197,11 @@ where encode_sector( download_sector_fut.await?, - EncodeSectorOptions:: { + EncodeSectorOptions:: { sector_index, - erasure_coding, - pieces_in_sector, + records_encoder, sector_output, - table_generators, abort_early, - global_mutex: &Default::default(), }, ) } @@ -217,7 +212,7 @@ pub struct DownloadedSector { sector_id: SectorId, piece_indices: Vec, raw_sector: RawSector, - farmer_protocol_info: FarmerProtocolInfo, + history_size: HistorySize, } /// Options for sector downloading @@ -320,73 +315,181 @@ where sector_id, piece_indices, raw_sector: raw_sector.into_inner(), - farmer_protocol_info, + history_size: farmer_protocol_info.history_size, }) } +/// Records encoder for plotting purposes +pub trait RecordsEncoder { + /// Encode provided sector records + fn encode_records( + &mut self, + sector_id: &SectorId, + records: &mut [Record], + history_size: HistorySize, + abort_early: &AtomicBool, + ) -> Result>; +} + +/// CPU implementation of [`RecordsEncoder`] +#[derive(Debug)] +pub struct CpuRecordsEncoder<'a, PosTable> +where + PosTable: Table, +{ + table_generators: &'a mut [PosTable::Generator], + erasure_coding: &'a ErasureCoding, + global_mutex: &'a AsyncMutex<()>, +} + +impl RecordsEncoder for CpuRecordsEncoder<'_, PosTable> +where + PosTable: Table, +{ + fn encode_records( + &mut self, + sector_id: &SectorId, + records: &mut [Record], + history_size: HistorySize, + abort_early: &AtomicBool, + ) -> Result> { + if self.erasure_coding.max_shards() < Record::NUM_S_BUCKETS { + return Err(format!( + "Invalid erasure coding instance: {} shards needed, {} supported", + Record::NUM_S_BUCKETS, + self.erasure_coding.max_shards() + ) + .into()); + } + + if self.table_generators.is_empty() { + return Err("No table generators".into()); + } + + let pieces_in_sector = records + .len() + .try_into() + .map_err(|error| format!("Failed to convert pieces in sector: {error}"))?; + let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); + + { + let table_generators = &mut *self.table_generators; + let global_mutex = self.global_mutex; + let erasure_coding = self.erasure_coding; + + let iter = Mutex::new( + (PieceOffset::ZERO..) + .zip(records.iter_mut()) + .zip(sector_contents_map.iter_record_bitfields_mut()), + ); + + rayon::scope(|scope| { + for table_generator in table_generators { + scope.spawn(|_scope| { + let mut chunks_scratch = Vec::with_capacity(Record::NUM_S_BUCKETS); + + loop { + // Take mutex briefly to make sure encoding is allowed right now + global_mutex.lock_blocking(); + + // This instead of `while` above because otherwise mutex will be held for + // the duration of the loop and will limit concurrency to 1 table generator + let Some(((piece_offset, record), encoded_chunks_used)) = + iter.lock().next() + else { + return; + }; + let pos_seed = + sector_id.derive_evaluation_seed(piece_offset, history_size); + + record_encoding::( + &pos_seed, + record, + encoded_chunks_used, + table_generator, + erasure_coding, + &mut chunks_scratch, + ); + + if abort_early.load(Ordering::Relaxed) { + return; + } + } + }); + } + }); + } + + Ok(sector_contents_map) + } +} + +impl<'a, PosTable> CpuRecordsEncoder<'a, PosTable> +where + PosTable: Table, +{ + /// Create new instance + pub fn new( + table_generators: &'a mut [PosTable::Generator], + erasure_coding: &'a ErasureCoding, + global_mutex: &'a AsyncMutex<()>, + ) -> Self { + Self { + table_generators, + erasure_coding, + global_mutex, + } + } +} + /// Options for encoding a sector. /// /// Sector output and sector metadata output should be either empty (in which case they'll be /// resized to correct size automatically) or correctly sized from the beginning or else error will /// be returned. #[derive(Debug)] -pub struct EncodeSectorOptions<'a, PosTable> +pub struct EncodeSectorOptions<'a, RE> where - PosTable: Table, + RE: RecordsEncoder, { /// Sector index pub sector_index: SectorIndex, - /// Erasure coding instance - pub erasure_coding: &'a ErasureCoding, - /// How many pieces should sector contain - pub pieces_in_sector: u16, + /// Records encoding instance + pub records_encoder: &'a mut RE, /// Where plotted sector should be written, vector must either be empty (in which case it'll be /// resized to correct size automatically) or correctly sized from the beginning pub sector_output: &'a mut Vec, - /// Proof of space table generators - pub table_generators: &'a mut [PosTable::Generator], /// Whether encoding should be aborted early pub abort_early: &'a AtomicBool, - /// Global mutex that can restrict concurrency of resource-intensive operations and make sure - /// that those operations that are very sensitive (like proving) have all the resources - /// available to them for the highest probability of success - pub global_mutex: &'a AsyncMutex<()>, } /// Encode downloaded sector. /// /// This function encodes downloaded sector pieces into provided sector output and returns sector /// metadata. -pub fn encode_sector( +pub fn encode_sector( downloaded_sector: DownloadedSector, - encoding_options: EncodeSectorOptions<'_, PosTable>, + encoding_options: EncodeSectorOptions<'_, RE>, ) -> Result where - PosTable: Table, + RE: RecordsEncoder, { let DownloadedSector { sector_id, piece_indices, mut raw_sector, - farmer_protocol_info, + history_size, } = downloaded_sector; let EncodeSectorOptions { sector_index, - erasure_coding, - pieces_in_sector, + records_encoder, sector_output, - table_generators, abort_early, - global_mutex, } = encoding_options; - if erasure_coding.max_shards() < Record::NUM_S_BUCKETS { - return Err(PlottingError::InvalidErasureCodingInstance); - } - - if table_generators.is_empty() { - return Err(PlottingError::NoTableGenerators); - } + let pieces_in_sector = raw_sector.records.len().try_into().expect( + "Raw sector can only be created in this crate and it is always done correctly; qed", + ); let sector_size = sector_size(pieces_in_sector); @@ -397,52 +500,14 @@ where }); } - let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); - { - let iter = Mutex::new( - (PieceOffset::ZERO..) - .zip(raw_sector.records.iter_mut()) - .zip(sector_contents_map.iter_record_bitfields_mut()), - ); - - rayon::scope(|scope| { - for table_generator in table_generators { - scope.spawn(|_scope| { - let mut chunks_scratch = Vec::with_capacity(Record::NUM_S_BUCKETS); - - loop { - // Take mutex briefly to make sure encoding is allowed right now - global_mutex.lock_blocking(); - - // This instead of `while` above because otherwise mutex will be held for - // the duration of the loop and will limit concurrency to 1 table generator - let Some(((piece_offset, record), encoded_chunks_used)) = - iter.lock().next() - else { - return; - }; - let pos_seed = sector_id.derive_evaluation_seed( - piece_offset, - farmer_protocol_info.history_size, - ); - - record_encoding::( - &pos_seed, - record, - encoded_chunks_used, - table_generator, - erasure_coding, - &mut chunks_scratch, - ); - - if abort_early.load(Ordering::Relaxed) { - return; - } - } - }); - } - }); - } + let sector_contents_map = records_encoder + .encode_records( + §or_id, + &mut raw_sector.records, + history_size, + abort_early, + ) + .map_err(|error| PlottingError::RecordsEncoderError { error })?; if abort_early.load(Ordering::Acquire) { return Err(PlottingError::AbortEarly); @@ -517,7 +582,7 @@ where sector_index, pieces_in_sector, s_bucket_sizes: sector_contents_map.s_bucket_sizes(), - history_size: farmer_protocol_info.history_size, + history_size, }); Ok(PlottedSector { diff --git a/crates/subspace-farmer/src/plotter/cpu.rs b/crates/subspace-farmer/src/plotter/cpu.rs index f289882d54..e4b380b015 100644 --- a/crates/subspace-farmer/src/plotter/cpu.rs +++ b/crates/subspace-farmer/src/plotter/cpu.rs @@ -27,7 +27,8 @@ use subspace_core_primitives::crypto::kzg::Kzg; use subspace_core_primitives::{PublicKey, SectorIndex}; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::plotting::{ - download_sector, encode_sector, DownloadSectorOptions, EncodeSectorOptions, PlottingError, + download_sector, encode_sector, CpuRecordsEncoder, DownloadSectorOptions, EncodeSectorOptions, + PlottingError, }; use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter}; use subspace_proof_of_space::Table; @@ -356,18 +357,19 @@ where let plotting_fn = || { let mut sector = Vec::new(); - let plotted_sector = encode_sector::( + let plotted_sector = encode_sector( downloaded_sector, EncodeSectorOptions { sector_index, - erasure_coding: &erasure_coding, - pieces_in_sector, sector_output: &mut sector, - table_generators: &mut (0..record_encoding_concurrency.get()) - .map(|_| PosTable::generator()) - .collect::>(), + records_encoder: &mut CpuRecordsEncoder::::new( + &mut (0..record_encoding_concurrency.get()) + .map(|_| PosTable::generator()) + .collect::>(), + &erasure_coding, + &global_mutex, + ), abort_early: &abort_early, - global_mutex: &global_mutex, }, )?; diff --git a/test/subspace-test-client/src/lib.rs b/test/subspace-test-client/src/lib.rs index 7b4ba3484b..6cbd629603 100644 --- a/test/subspace-test-client/src/lib.rs +++ b/test/subspace-test-client/src/lib.rs @@ -41,7 +41,9 @@ use subspace_core_primitives::{ }; use subspace_erasure_coding::ErasureCoding; use subspace_farmer_components::auditing::audit_sector_sync; -use subspace_farmer_components::plotting::{plot_sector, PlotSectorOptions, PlottedSector}; +use subspace_farmer_components::plotting::{ + plot_sector, CpuRecordsEncoder, PlotSectorOptions, PlottedSector, +}; use subspace_farmer_components::reading::ReadSectorRecordChunksMode; use subspace_farmer_components::FarmerProtocolInfo; use subspace_proof_of_space::{Table, TableGenerator}; @@ -238,7 +240,7 @@ where min_sector_lifetime: HistorySize::from(NonZeroU64::new(4).unwrap()), }; - let plotted_sector = plot_sector::(PlotSectorOptions { + let plotted_sector = plot_sector(PlotSectorOptions { public_key: &public_key, sector_index, piece_getter: &archived_segment.pieces, @@ -249,7 +251,11 @@ where sector_output: &mut sector, downloading_semaphore: None, encoding_semaphore: None, - table_generators: slice::from_mut(&mut table_generator), + records_encoder: &mut CpuRecordsEncoder::::new( + slice::from_mut(&mut table_generator), + erasure_coding, + &Default::default(), + ), abort_early: &Default::default(), }) .await