Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plotting metadata refactoring #2743

Merged
merged 2 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions crates/pallet-subspace/src/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,6 @@ pub fn create_signed_vote(

for sector_index in iter::from_fn(|| Some(rand::random())) {
let mut plotted_sector_bytes = Vec::new();
let mut plotted_sector_metadata_bytes = Vec::new();

let plotted_sector = block_on(plot_sector::<PosTable, _>(PlotSectorOptions {
public_key: &public_key,
Expand All @@ -427,7 +426,6 @@ pub fn create_signed_vote(
erasure_coding,
pieces_in_sector,
sector_output: &mut plotted_sector_bytes,
sector_metadata_output: &mut plotted_sector_metadata_bytes,
downloading_semaphore: None,
encoding_semaphore: None,
table_generators: slice::from_mut(&mut table_generator),
Expand Down
2 changes: 0 additions & 2 deletions crates/subspace-farmer-components/benches/auditing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
println!("Plotting one sector...");

let mut plotted_sector_bytes = Vec::new();
let mut plotted_sector_metadata_bytes = Vec::new();

let plotted_sector = block_on(plot_sector::<PosTable, _>(PlotSectorOptions {
public_key,
Expand All @@ -125,7 +124,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
erasure_coding: &erasure_coding,
pieces_in_sector,
sector_output: &mut plotted_sector_bytes,
sector_metadata_output: &mut plotted_sector_metadata_bytes,
downloading_semaphore: black_box(None),
encoding_semaphore: black_box(None),
table_generators: slice::from_mut(&mut table_generator),
Expand Down
2 changes: 0 additions & 2 deletions crates/subspace-farmer-components/benches/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ fn criterion_benchmark(c: &mut Criterion) {

let sector_size = sector_size(pieces_in_sector);
let mut sector_bytes = Vec::new();
let mut sector_metadata_bytes = Vec::new();

let mut group = c.benchmark_group("plotting");
group.throughput(Throughput::Bytes(sector_size as u64));
Expand All @@ -84,7 +83,6 @@ fn criterion_benchmark(c: &mut Criterion) {
erasure_coding: black_box(&erasure_coding),
pieces_in_sector: black_box(pieces_in_sector),
sector_output: black_box(&mut sector_bytes),
sector_metadata_output: black_box(&mut sector_metadata_bytes),
downloading_semaphore: black_box(None),
encoding_semaphore: black_box(None),
table_generators: black_box(&mut table_generators),
Expand Down
2 changes: 0 additions & 2 deletions crates/subspace-farmer-components/benches/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
println!("Plotting one sector...");

let mut plotted_sector_bytes = Vec::new();
let mut plotted_sector_metadata_bytes = Vec::new();

let plotted_sector = block_on(plot_sector::<PosTable, _>(PlotSectorOptions {
public_key,
Expand All @@ -133,7 +132,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
erasure_coding,
pieces_in_sector,
sector_output: &mut plotted_sector_bytes,
sector_metadata_output: &mut plotted_sector_metadata_bytes,
downloading_semaphore: black_box(None),
encoding_semaphore: black_box(None),
table_generators: slice::from_mut(&mut table_generator),
Expand Down
2 changes: 0 additions & 2 deletions crates/subspace-farmer-components/benches/reading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
println!("Plotting one sector...");

let mut plotted_sector_bytes = Vec::new();
let mut plotted_sector_metadata_bytes = Vec::new();

let plotted_sector = block_on(plot_sector::<PosTable, _>(PlotSectorOptions {
public_key: &public_key,
Expand All @@ -124,7 +123,6 @@ pub fn criterion_benchmark(c: &mut Criterion) {
erasure_coding: &erasure_coding,
pieces_in_sector,
sector_output: &mut plotted_sector_bytes,
sector_metadata_output: &mut plotted_sector_metadata_bytes,
downloading_semaphore: black_box(None),
encoding_semaphore: black_box(None),
table_generators: slice::from_mut(&mut table_generator),
Expand Down
29 changes: 0 additions & 29 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,6 @@ pub enum PlottingError {
/// Expected size
expected: usize,
},
/// Bad sector metadata output size
#[error("Bad sector metadata output size: provided {provided}, expected {expected}")]
BadSectorMetadataOutputSize {
/// Actual size
provided: usize,
/// Expected size
expected: usize,
},
/// Piece not found, can't create sector, this should never happen
#[error("Piece {piece_index} not found, can't create sector, this should never happen")]
PieceNotFound {
Expand Down Expand Up @@ -136,9 +128,6 @@ where
/// Where plotted sector should be written, vector must either be empty (in which case it'll be
/// resized to correct size automatically) or correctly sized from the beginning
pub sector_output: &'a mut Vec<u8>,
/// Where plotted sector metadata should be written, vector must either be empty (in which case
/// it'll be resized to correct size automatically) or correctly sized from the beginning
pub sector_metadata_output: &'a mut Vec<u8>,
/// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory
/// usage of the plotting process, permit will be held until the end of the plotting process
pub downloading_semaphore: Option<Arc<Semaphore>>,
Expand Down Expand Up @@ -173,7 +162,6 @@ where
erasure_coding,
pieces_in_sector,
sector_output,
sector_metadata_output,
downloading_semaphore,
encoding_semaphore,
table_generators,
Expand Down Expand Up @@ -206,7 +194,6 @@ where
erasure_coding,
pieces_in_sector,
sector_output,
sector_metadata_output,
table_generators,
abort_early,
global_mutex: &Default::default(),
Expand Down Expand Up @@ -339,9 +326,6 @@ where
/// Where plotted sector should be written, vector must either be empty (in which case it'll be
/// resized to correct size automatically) or correctly sized from the beginning
pub sector_output: &'a mut Vec<u8>,
/// Where plotted sector metadata should be written, vector must either be empty (in which case
/// it'll be resized to correct size automatically) or correctly sized from the beginning
pub sector_metadata_output: &'a mut Vec<u8>,
/// Proof of space table generators
pub table_generators: &'a mut [PosTable::Generator],
/// Whether encoding should be aborted early
Expand Down Expand Up @@ -370,7 +354,6 @@ where
erasure_coding,
pieces_in_sector,
sector_output,
sector_metadata_output,
table_generators,
abort_early,
global_mutex,
Expand All @@ -393,15 +376,6 @@ where
});
}

if !sector_metadata_output.is_empty()
&& sector_metadata_output.len() != SectorMetadataChecksummed::encoded_size()
{
return Err(PlottingError::BadSectorMetadataOutputSize {
provided: sector_metadata_output.len(),
expected: SectorMetadataChecksummed::encoded_size(),
});
}

let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);
{
let iter = Mutex::new(
Expand Down Expand Up @@ -454,7 +428,6 @@ where
}

sector_output.resize(sector_size, 0);
sector_metadata_output.resize(SectorMetadataChecksummed::encoded_size(), 0);

// Write sector to disk in form of following regions:
// * sector contents map
Expand Down Expand Up @@ -526,8 +499,6 @@ where
history_size: farmer_protocol_info.history_size,
});

sector_metadata_output.copy_from_slice(&sector_metadata.encode());

Ok(PlottedSector {
sector_id,
sector_index,
Expand Down
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
const_option,
duration_constructors,
exact_size_is_empty,
fmt_helpers_for_derive,
hash_extract_if,
impl_trait_in_assoc_type,
int_roundings,
Expand Down
46 changes: 35 additions & 11 deletions crates/subspace-farmer/src/plotter.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
pub mod cpu;

use async_trait::async_trait;
use futures::Sink;
use parity_scale_codec::{Decode, Encode};
use futures::{Sink, Stream};
use std::error::Error;
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::{PublicKey, SectorIndex};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::FarmerProtocolInfo;

// TODO: It is a bit awkward that this mimics `SectorPlottingDetails` with slight differences, maybe
// `SectorPlottingDetails` should be a bit generic and support customization of
// `Starting`/`Finished` contents
/// Sector plotting progress
#[derive(Debug, Clone, Encode, Decode)]
#[allow(clippy::large_enum_variant)]
pub enum SectorPlottingProgress {
/// Downloading sector pieces
Downloading,
Expand All @@ -31,10 +27,8 @@ pub enum SectorPlottingProgress {
plotted_sector: PlottedSector,
/// How much time it took to plot a sector
time: Duration,
/// All plotted sector bytes
sector: Vec<u8>,
/// All plotted sector metadata bytes
sector_metadata: Vec<u8>,
/// Stream of all plotted sector bytes
sector: Pin<Box<dyn Stream<Item = Result<Vec<u8>, String>> + Send + Sync>>,
},
/// Plotting failed
Error {
Expand All @@ -43,6 +37,36 @@ pub enum SectorPlottingProgress {
},
}

impl fmt::Debug for SectorPlottingProgress {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SectorPlottingProgress::Downloading => fmt::Formatter::write_str(f, "Downloading"),
SectorPlottingProgress::Downloaded(time) => {
f.debug_tuple_field1_finish("Downloaded", &time)
}
SectorPlottingProgress::Encoding => fmt::Formatter::write_str(f, "Encoding"),
SectorPlottingProgress::Encoded(time) => f.debug_tuple_field1_finish("Encoded", &time),
SectorPlottingProgress::Finished {
plotted_sector,
time,
sector: _,
} => f.debug_struct_field3_finish(
"Finished",
"plotted_sector",
plotted_sector,
"time",
time,
"sector",
&"<stream>",
),
SectorPlottingProgress::Error { error } => {
f.debug_struct_field1_finish("Error", "error", &error)
}
}
}
}

/// Abstract plotter implementation
#[async_trait]
pub trait Plotter {
Expand Down
11 changes: 4 additions & 7 deletions crates/subspace-farmer/src/plotter/cpu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use async_trait::async_trait;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::{select, FutureExt, Sink, SinkExt, StreamExt};
use futures::{select, stream, FutureExt, Sink, SinkExt, StreamExt};
use std::error::Error;
use std::future::pending;
use std::marker::PhantomData;
Expand Down Expand Up @@ -175,13 +175,12 @@ where
};

// Plotting
let (sector, sector_metadata, plotted_sector) = {
let (sector, plotted_sector) = {
let thread_pools = plotting_thread_pool_manager.get_thread_pools().await;

let plotting_fn = || {
tokio::task::block_in_place(|| {
let mut sector = Vec::new();
let mut sector_metadata = Vec::new();

let plotted_sector = encode_sector::<PosTable>(
downloaded_sector,
Expand All @@ -190,7 +189,6 @@ where
erasure_coding: &erasure_coding,
pieces_in_sector,
sector_output: &mut sector,
sector_metadata_output: &mut sector_metadata,
table_generators: &mut (0..record_encoding_concurrency.get())
.map(|_| PosTable::generator())
.collect::<Vec<_>>(),
Expand All @@ -199,7 +197,7 @@ where
},
)?;

Ok((sector, sector_metadata, plotted_sector))
Ok((sector, plotted_sector))
})
};

Expand Down Expand Up @@ -264,8 +262,7 @@ where
SectorPlottingProgress::Finished {
plotted_sector,
time: start.elapsed(),
sector,
sector_metadata,
sector: Box::pin(stream::once(async move { Ok(sector) })),
},
)
.await;
Expand Down
2 changes: 0 additions & 2 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,6 @@ impl SingleDiskFarm {
let public_key = *single_disk_farm_info.public_key();
let pieces_in_sector = single_disk_farm_info.pieces_in_sector();
let sector_size = sector_size(pieces_in_sector);
let sector_metadata_size = SectorMetadataChecksummed::encoded_size();

let SingleDiskFarmOptions {
directory,
Expand Down Expand Up @@ -825,7 +824,6 @@ impl SingleDiskFarm {
node_client: &node_client,
pieces_in_sector,
sector_size,
sector_metadata_size,
plot_file: &plot_file,
metadata_file,
sectors_metadata: &sectors_metadata,
Expand Down
31 changes: 21 additions & 10 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ pub(super) struct SectorPlottingOptions<'a, NC, P> {
pub(super) node_client: &'a NC,
pub(super) pieces_in_sector: u16,
pub(super) sector_size: usize,
pub(super) sector_metadata_size: usize,
#[cfg(not(windows))]
pub(super) plot_file: &'a File,
#[cfg(windows)]
Expand Down Expand Up @@ -234,7 +233,6 @@ where
node_client,
pieces_in_sector,
sector_size,
sector_metadata_size,
plot_file,
metadata_file,
sectors_metadata,
Expand Down Expand Up @@ -350,9 +348,8 @@ where
plotted_sector,
time: _,
sector,
sector_metadata,
} => {
return Ok((plotted_sector, sector, sector_metadata));
return Ok((plotted_sector, sector));
}
SectorPlottingProgress::Error { error } => {
handlers.sector_update.call_simple(&(
Expand All @@ -367,7 +364,7 @@ where
Err("Plotting progress stream ended before plotting finished".to_string())
};

let (plotted_sector, sector, sector_metadata) = progress_processor_fut
let (plotted_sector, mut sector) = progress_processor_fut
.await
.map_err(PlottingError::LowLevel)?;

Expand All @@ -385,11 +382,25 @@ where

let start = Instant::now();

plot_file.write_all_at(&sector, (sector_index as usize * sector_size) as u64)?;
metadata_file.write_all_at(
&sector_metadata,
RESERVED_PLOT_METADATA + (u64::from(sector_index) * *sector_metadata_size as u64),
)?;
{
let mut sector_write_offset = u64::from(sector_index) * *sector_size as u64;
while let Some(maybe_sector_chunk) = sector.next().await {
let sector_chunk = maybe_sector_chunk.map_err(|error| {
PlottingError::LowLevel(format!("Sector chunk receive error: {error}"))
})?;
plot_file.write_all_at(&sector_chunk, sector_write_offset)?;
sector_write_offset += sector_chunk.len() as u64;
}
drop(sector);
}
{
let encoded_sector_metadata = plotted_sector.sector_metadata.encode();
metadata_file.write_all_at(
&encoded_sector_metadata,
RESERVED_PLOT_METADATA
+ (u64::from(sector_index) * encoded_sector_metadata.len() as u64),
)?;
}

handlers.sector_update.call_simple(&(
sector_index,
Expand Down
Loading
Loading