Skip to content

Commit

Permalink
Merge pull request #2743 from subspace/plotting-metadata-refactoring
Browse files Browse the repository at this point in the history
Plotting metadata refactoring
  • Loading branch information
nazar-pc authored May 7, 2024
2 parents d0aaf10 + 5200a17 commit f8bf280
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 71 deletions.
2 changes: 0 additions & 2 deletions crates/pallet-subspace/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ pub fn create_signed_vote(

for sector_index in iter::from_fn(|| Some(rand::random())) {
let mut plotted_sector_bytes = Vec::new();
let mut plotted_sector_metadata_bytes = Vec::new();

let plotted_sector = block_on(plot_sector::<PosTable, _>(PlotSectorOptions {
public_key: &public_key,
Expand All @@ -427,7 +426,6 @@ pub fn create_signed_vote(
erasure_coding,
pieces_in_sector,
sector_output: &mut plotted_sector_bytes,
sector_metadata_output: &mut plotted_sector_metadata_bytes,
downloading_semaphore: None,
encoding_semaphore: None,
table_generators: slice::from_mut(&mut table_generator),
Expand Down
2 changes: 0 additions & 2 deletions crates/subspace-farmer-components/benches/auditing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
println!("Plotting one sector...");

let mut plotted_sector_bytes = Vec::new();
let mut plotted_sector_metadata_bytes = Vec::new();

let plotted_sector = block_on(plot_sector::<PosTable, _>(PlotSectorOptions {
public_key,
Expand All @@ -125,7 +124,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
erasure_coding: &erasure_coding,
pieces_in_sector,
sector_output: &mut plotted_sector_bytes,
sector_metadata_output: &mut plotted_sector_metadata_bytes,
downloading_semaphore: black_box(None),
encoding_semaphore: black_box(None),
table_generators: slice::from_mut(&mut table_generator),
Expand Down
2 changes: 0 additions & 2 deletions crates/subspace-farmer-components/benches/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ fn criterion_benchmark(c: &mut Criterion) {

let sector_size = sector_size(pieces_in_sector);
let mut sector_bytes = Vec::new();
let mut sector_metadata_bytes = Vec::new();

let mut group = c.benchmark_group("plotting");
group.throughput(Throughput::Bytes(sector_size as u64));
Expand All @@ -84,7 +83,6 @@ fn criterion_benchmark(c: &mut Criterion) {
erasure_coding: black_box(&erasure_coding),
pieces_in_sector: black_box(pieces_in_sector),
sector_output: black_box(&mut sector_bytes),
sector_metadata_output: black_box(&mut sector_metadata_bytes),
downloading_semaphore: black_box(None),
encoding_semaphore: black_box(None),
table_generators: black_box(&mut table_generators),
Expand Down
2 changes: 0 additions & 2 deletions crates/subspace-farmer-components/benches/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
println!("Plotting one sector...");

let mut plotted_sector_bytes = Vec::new();
let mut plotted_sector_metadata_bytes = Vec::new();

let plotted_sector = block_on(plot_sector::<PosTable, _>(PlotSectorOptions {
public_key,
Expand All @@ -133,7 +132,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
erasure_coding,
pieces_in_sector,
sector_output: &mut plotted_sector_bytes,
sector_metadata_output: &mut plotted_sector_metadata_bytes,
downloading_semaphore: black_box(None),
encoding_semaphore: black_box(None),
table_generators: slice::from_mut(&mut table_generator),
Expand Down
2 changes: 0 additions & 2 deletions crates/subspace-farmer-components/benches/reading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
println!("Plotting one sector...");

let mut plotted_sector_bytes = Vec::new();
let mut plotted_sector_metadata_bytes = Vec::new();

let plotted_sector = block_on(plot_sector::<PosTable, _>(PlotSectorOptions {
public_key: &public_key,
Expand All @@ -124,7 +123,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
erasure_coding: &erasure_coding,
pieces_in_sector,
sector_output: &mut plotted_sector_bytes,
sector_metadata_output: &mut plotted_sector_metadata_bytes,
downloading_semaphore: black_box(None),
encoding_semaphore: black_box(None),
table_generators: slice::from_mut(&mut table_generator),
Expand Down
29 changes: 0 additions & 29 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,6 @@ pub enum PlottingError {
/// Expected size
expected: usize,
},
/// Bad sector metadata output size
#[error("Bad sector metadata output size: provided {provided}, expected {expected}")]
BadSectorMetadataOutputSize {
/// Actual size
provided: usize,
/// Expected size
expected: usize,
},
/// Piece not found, can't create sector, this should never happen
#[error("Piece {piece_index} not found, can't create sector, this should never happen")]
PieceNotFound {
Expand Down Expand Up @@ -136,9 +128,6 @@ where
/// Where plotted sector should be written, vector must either be empty (in which case it'll be
/// resized to correct size automatically) or correctly sized from the beginning
pub sector_output: &'a mut Vec<u8>,
/// Where plotted sector metadata should be written, vector must either be empty (in which case
/// it'll be resized to correct size automatically) or correctly sized from the beginning
pub sector_metadata_output: &'a mut Vec<u8>,
/// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory
/// usage of the plotting process, permit will be held until the end of the plotting process
pub downloading_semaphore: Option<Arc<Semaphore>>,
Expand Down Expand Up @@ -173,7 +162,6 @@ where
erasure_coding,
pieces_in_sector,
sector_output,
sector_metadata_output,
downloading_semaphore,
encoding_semaphore,
table_generators,
Expand Down Expand Up @@ -206,7 +194,6 @@ where
erasure_coding,
pieces_in_sector,
sector_output,
sector_metadata_output,
table_generators,
abort_early,
global_mutex: &Default::default(),
Expand Down Expand Up @@ -339,9 +326,6 @@ where
/// Where plotted sector should be written, vector must either be empty (in which case it'll be
/// resized to correct size automatically) or correctly sized from the beginning
pub sector_output: &'a mut Vec<u8>,
/// Where plotted sector metadata should be written, vector must either be empty (in which case
/// it'll be resized to correct size automatically) or correctly sized from the beginning
pub sector_metadata_output: &'a mut Vec<u8>,
/// Proof of space table generators
pub table_generators: &'a mut [PosTable::Generator],
/// Whether encoding should be aborted early
Expand Down Expand Up @@ -370,7 +354,6 @@ where
erasure_coding,
pieces_in_sector,
sector_output,
sector_metadata_output,
table_generators,
abort_early,
global_mutex,
Expand All @@ -393,15 +376,6 @@ where
});
}

if !sector_metadata_output.is_empty()
&& sector_metadata_output.len() != SectorMetadataChecksummed::encoded_size()
{
return Err(PlottingError::BadSectorMetadataOutputSize {
provided: sector_metadata_output.len(),
expected: SectorMetadataChecksummed::encoded_size(),
});
}

let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);
{
let iter = Mutex::new(
Expand Down Expand Up @@ -454,7 +428,6 @@ where
}

sector_output.resize(sector_size, 0);
sector_metadata_output.resize(SectorMetadataChecksummed::encoded_size(), 0);

// Write sector to disk in form of following regions:
// * sector contents map
Expand Down Expand Up @@ -526,8 +499,6 @@ where
history_size: farmer_protocol_info.history_size,
});

sector_metadata_output.copy_from_slice(&sector_metadata.encode());

Ok(PlottedSector {
sector_id,
sector_index,
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
const_option,
duration_constructors,
exact_size_is_empty,
fmt_helpers_for_derive,
hash_extract_if,
impl_trait_in_assoc_type,
int_roundings,
Expand Down
46 changes: 35 additions & 11 deletions crates/subspace-farmer/src/plotter.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
pub mod cpu;

use async_trait::async_trait;
use futures::Sink;
use parity_scale_codec::{Decode, Encode};
use futures::{Sink, Stream};
use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::{PublicKey, SectorIndex};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::FarmerProtocolInfo;

// TODO: It is a bit awkward that this mimics `SectorPlottingDetails` with slight differences, maybe
// `SectorPlottingDetails` should be a bit generic and support customization of
// `Starting`/`Finished` contents
/// Sector plotting progress
#[derive(Debug, Clone, Encode, Decode)]
#[allow(clippy::large_enum_variant)]
pub enum SectorPlottingProgress {
/// Downloading sector pieces
Downloading,
Expand All @@ -31,10 +27,8 @@ pub enum SectorPlottingProgress {
plotted_sector: PlottedSector,
/// How much time it took to plot a sector
time: Duration,
/// All plotted sector bytes
sector: Vec<u8>,
/// All plotted sector metadata bytes
sector_metadata: Vec<u8>,
/// Stream of all plotted sector bytes
sector: Pin<Box<dyn Stream<Item = Result<Vec<u8>, String>> + Send + Sync>>,
},
/// Plotting failed
Error {
Expand All @@ -43,6 +37,36 @@ pub enum SectorPlottingProgress {
},
}

impl fmt::Debug for SectorPlottingProgress {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SectorPlottingProgress::Downloading => fmt::Formatter::write_str(f, "Downloading"),
SectorPlottingProgress::Downloaded(time) => {
f.debug_tuple_field1_finish("Downloaded", &time)
}
SectorPlottingProgress::Encoding => fmt::Formatter::write_str(f, "Encoding"),
SectorPlottingProgress::Encoded(time) => f.debug_tuple_field1_finish("Encoded", &time),
SectorPlottingProgress::Finished {
plotted_sector,
time,
sector: _,
} => f.debug_struct_field3_finish(
"Finished",
"plotted_sector",
plotted_sector,
"time",
time,
"sector",
&"<stream>",
),
SectorPlottingProgress::Error { error } => {
f.debug_struct_field1_finish("Error", "error", &error)
}
}
}
}

/// Abstract plotter implementation
#[async_trait]
pub trait Plotter {
Expand Down
11 changes: 4 additions & 7 deletions crates/subspace-farmer/src/plotter/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use async_trait::async_trait;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::{select, FutureExt, Sink, SinkExt, StreamExt};
use futures::{select, stream, FutureExt, Sink, SinkExt, StreamExt};
use std::error::Error;
use std::future::pending;
use std::marker::PhantomData;
Expand Down Expand Up @@ -175,13 +175,12 @@ where
};

// Plotting
let (sector, sector_metadata, plotted_sector) = {
let (sector, plotted_sector) = {
let thread_pools = plotting_thread_pool_manager.get_thread_pools().await;

let plotting_fn = || {
tokio::task::block_in_place(|| {
let mut sector = Vec::new();
let mut sector_metadata = Vec::new();

let plotted_sector = encode_sector::<PosTable>(
downloaded_sector,
Expand All @@ -190,7 +189,6 @@ where
erasure_coding: &erasure_coding,
pieces_in_sector,
sector_output: &mut sector,
sector_metadata_output: &mut sector_metadata,
table_generators: &mut (0..record_encoding_concurrency.get())
.map(|_| PosTable::generator())
.collect::<Vec<_>>(),
Expand All @@ -199,7 +197,7 @@ where
},
)?;

Ok((sector, sector_metadata, plotted_sector))
Ok((sector, plotted_sector))
})
};

Expand Down Expand Up @@ -264,8 +262,7 @@ where
SectorPlottingProgress::Finished {
plotted_sector,
time: start.elapsed(),
sector,
sector_metadata,
sector: Box::pin(stream::once(async move { Ok(sector) })),
},
)
.await;
Expand Down
2 changes: 0 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,6 @@ impl SingleDiskFarm {
let public_key = *single_disk_farm_info.public_key();
let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
let sector_size = sector_size(pieces_in_sector);
let sector_metadata_size = SectorMetadataChecksummed::encoded_size();

let SingleDiskFarmOptions {
directory,
Expand Down Expand Up @@ -825,7 +824,6 @@ impl SingleDiskFarm {
node_client: &node_client,
pieces_in_sector,
sector_size,
sector_metadata_size,
plot_file: &plot_file,
metadata_file,
sectors_metadata: &sectors_metadata,
Expand Down
31 changes: 21 additions & 10 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ pub(super) struct SectorPlottingOptions<'a, NC, P> {
pub(super) node_client: &'a NC,
pub(super) pieces_in_sector: u16,
pub(super) sector_size: usize,
pub(super) sector_metadata_size: usize,
#[cfg(not(windows))]
pub(super) plot_file: &'a File,
#[cfg(windows)]
Expand Down Expand Up @@ -234,7 +233,6 @@ where
node_client,
pieces_in_sector,
sector_size,
sector_metadata_size,
plot_file,
metadata_file,
sectors_metadata,
Expand Down Expand Up @@ -350,9 +348,8 @@ where
plotted_sector,
time: _,
sector,
sector_metadata,
} => {
return Ok((plotted_sector, sector, sector_metadata));
return Ok((plotted_sector, sector));
}
SectorPlottingProgress::Error { error } => {
handlers.sector_update.call_simple(&(
Expand All @@ -367,7 +364,7 @@ where
Err("Plotting progress stream ended before plotting finished".to_string())
};

let (plotted_sector, sector, sector_metadata) = progress_processor_fut
let (plotted_sector, mut sector) = progress_processor_fut
.await
.map_err(PlottingError::LowLevel)?;

Expand All @@ -385,11 +382,25 @@ where

let start = Instant::now();

plot_file.write_all_at(&sector, (sector_index as usize * sector_size) as u64)?;
metadata_file.write_all_at(
&sector_metadata,
RESERVED_PLOT_METADATA + (u64::from(sector_index) * *sector_metadata_size as u64),
)?;
{
let mut sector_write_offset = u64::from(sector_index) * *sector_size as u64;
while let Some(maybe_sector_chunk) = sector.next().await {
let sector_chunk = maybe_sector_chunk.map_err(|error| {
PlottingError::LowLevel(format!("Sector chunk receive error: {error}"))
})?;
plot_file.write_all_at(&sector_chunk, sector_write_offset)?;
sector_write_offset += sector_chunk.len() as u64;
}
drop(sector);
}
{
let encoded_sector_metadata = plotted_sector.sector_metadata.encode();
metadata_file.write_all_at(
&encoded_sector_metadata,
RESERVED_PLOT_METADATA
+ (u64::from(sector_index) * encoded_sector_metadata.len() as u64),
)?;
}

handlers.sector_update.call_simple(&(
sector_index,
Expand Down
Loading

0 comments on commit f8bf280

Please sign in to comment.