Skip to content

Commit

Permalink
Add internal benchmark in SingleDiskFarm to identify faster `ReadSe…
Browse files Browse the repository at this point in the history
…ctorRecordChunksMode`
  • Loading branch information
nazar-pc committed Mar 9, 2024
1 parent a872f4f commit 2acd11a
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ where

let (single_disk_farms, plotting_delay_senders) = tokio::task::block_in_place(|| {
let handle = Handle::current();
let faster_read_sector_record_chunks_mode_concurrency = &Semaphore::new(1);
let (plotting_delay_senders, plotting_delay_receivers) = (0..disk_farms.len())
.map(|_| oneshot::channel())
.unzip::<_, _, Vec<_>, Vec<_>>();
Expand Down Expand Up @@ -664,6 +665,7 @@ where
plotting_thread_pool_manager: plotting_thread_pool_manager.clone(),
plotting_delay: Some(plotting_delay_receiver),
disable_farm_locking,
faster_read_sector_record_chunks_mode_concurrency,
},
disk_farm_index,
);
Expand Down
78 changes: 66 additions & 12 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use futures::stream::FuturesUnordered;
use futures::{select, FutureExt, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use rand::prelude::*;
use rayon::prelude::*;
use rayon::ThreadPoolBuilder;
use serde::{Deserialize, Serialize};
Expand All @@ -48,10 +49,10 @@ use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};
use std::{fs, io, mem};
use subspace_core_primitives::crypto::blake3_hash;
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::crypto::{blake3_hash, Scalar};
use subspace_core_primitives::{
Blake3Hash, HistorySize, Piece, PieceOffset, PublicKey, Record, SectorId, SectorIndex,
SegmentIndex,
Expand Down Expand Up @@ -261,7 +262,7 @@ impl PlotMetadataHeader {
}

/// Options used to open single disk farm
pub struct SingleDiskFarmOptions<NC, PG> {
pub struct SingleDiskFarmOptions<'a, NC, PG> {
/// Path to directory where farm is stored.
pub directory: PathBuf,
/// Information necessary for farmer application
Expand Down Expand Up @@ -299,6 +300,8 @@ pub struct SingleDiskFarmOptions<NC, PG> {
pub plotting_delay: Option<oneshot::Receiver<()>>,
/// Disable farm locking, for example if file system doesn't support it
pub disable_farm_locking: bool,
/// Limit concurrency of internal benchmarking between different farms
pub faster_read_sector_record_chunks_mode_concurrency: &'a Semaphore,
}

/// Errors happening when trying to create/open single disk farm
Expand Down Expand Up @@ -602,7 +605,7 @@ impl SingleDiskFarm {
///
/// NOTE: Though this function is async, it will do some blocking I/O.
pub async fn new<NC, PG, PosTable>(
options: SingleDiskFarmOptions<NC, PG>,
options: SingleDiskFarmOptions<'_, NC, PG>,
disk_farm_index: usize,
) -> Result<Self, SingleDiskFarmError>
where
Expand All @@ -628,6 +631,7 @@ impl SingleDiskFarm {
plotting_delay,
farm_during_initial_plotting,
disable_farm_locking,
faster_read_sector_record_chunks_mode_concurrency,
} = options;
fs::create_dir_all(&directory)?;

Expand Down Expand Up @@ -928,10 +932,13 @@ impl SingleDiskFarm {
sector_size,
);

let read_sector_record_chunks_mode = faster_read_sector_record_chunks_mode(
&*plot_file,
sector_size,
)?;
let read_sector_record_chunks_mode = {
// Error doesn't matter here
let _permit = faster_read_sector_record_chunks_mode_concurrency
.acquire()
.await;
faster_read_sector_record_chunks_mode(&*plot_file, sector_size)?
};

let (error_sender, error_receiver) = oneshot::channel();
let error_sender = Arc::new(Mutex::new(Some(error_sender)));
Expand Down Expand Up @@ -2063,12 +2070,59 @@ fn write_dummy_sector_metadata(
}

fn faster_read_sector_record_chunks_mode<P>(
_plot: &P,
_sector_size: usize,
plot: &P,
sector_size: usize,
) -> Result<ReadSectorRecordChunksMode, SingleDiskFarmError>
where
P: FileExt,
P: FileExt + Sync,
{
info!("Benchmarking faster proving method");

let mut sector_bytes = vec![0u8; sector_size];

plot.read_exact_at(&mut sector_bytes, 0)?;

if sector_bytes.iter().all(|byte| *byte == 0) {
thread_rng().fill_bytes(&mut sector_bytes);
plot.write_all_at(&sector_bytes, 0)?;
}

let mut fastest_mode = ReadSectorRecordChunksMode::ConcurrentChunks;
let mut fastest_time = Duration::MAX;

for _ in 0..3 {
// A lot simplified version of concurrent chunks
{
let start = Instant::now();
(0..Record::NUM_S_BUCKETS)
.into_par_iter()
.try_for_each(|_| {
let offset = thread_rng().gen_range(0_usize..sector_size / Scalar::FULL_BYTES)
* Scalar::FULL_BYTES;
plot.read_exact_at(&mut [0; Scalar::FULL_BYTES], offset as u64)
})?;
let elapsed = start.elapsed();

if fastest_time > elapsed {
fastest_mode = ReadSectorRecordChunksMode::ConcurrentChunks;
fastest_time = elapsed;
}
}
// Reading the whole sector at once
{
let start = Instant::now();
plot.read_exact_at(&mut sector_bytes, 0)?;
let elapsed = start.elapsed();

if fastest_time > elapsed {
fastest_mode = ReadSectorRecordChunksMode::WholeSector;
fastest_time = elapsed;
}
}
}

debug!(?fastest_mode, "Faster proving method found");

// TODO: Do quick benchmark and select correct value dynamically
Ok(ReadSectorRecordChunksMode::ConcurrentChunks)
Ok(fastest_mode)
}

0 comments on commit 2acd11a

Please sign in to comment.