Skip to content

Commit

Permalink
Move some generic data structures from under single_disk_farm into …
Browse files Browse the repository at this point in the history
…`farm` module, no other code changes
  • Loading branch information
nazar-pc committed Mar 18, 2024
1 parent f6cfb28 commit 4eca1d8
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 243 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ use std::{fmt, fs};
use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
use subspace_core_primitives::{PublicKey, Record, SectorIndex};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer::farm::Farm;
use subspace_farmer::farm::{
Farm, FarmingNotification, SectorExpirationDetails, SectorPlottingDetails, SectorUpdate,
};
use subspace_farmer::farmer_cache::FarmerCache;
use subspace_farmer::single_disk_farm::farming::FarmingNotification;
use subspace_farmer::single_disk_farm::{
SectorExpirationDetails, SectorPlottingDetails, SectorUpdate, SingleDiskFarm,
SingleDiskFarmError, SingleDiskFarmOptions,
SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions,
};
use subspace_farmer::utils::farmer_piece_getter::{DsnCacheRetryPolicy, FarmerPieceGetter};
use subspace_farmer::utils::piece_validator::SegmentCommitmentPieceValidator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ use std::fmt;
use std::sync::atomic::{AtomicI64, AtomicU64};
use std::time::Duration;
use subspace_core_primitives::SectorIndex;
use subspace_farmer::farm::FarmId;
use subspace_farmer::single_disk_farm::farming::ProvingResult;
use subspace_farmer::single_disk_farm::FarmingError;
use subspace_farmer::farm::{FarmId, FarmingError, ProvingResult};

#[derive(Debug, Copy, Clone)]
pub(super) enum SectorState {
Expand Down
225 changes: 219 additions & 6 deletions crates/subspace-farmer/src/farm.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
use crate::single_disk_farm::farming::FarmingNotification;
use crate::single_disk_farm::plot_cache::MaybePieceStoredResult;
use crate::single_disk_farm::SectorUpdate;
use crate::node_client;
use async_trait::async_trait;
use derive_more::{Display, From};
use futures::Stream;
use parity_scale_codec::{Decode, Encode};
use parity_scale_codec::{Decode, Encode, Input, Output};
use serde::{Deserialize, Serialize};
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use subspace_core_primitives::{Piece, PieceIndex, PieceOffset, SectorIndex};
use std::time::Duration;
use std::{fmt, io};
use subspace_core_primitives::{Piece, PieceIndex, PieceOffset, SectorIndex, SegmentIndex};
use subspace_farmer_components::auditing::AuditingError;
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::proving::ProvingError;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_rpc_primitives::SolutionResponse;
use thiserror::Error;
use ulid::Ulid;

/// Erased error type
Expand Down Expand Up @@ -70,6 +72,16 @@ pub trait PieceCache: Send + Sync + fmt::Debug {
async fn read_piece(&self, offset: PieceCacheOffset) -> Result<Option<Piece>, FarmError>;
}

#[derive(Debug, Copy, Clone, Encode, Decode)]
pub enum MaybePieceStoredResult {
/// Definitely not stored
No,
/// Maybe has vacant slot to store
Vacant,
/// Maybe still stored
Yes,
}

/// Abstract plot cache implementation
#[async_trait]
pub trait PlotCache: Send + Sync + fmt::Debug {
Expand All @@ -93,6 +105,207 @@ pub trait PlotCache: Send + Sync + fmt::Debug {
async fn read_piece(&self, key: &RecordKey) -> Result<Option<Piece>, FarmError>;
}

/// Auditing details
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct AuditingDetails {
/// Number of sectors that were audited
pub sectors_count: SectorIndex,
/// Audit duration
pub time: Duration,
}

/// Result of the proving
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub enum ProvingResult {
/// Proved successfully and accepted by the node
Success,
/// Proving took too long
Timeout,
/// Managed to prove within time limit, but node rejected solution, likely due to timeout on its
/// end
Rejected,
}

impl fmt::Display for ProvingResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
ProvingResult::Success => "Success",
ProvingResult::Timeout => "Timeout",
ProvingResult::Rejected => "Rejected",
})
}
}

/// Proving details
#[derive(Debug, Copy, Clone, Encode, Decode)]
pub struct ProvingDetails {
/// Whether proving ended up being successful
pub result: ProvingResult,
/// Audit duration
pub time: Duration,
}

/// Special decoded farming error
#[derive(Debug, Encode, Decode)]
pub struct DecodedFarmingError {
/// String representation of an error
error: String,
/// Whether error is fatal
is_fatal: bool,
}

impl fmt::Display for DecodedFarmingError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.error.fmt(f)
}
}

/// Errors that happen during farming
#[derive(Debug, Error)]
pub enum FarmingError {
/// Failed to subscribe to slot info notifications
#[error("Failed to subscribe to slot info notifications: {error}")]
FailedToSubscribeSlotInfo {
/// Lower-level error
error: node_client::Error,
},
/// Failed to retrieve farmer info
#[error("Failed to retrieve farmer info: {error}")]
FailedToGetFarmerInfo {
/// Lower-level error
error: node_client::Error,
},
/// Slot info notification stream ended
#[error("Slot info notification stream ended")]
SlotNotificationStreamEnded,
/// Low-level auditing error
#[error("Low-level auditing error: {0}")]
LowLevelAuditing(#[from] AuditingError),
/// Low-level proving error
#[error("Low-level proving error: {0}")]
LowLevelProving(#[from] ProvingError),
/// I/O error occurred
#[error("Farming I/O error: {0}")]
Io(#[from] io::Error),
/// Decoded farming error
#[error("Decoded farming error {0}")]
Decoded(DecodedFarmingError),
}

impl Encode for FarmingError {
fn encode_to<O: Output + ?Sized>(&self, dest: &mut O) {
let error = DecodedFarmingError {
error: self.to_string(),
is_fatal: self.is_fatal(),
};

error.encode_to(dest)
}
}

impl Decode for FarmingError {
fn decode<I: Input>(input: &mut I) -> Result<Self, parity_scale_codec::Error> {
DecodedFarmingError::decode(input).map(FarmingError::Decoded)
}
}

impl FarmingError {
/// String variant of the error, primarily for monitoring purposes
pub fn str_variant(&self) -> &str {
match self {
FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo",
FarmingError::FailedToGetFarmerInfo { .. } => "FailedToGetFarmerInfo",
FarmingError::LowLevelAuditing(_) => "LowLevelAuditing",
FarmingError::LowLevelProving(_) => "LowLevelProving",
FarmingError::Io(_) => "Io",
FarmingError::Decoded(_) => "Decoded",
FarmingError::SlotNotificationStreamEnded => "SlotNotificationStreamEnded",
}
}

/// Whether this error is fatal and makes farm unusable
pub fn is_fatal(&self) -> bool {
match self {
FarmingError::FailedToSubscribeSlotInfo { .. } => true,
FarmingError::FailedToGetFarmerInfo { .. } => true,
FarmingError::LowLevelAuditing(_) => true,
FarmingError::LowLevelProving(error) => error.is_fatal(),
FarmingError::Io(_) => true,
FarmingError::Decoded(error) => error.is_fatal,
FarmingError::SlotNotificationStreamEnded => true,
}
}
}

/// Various farming notifications
#[derive(Debug, Clone, Encode, Decode)]
pub enum FarmingNotification {
/// Auditing
Auditing(AuditingDetails),
/// Proving
Proving(ProvingDetails),
/// Non-fatal farming error
NonFatalError(Arc<FarmingError>),
}

/// Details about sector currently being plotted
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorPlottingDetails {
/// Starting plotting of a sector
Starting {
/// Progress so far in % (not including this sector)
progress: f32,
/// Whether sector is being replotted
replotting: bool,
/// Whether this is the last sector queued so far
last_queued: bool,
},
/// Downloading sector pieces
Downloading,
/// Downloaded sector pieces
Downloaded(Duration),
/// Encoding sector pieces
Encoding,
/// Encoded sector pieces
Encoded(Duration),
/// Writing sector
Writing,
/// Written sector
Written(Duration),
/// Finished plotting
Finished {
/// Information about plotted sector
plotted_sector: PlottedSector,
/// Information about old plotted sector that was replaced
old_plotted_sector: Option<PlottedSector>,
/// How much time it took to plot a sector
time: Duration,
},
}

/// Details about sector expiration
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorExpirationDetails {
/// Sector expiration became known
Determined {
/// Segment index at which sector expires
expires_at: SegmentIndex,
},
/// Sector will expire at the next segment index and should be replotted
AboutToExpire,
/// Sector already expired
Expired,
}

/// Various sector updates
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorUpdate {
/// Sector is being plotted
Plotting(SectorPlottingDetails),
/// Sector expiration information updated
Expiration(SectorExpirationDetails),
}

/// Abstract piece reader implementation
#[async_trait]
pub trait PieceReader: Send + Sync + fmt::Debug {
Expand Down
3 changes: 1 addition & 2 deletions crates/subspace-farmer/src/farmer_cache.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#[cfg(test)]
mod tests;

use crate::farm::{PieceCache, PieceCacheOffset, PlotCache};
use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheOffset, PlotCache};
use crate::node_client::NodeClient;
use crate::single_disk_farm::plot_cache::MaybePieceStoredResult;
use crate::utils::run_future_in_dedicated_thread;
use async_lock::RwLock as AsyncRwLock;
use event_listener_primitives::{Bag, HandlerId};
Expand Down
23 changes: 7 additions & 16 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,24 @@ pub mod plot_cache;
mod plotting;
pub mod unbuffered_io_file_windows;

use crate::farm::{Farm, FarmError, FarmId, HandlerFn, PieceCache, PieceReader, PlotCache};
use crate::farm::{
Farm, FarmError, FarmId, HandlerFn, PieceCache, PieceReader, PlotCache, SectorUpdate,
};
pub use crate::farm::{FarmingError, FarmingNotification};
use crate::identity::{Identity, IdentityError};
use crate::node_client::NodeClient;
use crate::reward_signing::reward_signing;
use crate::single_disk_farm::farming::rayon_files::RayonFiles;
pub use crate::single_disk_farm::farming::FarmingError;
use crate::single_disk_farm::farming::{
farming, slot_notification_forwarder, FarmingNotification, FarmingOptions, PlotAudit,
farming, slot_notification_forwarder, FarmingOptions, PlotAudit,
};
use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError};
use crate::single_disk_farm::piece_reader::DiskPieceReader;
use crate::single_disk_farm::plot_cache::DiskPlotCache;
pub use crate::single_disk_farm::plotting::PlottingError;
use crate::single_disk_farm::plotting::{
plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions,
};
pub use crate::single_disk_farm::plotting::{
PlottingError, SectorExpirationDetails, SectorPlottingDetails,
};
#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE;
Expand Down Expand Up @@ -153,7 +153,7 @@ impl SingleDiskFarmInfo {
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
}

/// Store `SingleDiskFarm` info to path so it can be loaded again upon restart.
/// Store `SingleDiskFarm` info to path, so it can be loaded again upon restart.
pub fn store_to(&self, directory: &Path) -> io::Result<()> {
fs::write(
directory.join(Self::FILE_NAME),
Expand Down Expand Up @@ -542,15 +542,6 @@ type BackgroundTask = Pin<Box<dyn Future<Output = Result<(), BackgroundTaskError

type Handler<A> = Bag<HandlerFn<A>, A>;

/// Various sector updates
#[derive(Debug, Clone, Encode, Decode)]
pub enum SectorUpdate {
/// Sector is being plotted
Plotting(SectorPlottingDetails),
/// Sector expiration information updated
Expiration(SectorExpirationDetails),
}

#[derive(Default, Debug)]
struct Handlers {
sector_update: Handler<(SectorIndex, SectorUpdate)>,
Expand Down
Loading

0 comments on commit 4eca1d8

Please sign in to comment.