Skip to content

Commit

Permalink
Refactor thread pool usage for farming
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Mar 5, 2024
1 parent 0d9b795 commit 841abf5
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 80 deletions.
135 changes: 67 additions & 68 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1074,83 +1074,82 @@ impl SingleDiskFarm {
}
};

let handle = Handle::current();
let span = span.clone();
thread_pool.install(move || {
let _span_guard = span.enter();
let farming_fut = async move {
if start_receiver.recv().await.is_err() {
// Dropped before starting
return Ok(());
}

let farming_fut = async move {
if start_receiver.recv().await.is_err() {
// Dropped before starting
if let Some(farming_delay) = delay_farmer_receiver {
if farming_delay.await.is_err() {
// Dropped before resolving
return Ok(());
}
}

if let Some(farming_delay) = delay_farmer_receiver {
if farming_delay.await.is_err() {
// Dropped before resolving
return Ok(());
}
}

if cfg!(windows) {
let plot = RayonFiles::open_with(
if cfg!(windows) {
let plot = thread_pool.install(|| {
RayonFiles::open_with(
&directory.join(Self::PLOT_FILE),
UnbufferedIoFileWindows::open,
)?;
let plot_audit = PlotAudit::new(&plot);

let farming_options = FarmingOptions {
public_key,
reward_address,
node_client,
plot_audit,
sectors_metadata,
kzg,
erasure_coding,
handlers,
modifying_sector_index,
slot_info_notifications: slot_info_forwarder_receiver,
};
farming::<PosTable, _, _>(farming_options).await
} else {
let plot = RayonFiles::open(&directory.join(Self::PLOT_FILE))?;
let plot_audit = PlotAudit::new(&plot);

let farming_options = FarmingOptions {
public_key,
reward_address,
node_client,
plot_audit,
sectors_metadata,
kzg,
erasure_coding,
handlers,
modifying_sector_index,
slot_info_notifications: slot_info_forwarder_receiver,
};
farming::<PosTable, _, _>(farming_options).await
}
};
)
})?;
let plot_audit = PlotAudit::new(&plot);

let farming_options = FarmingOptions {
public_key,
reward_address,
node_client,
plot_audit,
sectors_metadata,
kzg,
erasure_coding,
handlers,
modifying_sector_index,
slot_info_notifications: slot_info_forwarder_receiver,
thread_pool,
};
farming::<PosTable, _, _>(farming_options).await
} else {
let plot = thread_pool
.install(move || RayonFiles::open(&directory.join(Self::PLOT_FILE)))?;
let plot_audit = PlotAudit::new(&plot);

let farming_options = FarmingOptions {
public_key,
reward_address,
node_client,
plot_audit,
sectors_metadata,
kzg,
erasure_coding,
handlers,
modifying_sector_index,
slot_info_notifications: slot_info_forwarder_receiver,
thread_pool,
};
farming::<PosTable, _, _>(farming_options).await
}
};

handle.block_on(async {
select! {
farming_result = farming_fut.fuse() => {
if let Err(error) = farming_result
&& let Some(error_sender) = error_sender.lock().take()
&& let Err(error) = error_sender.send(error.into())
{
error!(
%error,
"Farming failed to send error to background task",
);
}
}
_ = stop_receiver.recv().fuse() => {
// Nothing, just exit
Handle::current().block_on(async {
select! {
farming_result = farming_fut.fuse() => {
if let Err(error) = farming_result
&& let Some(error_sender) = error_sender.lock().take()
&& let Err(error) = error_sender.send(error.into())
{
error!(
%error,
"Farming failed to send error to background task",
);
}
}
});
})
_ = stop_receiver.recv().fuse() => {
// Nothing, just exit
}
}
});
}
});
let farming_join_handle = AsyncJoinOnDrop::new(farming_join_handle, false);
Expand Down
39 changes: 27 additions & 12 deletions crates/subspace-farmer/src/single_disk_farm/farming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::channel::mpsc;
use futures::StreamExt;
use parity_scale_codec::{Decode, Encode, Error, Input, Output};
use parking_lot::Mutex;
use rayon::ThreadPoolBuildError;
use rayon::{ThreadPool, ThreadPoolBuildError};
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{fmt, io};
Expand All @@ -23,7 +23,7 @@ use subspace_farmer_components::ReadAtSync;
use subspace_proof_of_space::{Table, TableGenerator};
use subspace_rpc_primitives::{SlotInfo, SolutionResponse};
use thiserror::Error;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, trace, warn, Span};

/// Auditing details
#[derive(Debug, Copy, Clone, Encode, Decode)]
Expand Down Expand Up @@ -331,6 +331,7 @@ pub(super) struct FarmingOptions<NC, PlotAudit> {
pub(super) handlers: Arc<Handlers>,
pub(super) modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
pub(super) thread_pool: ThreadPool,
}

/// Starts farming process.
Expand All @@ -356,6 +357,7 @@ where
handlers,
modifying_sector_index,
mut slot_info_notifications,
thread_pool,
} = farming_options;

let farmer_app_info = node_client
Expand All @@ -367,6 +369,7 @@ where
let farming_timeout = farmer_app_info.farming_timeout;

let table_generator = Arc::new(Mutex::new(PosTable::generator()));
let span = Span::current();

while let Some(slot_info) = slot_info_notifications.next().await {
let result: Result<(), FarmingError> = try {
Expand All @@ -380,15 +383,19 @@ where
let modifying_sector_guard = modifying_sector_index.read().await;
let maybe_sector_being_modified = modifying_sector_guard.as_ref().copied();

plot_audit.audit(PlotAuditOptions::<PosTable> {
public_key: &public_key,
reward_address: &reward_address,
slot_info,
sectors_metadata: &sectors_metadata,
kzg: &kzg,
erasure_coding: &erasure_coding,
maybe_sector_being_modified,
table_generator: &table_generator,
thread_pool.install(|| {
let _span_guard = span.enter();

plot_audit.audit(PlotAuditOptions::<PosTable> {
public_key: &public_key,
reward_address: &reward_address,
slot_info,
sectors_metadata: &sectors_metadata,
kzg: &kzg,
erasure_coding: &erasure_coding,
maybe_sector_being_modified,
table_generator: &table_generator,
})
})?
};

Expand All @@ -401,14 +408,22 @@ where
a_solution_distance.cmp(&b_solution_distance)
});

let mut sectors_solutions = sectors_solutions.into_iter();

handlers
.farming_notification
.call_simple(&FarmingNotification::Auditing(AuditingDetails {
sectors_count: sectors_metadata.len() as SectorIndex,
time: start.elapsed(),
}));

'solutions_processing: for (sector_index, sector_solutions) in sectors_solutions {
'solutions_processing: while let Some((sector_index, sector_solutions)) = thread_pool
.install(|| {
let _span_guard = span.enter();

sectors_solutions.next()
})
{
if sector_solutions.is_empty() {
continue;
}
Expand Down

0 comments on commit 841abf5

Please sign in to comment.