Skip to content

Commit

Permalink
Merge pull request #2728 from subspace/farmer-tweaks-and-cleanups
Browse files Browse the repository at this point in the history
Farmer tweaks and cleanups
  • Loading branch information
nazar-pc authored Apr 30, 2024
2 parents b70f89d + 77fb5d6 commit eb09630
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 88 deletions.
106 changes: 43 additions & 63 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use backoff::ExponentialBackoff;
use bytesize::ByteSize;
use clap::{Parser, ValueHint};
use futures::channel::oneshot;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{FutureExt, StreamExt, TryStreamExt};
use futures::stream::FuturesUnordered;
use futures::{select, FutureExt, StreamExt};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::fs;
Expand Down Expand Up @@ -197,7 +197,7 @@ pub(crate) struct FarmingArgs {
disable_farm_locking: bool,
/// Exit on farm error.
///
/// By default, farmer will continue running if the are still other working farms.
/// By default, farmer will continue running if there are still other working farms.
#[arg(long)]
exit_on_farm_error: bool,
}
Expand All @@ -206,7 +206,7 @@ fn cache_percentage_parser(s: &str) -> anyhow::Result<NonZeroU8> {
let cache_percentage = NonZeroU8::from_str(s)?;

if cache_percentage.get() > 99 {
return Err(anyhow::anyhow!("Cache percentage can't exceed 99"));
return Err(anyhow!("Cache percentage can't exceed 99"));
}

Ok(cache_percentage)
Expand Down Expand Up @@ -295,7 +295,7 @@ where
let farmer_app_info = node_client
.farmer_app_info()
.await
.map_err(|error| anyhow::anyhow!(error))?;
.map_err(|error| anyhow!(error))?;

let first_farm_directory = &disk_farms
.first()
Expand Down Expand Up @@ -350,7 +350,7 @@ where
NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
.expect("Not zero; qed"),
)
.map_err(|error| anyhow::anyhow!(error))?;
.map_err(|error| anyhow!(error))?;
let validator = Some(SegmentCommitmentPieceValidator::new(
node.clone(),
node_client.clone(),
Expand Down Expand Up @@ -407,17 +407,14 @@ where
let replotting_thread_pool_core_indices;
if let Some(plotting_cpu_cores) = plotting_cpu_cores {
plotting_thread_pool_core_indices = parse_cpu_cores_sets(&plotting_cpu_cores)
.map_err(|error| anyhow::anyhow!("Failed to parse `--plotting-cpu-cores`: {error}"))?;
.map_err(|error| anyhow!("Failed to parse `--plotting-cpu-cores`: {error}"))?;
replotting_thread_pool_core_indices = match replotting_cpu_cores {
Some(replotting_cpu_cores) => {
parse_cpu_cores_sets(&replotting_cpu_cores).map_err(|error| {
anyhow::anyhow!("Failed to parse `--replotting-cpu-cores`: {error}")
})?
}
Some(replotting_cpu_cores) => parse_cpu_cores_sets(&replotting_cpu_cores)
.map_err(|error| anyhow!("Failed to parse `--replotting-cpu-cores`: {error}"))?,
None => plotting_thread_pool_core_indices.clone(),
};
if plotting_thread_pool_core_indices.len() != replotting_thread_pool_core_indices.len() {
return Err(anyhow::anyhow!(
return Err(anyhow!(
"Number of plotting thread pools ({}) is not the same as for replotting ({})",
plotting_thread_pool_core_indices.len(),
replotting_thread_pool_core_indices.len()
Expand Down Expand Up @@ -551,7 +548,7 @@ where
}) => {
return (
farm_index,
Err(anyhow::anyhow!(
Err(anyhow!(
"Allocated space {} ({}) is not enough, minimum is ~{} (~{}, \
{} bytes to be exact)",
bytesize::to_string(allocated_space, true),
Expand Down Expand Up @@ -646,60 +643,43 @@ where
info!("Collecting already plotted pieces (this will take some time)...");

// Collect already plotted pieces
{
let mut plotted_pieces = plotted_pieces.write().await;
let mut total_and_plotted_sectors = Vec::with_capacity(farms.len());

for (farm_index, farm) in farms.iter().enumerate() {
let farm_index = farm_index.try_into().map_err(|_error| {
anyhow!(
"More than 256 plots are not supported, consider running multiple farmer \
for (farm_index, farm) in farms.iter().enumerate() {
let mut plotted_pieces = plotted_pieces.write().await;
let farm_index = farm_index.try_into().map_err(|_error| {
anyhow!(
"More than 256 plots are not supported, consider running multiple farmer \
instances"
)
})?;

plotted_pieces.add_farm(farm_index, farm.piece_reader());

let plotted_sectors = farm.plotted_sectors();
let mut plotted_sectors = plotted_sectors.get().await.map_err(|error| {
anyhow!("Failed to get plotted sectors for farm {farm_index}: {error}")
})?;
while let Some(plotted_sector_result) = plotted_sectors.next().await {
match plotted_sector_result {
Ok(plotted_sector) => {
plotted_pieces.add_sector(farm_index, &plotted_sector);
}
Err(error) => {
error!(
%error,
%farm_index,
"Failed reading plotted sector on startup, skipping"
);
}
}
}
)
})?;

plotted_pieces.add_farm(farm_index, farm.piece_reader());

let total_sector_count = farm.total_sectors_count();
let mut plotted_sectors_count = 0;
let plotted_sectors = farm.plotted_sectors();
let mut plotted_sectors = plotted_sectors.get().await.map_err(|error| {
anyhow!("Failed to get plotted sectors for farm {farm_index}: {error}")
})?;

while let Some(plotted_sector_result) = plotted_sectors.next().await {
plotted_sectors_count += 1;
plotted_pieces.add_sector(
farm_index,
&plotted_sector_result.map_err(|error| {
anyhow!(
"Failed reading plotted sector on startup for farm {farm_index}: {error}"
)
})?,
)
}

total_and_plotted_sectors.push((total_sector_count, plotted_sectors_count));
}

info!("Finished collecting already plotted pieces successfully");

let total_and_plotted_sectors = farms
.iter()
.enumerate()
.map(|(farm_index, farm)| async move {
let total_sector_count = farm.total_sectors_count();
let plotted_sectors_count = farm.plotted_sectors_count().await.map_err(|error| {
anyhow!(
"Failed to get plotted sectors count from from index {farm_index}: \
{error}"
)
})?;

anyhow::Ok((total_sector_count, plotted_sectors_count))
})
.collect::<FuturesOrdered<_>>()
.try_collect::<Vec<_>>()
.await?;

let mut farms_stream = (0u8..)
.zip(farms)
.zip(total_and_plotted_sectors)
Expand Down Expand Up @@ -871,7 +851,7 @@ where
let farm_fut = pin!(farm_fut);
let farmer_cache_worker_fut = pin!(farmer_cache_worker_fut);

futures::select!(
select! {
// Signal future
_ = signal.fuse() => {},

Expand All @@ -889,7 +869,7 @@ where
_ = farmer_cache_worker_fut.fuse() => {
info!("Farmer cache worker exited.")
},
);
}

anyhow::Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,15 @@ where
async move {
let internal_result = match req {
SegmentHeaderRequest::SegmentIndexes { segment_indexes } => {
if segment_indexes.len() as u64 > SEGMENT_HEADER_NUMBER_LIMIT {
debug!(
"segment_indexes length exceed the limit: {} ",
segment_indexes.len()
);

return None;
}

debug!(
segment_indexes_count = ?segment_indexes.len(),
"Segment headers request received."
Expand Down
42 changes: 36 additions & 6 deletions crates/subspace-farmer/src/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::node_client;
use async_trait::async_trait;
use derive_more::{Display, From};
use futures::Stream;
use parity_scale_codec::{Decode, Encode, Input, Output};
use parity_scale_codec::{Decode, Encode, EncodeLike, Input, Output};
use serde::{Deserialize, Serialize};
use std::future::Future;
use std::pin::Pin;
Expand Down Expand Up @@ -50,7 +50,7 @@ pub trait PieceCache: Send + Sync + fmt::Debug {
/// doesn't happen for the same piece being accessed!
async fn contents(
&self,
) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<PieceIndex>)> + Unpin + '_>;
) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<PieceIndex>)> + Unpin + Send + '_>;

/// Store piece in cache at specified offset, replacing existing piece if there is any.
///
Expand Down Expand Up @@ -332,7 +332,7 @@ pub trait PieceReader: Send + Sync + fmt::Debug {
}

/// Opaque handler ID for event handlers, once dropped handler will be removed automatically
pub trait HandlerId: Send + fmt::Debug {
pub trait HandlerId: Send + Sync + fmt::Debug {
/// Consumes [`HandlerId`] and prevents handler from being removed automatically.
fn detach(&self);
}
Expand All @@ -353,6 +353,39 @@ pub enum FarmId {
Ulid(Ulid),
}

/// Manual SCALE encoding: a one-byte variant tag followed by the variant payload.
///
/// Written by hand (instead of `#[derive(Encode)]`) so the wire format is pinned
/// explicitly and stays stable even if the enum definition is touched.
impl Encode for FarmId {
    fn size_hint(&self) -> usize {
        // One byte for the variant tag plus the payload's own hint.
        match self {
            FarmId::Ulid(ulid) => 1_usize + Encode::size_hint(&ulid.0),
        }
    }

    fn encode_to<O: Output + ?Sized>(&self, output: &mut O) {
        match self {
            FarmId::Ulid(ulid) => {
                // Tag `0` identifies the `Ulid` variant, then the raw `u128` payload.
                output.push_byte(0);
                Encode::encode_to(&ulid.0, output);
            }
        }
    }
}

impl EncodeLike for FarmId {}

/// Manual SCALE decoding, mirroring the hand-written [`Encode`] impl:
/// read the variant tag byte, then the variant payload.
impl Decode for FarmId {
    fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
        // First byte selects the enum variant.
        let variant = input
            .read_byte()
            .map_err(|e| e.chain("Could not decode `FarmId`, failed to read variant byte"))?;

        if variant == 0 {
            // `Ulid` variant: payload is the raw `u128`.
            let inner = u128::decode(input)
                .map_err(|e| e.chain("Could not decode `FarmId::Ulid.0`"))?;
            Ok(FarmId::Ulid(Ulid(inner)))
        } else {
            Err("Could not decode `FarmId`, variant doesn't exist".into())
        }
    }
}

#[allow(clippy::new_without_default)]
impl FarmId {
/// Creates new ID
Expand All @@ -370,9 +403,6 @@ pub trait Farm {
/// Number of sectors in this farm
fn total_sectors_count(&self) -> SectorIndex;

/// Number of sectors successfully plotted so far
async fn plotted_sectors_count(&self) -> Result<SectorIndex, FarmError>;

/// Get plotted sectors instance
fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;

Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/piece_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl farm::PieceCache for PieceCache {

async fn contents(
&self,
) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<PieceIndex>)> + Unpin + '_> {
) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<PieceIndex>)> + Unpin + Send + '_> {
let this = self.clone();
let (mut sender, receiver) = mpsc::channel(1);
let read_contents = task::spawn_blocking(move || {
Expand Down
18 changes: 1 addition & 17 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ mod plotted_sectors;
mod plotting;
pub mod unbuffered_io_file_windows;

use crate::farm::{
Farm, FarmError, FarmId, HandlerFn, PieceReader, PlotCache, PlottedSectors, SectorUpdate,
};
use crate::farm::{Farm, FarmId, HandlerFn, PieceReader, PlotCache, PlottedSectors, SectorUpdate};
pub use crate::farm::{FarmingError, FarmingNotification};
use crate::identity::{Identity, IdentityError};
use crate::node_client::NodeClient;
Expand Down Expand Up @@ -615,10 +613,6 @@ impl Farm for SingleDiskFarm {
self.total_sectors_count
}

async fn plotted_sectors_count(&self) -> Result<SectorIndex, FarmError> {
Ok(self.plotted_sectors_count().await)
}

fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
Arc::new(self.plotted_sectors())
}
Expand Down Expand Up @@ -1469,16 +1463,6 @@ impl SingleDiskFarm {
self.total_sectors_count
}

/// Number of sectors successfully plotted so far
pub async fn plotted_sectors_count(&self) -> SectorIndex {
self.sectors_metadata
.read()
.await
.len()
.try_into()
.expect("Number of sectors never exceeds `SectorIndex` type; qed")
}

/// Read information about sectors plotted so far
pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
SingleDiskPlottedSectors {
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl farm::PieceCache for DiskPieceCache {

async fn contents(
&self,
) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<PieceIndex>)> + Unpin + '_> {
) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<PieceIndex>)> + Unpin + Send + '_> {
if let Some(piece_cache) = &self.maybe_piece_cache {
farm::PieceCache::contents(piece_cache).await
} else {
Expand Down

0 comments on commit eb09630

Please sign in to comment.