Skip to content

Commit

Permalink
Merge pull request #2724 from subspace/plotted-sectors-abstraction
Browse files Browse the repository at this point in the history
Add `PlottedSectors` abstraction in `Farm`
  • Loading branch information
nazar-pc authored Apr 29, 2024
2 parents 4821618 + 8db3cf2 commit 6ad33f1
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ where
plotted_pieces.add_farm(farm_index, farm.piece_reader());

for (sector_index, mut plotted_sectors) in
(0 as SectorIndex..).zip(farm.plotted_sectors().await)
(0 as SectorIndex..).zip(farm.plotted_sectors().get().await)
{
while let Some(plotted_sector_result) = plotted_sectors.next().await {
match plotted_sector_result {
Expand Down
17 changes: 13 additions & 4 deletions crates/subspace-farmer/src/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ use ulid::Ulid;
pub type FarmError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub type HandlerFn<A> = Arc<dyn Fn(&A) + Send + Sync + 'static>;

/// Getter for plotted sectors
#[async_trait]
pub trait PlottedSectors: Send + Sync + fmt::Debug {
async fn get(
&self,
) -> Result<
Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
FarmError,
>;
}

/// Offset wrapper for pieces in [`PieceCache`]
#[derive(Debug, Display, Copy, Clone, Encode, Decode)]
#[repr(transparent)]
Expand Down Expand Up @@ -362,10 +373,8 @@ pub trait Farm {
/// Number of sectors successfully plotted so far
async fn plotted_sectors_count(&self) -> Result<SectorIndex, FarmError>;

/// Read information about sectors plotted so far
async fn plotted_sectors(
&self,
) -> Result<Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + '_>, FarmError>;
/// Get plotted sectors instance
fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;

/// Get piece cache instance
fn piece_cache(&self) -> Arc<dyn PieceCache + 'static>;
Expand Down
70 changes: 19 additions & 51 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ pub mod farming;
pub mod piece_cache;
pub mod piece_reader;
pub mod plot_cache;
mod plotted_sectors;
mod plotting;
pub mod unbuffered_io_file_windows;

use crate::farm::{Farm, FarmError, FarmId, HandlerFn, PieceReader, PlotCache, SectorUpdate};
use crate::farm::{
Farm, FarmError, FarmId, HandlerFn, PieceReader, PlotCache, PlottedSectors, SectorUpdate,
};
pub use crate::farm::{FarmingError, FarmingNotification};
use crate::identity::{Identity, IdentityError};
use crate::node_client::NodeClient;
Expand All @@ -19,6 +22,7 @@ use crate::single_disk_farm::farming::{
use crate::single_disk_farm::piece_cache::DiskPieceCache;
use crate::single_disk_farm::piece_reader::DiskPieceReader;
use crate::single_disk_farm::plot_cache::DiskPlotCache;
use crate::single_disk_farm::plotted_sectors::SingleDiskPlottedSectors;
pub use crate::single_disk_farm::plotting::PlottingError;
use crate::single_disk_farm::plotting::{
plotting, plotting_scheduler, PlottingOptions, PlottingSchedulerOptions, SectorPlottingOptions,
Expand All @@ -33,7 +37,7 @@ use async_trait::async_trait;
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::{mpsc, oneshot};
use futures::stream::FuturesUnordered;
use futures::{select, stream, FutureExt, Stream, StreamExt};
use futures::{select, FutureExt, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use rand::prelude::*;
Expand All @@ -55,13 +59,12 @@ use std::{fs, io, mem};
use subspace_core_primitives::crypto::kzg::Kzg;
use subspace_core_primitives::crypto::{blake3_hash, Scalar};
use subspace_core_primitives::{
Blake3Hash, HistorySize, PieceOffset, PublicKey, Record, SectorId, SectorIndex, SegmentIndex,
Blake3Hash, HistorySize, PublicKey, Record, SectorIndex, SegmentIndex,
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::file_ext::FileExt;
#[cfg(not(windows))]
use subspace_farmer_components::file_ext::OpenOptionsExt;
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::reading::ReadSectorRecordChunksMode;
use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed};
use subspace_farmer_components::{FarmerProtocolInfo, ReadAtSync};
Expand Down Expand Up @@ -616,15 +619,8 @@ impl Farm for SingleDiskFarm {
Ok(self.plotted_sectors_count().await)
}

async fn plotted_sectors(
&self,
) -> Result<Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + '_>, FarmError>
{
Ok(Box::new(stream::iter(
self.plotted_sectors()
.await
.map(|result| result.map_err(Into::into)),
)))
fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
Arc::new(self.plotted_sectors())
}

fn piece_cache(&self) -> Arc<dyn farm::PieceCache + 'static> {
Expand All @@ -642,21 +638,18 @@ impl Farm for SingleDiskFarm {
fn on_sector_update(
&self,
callback: HandlerFn<(SectorIndex, SectorUpdate)>,
) -> Box<dyn crate::farm::HandlerId> {
) -> Box<dyn farm::HandlerId> {
Box::new(self.on_sector_update(callback))
}

fn on_farming_notification(
&self,
callback: HandlerFn<FarmingNotification>,
) -> Box<dyn crate::farm::HandlerId> {
) -> Box<dyn farm::HandlerId> {
Box::new(self.on_farming_notification(callback))
}

fn on_solution(
&self,
callback: HandlerFn<SolutionResponse>,
) -> Box<dyn crate::farm::HandlerId> {
fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn farm::HandlerId> {
Box::new(self.on_solution(callback))
}

Expand Down Expand Up @@ -1487,38 +1480,13 @@ impl SingleDiskFarm {
}

/// Read information about sectors plotted so far
pub async fn plotted_sectors(
&self,
) -> impl Iterator<Item = Result<PlottedSector, parity_scale_codec::Error>> + '_ {
let public_key = self.single_disk_farm_info.public_key();
let sectors_metadata = self.sectors_metadata.read().await.clone();

(0..)
.zip(sectors_metadata)
.map(move |(sector_index, sector_metadata)| {
let sector_id = SectorId::new(public_key.hash(), sector_index);

let mut piece_indexes = Vec::with_capacity(usize::from(self.pieces_in_sector));
(PieceOffset::ZERO..)
.take(usize::from(self.pieces_in_sector))
.map(|piece_offset| {
sector_id.derive_piece_index(
piece_offset,
sector_metadata.history_size,
self.farmer_protocol_info.max_pieces_in_sector,
self.farmer_protocol_info.recent_segments,
self.farmer_protocol_info.recent_history_fraction,
)
})
.collect_into(&mut piece_indexes);

Ok(PlottedSector {
sector_id,
sector_index,
sector_metadata,
piece_indexes,
})
})
pub fn plotted_sectors(&self) -> SingleDiskPlottedSectors {
SingleDiskPlottedSectors {
public_key: *self.single_disk_farm_info.public_key(),
pieces_in_sector: self.pieces_in_sector,
farmer_protocol_info: self.farmer_protocol_info,
sectors_metadata: Arc::clone(&self.sectors_metadata),
}
}

/// Get piece cache instance
Expand Down
57 changes: 57 additions & 0 deletions crates/subspace-farmer/src/single_disk_farm/plotted_sectors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
use crate::farm::{FarmError, PlottedSectors};
use async_lock::RwLock as AsyncRwLock;
use async_trait::async_trait;
use futures::{stream, Stream};
use std::sync::Arc;
use subspace_core_primitives::{PieceOffset, PublicKey, SectorId};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::sector::SectorMetadataChecksummed;
use subspace_farmer_components::FarmerProtocolInfo;

/// Getter for single disk plotted sectors
#[derive(Debug)]
pub struct SingleDiskPlottedSectors {
pub(super) public_key: PublicKey,
pub(super) pieces_in_sector: u16,
pub(super) farmer_protocol_info: FarmerProtocolInfo,
pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
}

#[async_trait]
impl PlottedSectors for SingleDiskPlottedSectors {
async fn get(
&self,
) -> Result<
Box<dyn Stream<Item = Result<PlottedSector, FarmError>> + Unpin + Send + '_>,
FarmError,
> {
let public_key_hash = self.public_key.hash();
let sectors_metadata = self.sectors_metadata.read().await.clone();
Ok(Box::new(stream::iter((0..).zip(sectors_metadata).map(
move |(sector_index, sector_metadata)| {
let sector_id = SectorId::new(public_key_hash, sector_index);

let mut piece_indexes = Vec::with_capacity(usize::from(self.pieces_in_sector));
(PieceOffset::ZERO..)
.take(usize::from(self.pieces_in_sector))
.map(|piece_offset| {
sector_id.derive_piece_index(
piece_offset,
sector_metadata.history_size,
self.farmer_protocol_info.max_pieces_in_sector,
self.farmer_protocol_info.recent_segments,
self.farmer_protocol_info.recent_history_fraction,
)
})
.collect_into(&mut piece_indexes);

Ok(PlottedSector {
sector_id,
sector_index,
sector_metadata,
piece_indexes,
})
},
))))
}
}

0 comments on commit 6ad33f1

Please sign in to comment.