Skip to content

Commit

Permalink
Merge pull request #2877 from subspace/farm-format-with-gpu-support
Browse files Browse the repository at this point in the history
Add new farm format support that is compatible with upcoming GPU support
  • Loading branch information
nazar-pc authored Jun 27, 2024
2 parents 4e3bd72 + 8925c36 commit 231c2b5
Show file tree
Hide file tree
Showing 18 changed files with 751 additions and 221 deletions.
115 changes: 77 additions & 38 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::PosTable;
use crate::{PosTable, PosTableLegacy};
use anyhow::anyhow;
use clap::{Parser, Subcommand};
use criterion::{black_box, BatchSize, Criterion, Throughput};
Expand All @@ -14,7 +14,9 @@ use subspace_erasure_coding::ErasureCoding;
use subspace_farmer::single_disk_farm::farming::rayon_files::RayonFiles;
use subspace_farmer::single_disk_farm::farming::{PlotAudit, PlotAuditOptions};
use subspace_farmer::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use subspace_farmer::single_disk_farm::{SingleDiskFarm, SingleDiskFarmSummary};
use subspace_farmer::single_disk_farm::{
SingleDiskFarm, SingleDiskFarmInfo, SingleDiskFarmSummary,
};
use subspace_farmer::utils::{recommended_number_of_farming_threads, tokio_rayon_spawn_handler};
use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_farmer_components::sector::sector_size;
Expand Down Expand Up @@ -108,30 +110,49 @@ pub(crate) fn benchmark(benchmark_args: BenchmarkArgs) -> anyhow::Result<()> {
}

fn audit(audit_options: AuditOptions) -> anyhow::Result<()> {
let single_disk_farm_info =
match SingleDiskFarm::collect_summary(audit_options.disk_farm.clone()) {
SingleDiskFarmSummary::Found { info, directory: _ } => info,
SingleDiskFarmSummary::NotFound { directory } => {
return Err(anyhow!(
"No single disk farm info found, make sure {} is a valid path to the farm and \
process have permissions to access it",
directory.display()
));
}
SingleDiskFarmSummary::Error { directory, error } => {
return Err(anyhow!(
"Failed to open single disk farm info, make sure {} is a valid path to the farm \
and process have permissions to access it: {error}",
directory.display()
));
}
};

match single_disk_farm_info {
SingleDiskFarmInfo::V0 { .. } => {
audit_inner::<PosTableLegacy>(audit_options, single_disk_farm_info)
}
SingleDiskFarmInfo::V1 { .. } => {
audit_inner::<PosTable>(audit_options, single_disk_farm_info)
}
}
}

fn audit_inner<PosTable>(
audit_options: AuditOptions,
single_disk_farm_info: SingleDiskFarmInfo,
) -> anyhow::Result<()>
where
PosTable: Table,
{
let AuditOptions {
sample_size,
with_single,
farming_thread_pool_size: _,
disk_farm,
filter,
} = audit_options;
let (single_disk_farm_info, disk_farm) = match SingleDiskFarm::collect_summary(disk_farm) {
SingleDiskFarmSummary::Found { info, directory } => (info, directory),
SingleDiskFarmSummary::NotFound { directory } => {
return Err(anyhow!(
"No single disk farm info found, make sure {} is a valid path to the farm and \
process have permissions to access it",
directory.display()
));
}
SingleDiskFarmSummary::Error { directory, error } => {
return Err(anyhow!(
"Failed to open single disk farm info, make sure {} is a valid path to the farm \
and process have permissions to access it: {error}",
directory.display()
));
}
};

let sector_size = sector_size(single_disk_farm_info.pieces_in_sector());
let kzg = Kzg::new(embedded_kzg_settings());
Expand Down Expand Up @@ -272,6 +293,42 @@ fn audit(audit_options: AuditOptions) -> anyhow::Result<()> {
}

fn prove(prove_options: ProveOptions) -> anyhow::Result<()> {
let single_disk_farm_info =
match SingleDiskFarm::collect_summary(prove_options.disk_farm.clone()) {
SingleDiskFarmSummary::Found { info, directory: _ } => info,
SingleDiskFarmSummary::NotFound { directory } => {
return Err(anyhow!(
"No single disk farm info found, make sure {} is a valid path to the farm and \
process have permissions to access it",
directory.display()
));
}
SingleDiskFarmSummary::Error { directory, error } => {
return Err(anyhow!(
"Failed to open single disk farm info, make sure {} is a valid path to the farm \
and process have permissions to access it: {error}",
directory.display()
));
}
};

match single_disk_farm_info {
SingleDiskFarmInfo::V0 { .. } => {
prove_inner::<PosTableLegacy>(prove_options, single_disk_farm_info)
}
SingleDiskFarmInfo::V1 { .. } => {
prove_inner::<PosTable>(prove_options, single_disk_farm_info)
}
}
}

fn prove_inner<PosTable>(
prove_options: ProveOptions,
single_disk_farm_info: SingleDiskFarmInfo,
) -> anyhow::Result<()>
where
PosTable: Table,
{
let ProveOptions {
sample_size,
with_single,
Expand All @@ -281,24 +338,6 @@ fn prove(prove_options: ProveOptions) -> anyhow::Result<()> {
limit_sector_count,
} = prove_options;

let (single_disk_farm_info, disk_farm) = match SingleDiskFarm::collect_summary(disk_farm) {
SingleDiskFarmSummary::Found { info, directory } => (info, directory),
SingleDiskFarmSummary::NotFound { directory } => {
return Err(anyhow!(
"No single disk farm info found, make sure {} is a valid path to the farm and \
process have permissions to access it",
directory.display()
));
}
SingleDiskFarmSummary::Error { directory, error } => {
return Err(anyhow!(
"Failed to open single disk farm info, make sure {} is a valid path to the farm \
and process have permissions to access it: {error}",
directory.display()
));
}
};

let kzg = Kzg::new(embedded_kzg_settings());
let erasure_coding = ErasureCoding::new(
NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
Expand Down Expand Up @@ -341,7 +380,7 @@ fn prove(prove_options: ProveOptions) -> anyhow::Result<()> {
erasure_coding: &erasure_coding,
sectors_being_modified: &HashSet::default(),
read_sector_record_chunks_mode: ReadSectorRecordChunksMode::ConcurrentChunks,
table_generator: &Mutex::new(PosTable::generator()),
table_generator: &table_generator,
};

let mut audit_results = plot_audit.audit(options).unwrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,11 @@ impl ClusterSubcommand {
}
}

pub(crate) async fn cluster<PosTable>(cluster_args: ClusterArgs) -> anyhow::Result<()>
pub(crate) async fn cluster<PosTableLegacy, PosTable>(
cluster_args: ClusterArgs,
) -> anyhow::Result<()>
where
PosTableLegacy: Table,
PosTable: Table,
{
let signal = shutdown_signal();
Expand Down Expand Up @@ -120,10 +123,11 @@ where
controller(nats_client, &mut registry, controller_args).await?
}
ClusterSubcommand::Farmer(farmer_args) => {
farmer::<PosTable>(nats_client, &mut registry, farmer_args).await?
farmer::<PosTableLegacy, PosTable>(nats_client, &mut registry, farmer_args).await?
}
ClusterSubcommand::Plotter(plotter_args) => {
plotter::<PosTable>(nats_client, &mut registry, plotter_args).await?
plotter::<PosTableLegacy, PosTable>(nats_client, &mut registry, plotter_args)
.await?
}
ClusterSubcommand::Cache(cache_args) => {
cache(nats_client, &mut registry, cache_args).await?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,13 @@ pub(super) struct FarmerArgs {
pub(super) additional_components: Vec<String>,
}

pub(super) async fn farmer<PosTable>(
pub(super) async fn farmer<PosTableLegacy, PosTable>(
nats_client: NatsClient,
registry: &mut Registry,
farmer_args: FarmerArgs,
) -> anyhow::Result<Pin<Box<dyn Future<Output = anyhow::Result<()>>>>>
where
PosTableLegacy: Table,
PosTable: Table,
{
let FarmerArgs {
Expand Down Expand Up @@ -217,13 +218,23 @@ where
.unwrap_or_else(recommended_number_of_farming_threads);

let global_mutex = Arc::default();
let plotter_legacy = Arc::new(ClusterPlotter::new(
nats_client.clone(),
sector_encoding_concurrency,
ExponentialBackoff {
max_elapsed_time: None,
..ExponentialBackoff::default()
},
false,
));
let plotter = Arc::new(ClusterPlotter::new(
nats_client.clone(),
sector_encoding_concurrency,
ExponentialBackoff {
max_elapsed_time: None,
..ExponentialBackoff::default()
},
true,
));

let farms = {
Expand All @@ -242,6 +253,7 @@ where
let node_client = node_client.clone();
let kzg = kzg.clone();
let erasure_coding = erasure_coding.clone();
let plotter_legacy = Arc::clone(&plotter_legacy);
let plotter = Arc::clone(&plotter);
let global_mutex = Arc::clone(&global_mutex);
let faster_read_sector_record_chunks_mode_barrier =
Expand All @@ -250,14 +262,16 @@ where
Arc::clone(&faster_read_sector_record_chunks_mode_concurrency);

async move {
let farm_fut = SingleDiskFarm::new::<_, _, PosTable>(
let farm_fut = SingleDiskFarm::new::<_, PosTableLegacy, PosTable>(
SingleDiskFarmOptions {
directory: disk_farm.directory.clone(),
farmer_app_info,
allocated_space: disk_farm.allocated_space,
max_pieces_in_sector,
node_client,
reward_address,
plotter_legacy,
plotter,
kzg,
erasure_coding,
// Cache is provided by dedicated caches in farming cluster
Expand All @@ -270,7 +284,6 @@ where
.read_sector_record_chunks_mode,
faster_read_sector_record_chunks_mode_barrier,
faster_read_sector_record_chunks_mode_concurrency,
plotter,
create,
},
farm_index,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::commands::shared::PlottingThreadPriority;
use anyhow::anyhow;
use clap::Parser;
use futures::{select, FutureExt};
use prometheus_client::registry::Registry;
use std::future::Future;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -80,12 +81,13 @@ pub(super) struct PlotterArgs {
pub(super) additional_components: Vec<String>,
}

pub(super) async fn plotter<PosTable>(
pub(super) async fn plotter<PosTableLegacy, PosTable>(
nats_client: NatsClient,
_registry: &mut Registry,
plotter_args: PlotterArgs,
) -> anyhow::Result<Pin<Box<dyn Future<Output = anyhow::Result<()>>>>>
where
PosTableLegacy: Table,
PosTable: Table,
{
let PlotterArgs {
Expand Down Expand Up @@ -159,8 +161,17 @@ where
)
.map_err(|error| anyhow!("Failed to create thread pool manager: {error}"))?;
let global_mutex = Arc::default();
let cpu_plotter = Arc::new(CpuPlotter::<_, PosTable>::new(
piece_getter,
let legacy_cpu_plotter = Arc::new(CpuPlotter::<_, PosTableLegacy>::new(
piece_getter.clone(),
Arc::clone(&downloading_semaphore),
plotting_thread_pool_manager.clone(),
record_encoding_concurrency,
Arc::clone(&global_mutex),
kzg.clone(),
erasure_coding.clone(),
));
let modern_cpu_plotter = Arc::new(CpuPlotter::<_, PosTable>::new(
piece_getter.clone(),
downloading_semaphore,
plotting_thread_pool_manager,
record_encoding_concurrency,
Expand All @@ -172,8 +183,13 @@ where
// TODO: Metrics

Ok(Box::pin(async move {
plotter_service(&nats_client, &cpu_plotter)
.await
.map_err(|error| anyhow!("Plotter service failed: {error}"))
select! {
result = plotter_service(&nats_client, &legacy_cpu_plotter, false).fuse() => {
result.map_err(|error| anyhow!("Plotter service failed: {error}"))
}
result = plotter_service(&nats_client, &modern_cpu_plotter, true).fuse() => {
result.map_err(|error| anyhow!("Plotter service failed: {error}"))
}
}
}))
}
24 changes: 18 additions & 6 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ fn cache_percentage_parser(s: &str) -> anyhow::Result<NonZeroU8> {

/// Start farming by using multiple replica plot in specified path and connecting to WebSocket
/// server at specified address.
pub(crate) async fn farm<PosTable>(farming_args: FarmingArgs) -> anyhow::Result<()>
pub(crate) async fn farm<PosTableLegacy, PosTable>(farming_args: FarmingArgs) -> anyhow::Result<()>
where
PosTableLegacy: Table,
PosTable: Table,
{
let signal = shutdown_signal();
Expand Down Expand Up @@ -515,8 +516,17 @@ where
.map(|farming_thread_pool_size| farming_thread_pool_size.get())
.unwrap_or_else(recommended_number_of_farming_threads);
let global_mutex = Arc::default();
let plotter = Arc::new(CpuPlotter::<_, PosTable>::new(
piece_getter,
let legacy_cpu_plotter = Arc::new(CpuPlotter::<_, PosTableLegacy>::new(
piece_getter.clone(),
Arc::clone(&downloading_semaphore),
plotting_thread_pool_manager.clone(),
record_encoding_concurrency,
Arc::clone(&global_mutex),
kzg.clone(),
erasure_coding.clone(),
));
let modern_cpu_plotter = Arc::new(CpuPlotter::<_, PosTable>::new(
piece_getter.clone(),
downloading_semaphore,
plotting_thread_pool_manager,
record_encoding_concurrency,
Expand Down Expand Up @@ -544,22 +554,25 @@ where
let farmer_app_info = farmer_app_info.clone();
let kzg = kzg.clone();
let erasure_coding = erasure_coding.clone();
let plotter = Arc::clone(&plotter);
let plotter_legacy = Arc::clone(&legacy_cpu_plotter);
let plotter = Arc::clone(&modern_cpu_plotter);
let global_mutex = Arc::clone(&global_mutex);
let faster_read_sector_record_chunks_mode_barrier =
Arc::clone(&faster_read_sector_record_chunks_mode_barrier);
let faster_read_sector_record_chunks_mode_concurrency =
Arc::clone(&faster_read_sector_record_chunks_mode_concurrency);

async move {
let farm_fut = SingleDiskFarm::new::<_, _, PosTable>(
let farm_fut = SingleDiskFarm::new::<_, PosTableLegacy, PosTable>(
SingleDiskFarmOptions {
directory: disk_farm.directory.clone(),
farmer_app_info,
allocated_space: disk_farm.allocated_space,
max_pieces_in_sector,
node_client,
reward_address,
plotter_legacy,
plotter,
kzg,
erasure_coding,
cache_percentage: cache_percentage.get(),
Expand All @@ -571,7 +584,6 @@ where
.read_sector_record_chunks_mode,
faster_read_sector_record_chunks_mode_barrier,
faster_read_sector_record_chunks_mode_concurrency,
plotter,
create,
},
farm_index,
Expand Down
Loading

0 comments on commit 231c2b5

Please sign in to comment.