Merge pull request #2745 from subspace/farmer-cleanups
Farmer cleanups
nazar-pc authored May 8, 2024
2 parents f8bf280 + a9042b1 commit a2206ad
Showing 13 changed files with 391 additions and 158 deletions.
@@ -114,7 +114,7 @@ fn audit(
NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
.expect("Not zero; qed"),
)
- .map_err(|error| anyhow::anyhow!(error))?;
+ .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?;
let table_generator = Mutex::new(PosTable::generator());

let sectors_metadata = SingleDiskFarm::read_all_sectors_metadata(&disk_farm)
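Side note on the hunk above: the value built from `Record::NUM_S_BUCKETS` is its base-2 logarithm after rounding up to a power of two, wrapped in `NonZeroUsize`. A minimal sketch of that arithmetic, assuming a value of 65536 purely for illustration (the real constant lives in `subspace-core-primitives` and is not part of this diff):

```rust
use std::num::NonZeroUsize;

fn main() {
    // Assumed value, for illustration only; the real constant is `Record::NUM_S_BUCKETS`.
    let num_s_buckets: usize = 65536;
    let scale = NonZeroUsize::new(num_s_buckets.next_power_of_two().ilog2() as usize)
        .expect("Not zero; qed");
    // 65536 is already a power of two, so its base-2 logarithm is 16.
    assert_eq!(scale.get(), 16);
}
```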
@@ -276,7 +276,7 @@ fn prove(
NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
.expect("Not zero; qed"),
)
- .map_err(|error| anyhow::anyhow!(error))?;
+ .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?;
let table_generator = Mutex::new(PosTable::generator());

let mut sectors_metadata = SingleDiskFarm::read_all_sectors_metadata(&disk_farm)
18 changes: 9 additions & 9 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -90,8 +90,8 @@ pub(crate) struct FarmingArgs {
#[arg(long)]
dev: bool,
/// Run temporary farmer with specified plot size in human-readable format (e.g. 10GB, 2TiB) or
- /// just bytes (e.g. 4096), this will create a temporary directory for storing farmer data that
- /// will be deleted at the end of the process.
+ /// just bytes (e.g. 4096), this will create a temporary directory that will be deleted at the
+ /// end of the process.
#[arg(long, conflicts_with = "disk_farms")]
tmp: Option<ByteSize>,
/// Maximum number of pieces in sector (can override protocol value to something lower).
@@ -257,7 +257,7 @@ where
!cfg!(windows)
|| disk_farms
.iter()
- .map(|farm| farm.allocated_plotting_space)
+ .map(|farm| farm.allocated_space)
.sum::<u64>()
<= MAX_SPACE_PLEDGED_FOR_PLOT_CACHE_ON_WINDOWS
});
@@ -272,7 +272,7 @@

disk_farms = vec![DiskFarm {
directory: tmp_directory.as_ref().to_path_buf(),
- allocated_plotting_space: plot_size.as_u64(),
+ allocated_space: plot_size.as_u64(),
read_sector_record_chunks_mode: None,
}];

@@ -304,7 +304,7 @@ where
let farmer_app_info = node_client
.farmer_app_info()
.await
- .map_err(|error| anyhow!(error))?;
+ .map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?;

let first_farm_directory = &disk_farms
.first()
@@ -370,7 +370,7 @@ where
NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
.expect("Not zero; qed"),
)
- .map_err(|error| anyhow!(error))?;
+ .map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?;
let validator = Some(SegmentCommitmentPieceValidator::new(
node.clone(),
node_client.clone(),
@@ -542,7 +542,7 @@ where
SingleDiskFarmOptions {
directory: disk_farm.directory.clone(),
farmer_app_info,
- allocated_space: disk_farm.allocated_plotting_space,
+ allocated_space: disk_farm.allocated_space,
max_pieces_in_sector,
node_client,
reward_address,
@@ -679,7 +679,7 @@ where

plotted_pieces.add_farm(farm_index, farm.piece_reader());

- let total_sector_count = farm.total_sectors_count();
+ let total_sectors_count = farm.total_sectors_count();
let mut plotted_sectors_count = 0;
let plotted_sectors = farm.plotted_sectors();
let mut plotted_sectors = plotted_sectors.get().await.map_err(|error| {
@@ -698,7 +698,7 @@
)
}

- total_and_plotted_sectors.push((total_sector_count, plotted_sectors_count));
+ total_and_plotted_sectors.push((total_sectors_count, plotted_sectors_count));
}

info!("Finished collecting already plotted pieces successfully");
20 changes: 9 additions & 11 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs
@@ -58,10 +58,10 @@ impl From<PlottingThreadPriority> for Option<ThreadPriority> {

#[derive(Debug, Clone)]
pub(in super::super) struct DiskFarm {
- /// Path to directory where data is stored.
+ /// Path to directory where farm is stored
pub(in super::super) directory: PathBuf,
- /// How much space in bytes can farm use for plots (metadata space is not included)
- pub(in super::super) allocated_plotting_space: u64,
+ /// How much space in bytes can farm use
+ pub(in super::super) allocated_space: u64,
/// Which mode to use for reading of sector record chunks
pub(in super::super) read_sector_record_chunks_mode: Option<ReadSectorRecordChunksMode>,
}
@@ -76,7 +76,7 @@ impl FromStr for DiskFarm {
}

let mut plot_directory = None;
- let mut allocated_plotting_space = None;
+ let mut allocated_space = None;
let mut read_sector_record_chunks_mode = None;

for part in parts {
@@ -93,7 +93,7 @@
plot_directory.replace(PathBuf::from(value));
}
"size" => {
- allocated_plotting_space.replace(
+ allocated_space.replace(
value
.parse::<ByteSize>()
.map_err(|error| {
@@ -121,12 +121,10 @@
}

Ok(DiskFarm {
- directory: plot_directory.ok_or({
- "`path` key is required with path to directory where plots will be stored"
- })?,
- allocated_plotting_space: allocated_plotting_space.ok_or({
- "`size` key is required with path to directory where plots will be stored"
- })?,
+ directory: plot_directory
+ .ok_or("`path` key is required with path to directory where farm will be stored")?,
+ allocated_space: allocated_space
+ .ok_or("`size` key is required with allocated amount of disk space")?,
read_sector_record_chunks_mode,
})
}
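For context, the `FromStr` impl above consumes descriptors of the form `path=...,size=...`, plus an optional key for the sector record chunks read mode. A rough sketch of how such a descriptor might be exercised in a test next to `DiskFarm`; the directory and size values are made up:

```rust
use std::path::PathBuf;
use std::str::FromStr;

#[test]
fn parses_farm_descriptor() {
    // Hypothetical descriptor; `size` goes through `ByteSize`, so human-readable units work.
    let farm = DiskFarm::from_str("path=/mnt/subspace-farm-1,size=100GiB")
        .expect("valid farm descriptor");

    assert_eq!(farm.directory, PathBuf::from("/mnt/subspace-farm-1"));
    assert!(farm.allocated_space > 0);
}
```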
@@ -137,8 +137,7 @@ where

let read_piece_fut = match weak_plotted_pieces.upgrade() {
Some(plotted_pieces) => plotted_pieces
- .read()
- .await
+ .try_read()?
.read_piece(piece_index)?
.in_current_span(),
None => {
64 changes: 62 additions & 2 deletions crates/subspace-farmer/src/farm.rs
@@ -42,15 +42,23 @@ pub struct PieceCacheOffset(pub(crate) u32);
#[async_trait]
pub trait PieceCache: Send + Sync + fmt::Debug {
/// Max number of elements in this cache
- fn max_num_elements(&self) -> usize;
+ fn max_num_elements(&self) -> u32;

/// Contents of this piece cache.
///
/// NOTE: it is possible to do concurrent reads and writes, higher level logic must ensure this
/// doesn't happen for the same piece being accessed!
async fn contents(
&self,
- ) -> Box<dyn Stream<Item = (PieceCacheOffset, Option<PieceIndex>)> + Unpin + Send + '_>;
+ ) -> Result<
+ Box<
+ dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
+ + Unpin
+ + Send
+ + '_,
+ >,
+ FarmError,
+ >;

/// Store piece in cache at specified offset, replacing existing piece if there is any.
///
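With this change `contents()` is fallible at two levels: opening the contents can fail, and each yielded element carries its own `Result`. A minimal consumer sketch under that assumption; the helper function is hypothetical and not part of this PR:

```rust
use futures::StreamExt;

// Hypothetical helper: count occupied offsets in any `PieceCache` implementation.
async fn count_stored_pieces(cache: &dyn PieceCache) -> Result<u32, FarmError> {
    let mut contents = cache.contents().await?;
    let mut stored = 0_u32;

    while let Some(element) = contents.next().await {
        // Each element is itself a `Result`, so per-entry errors surface here.
        let (_offset, maybe_piece_index) = element?;
        if maybe_piece_index.is_some() {
            stored += 1;
        }
    }

    Ok(stored)
}
```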
@@ -433,3 +441,55 @@ pub trait Farm {
/// Run and wait for background threads to exit or return an error
fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>>;
}

+ #[async_trait]
+ impl<T> Farm for Box<T>
+ where
+ T: Farm + ?Sized,
+ {
+ fn id(&self) -> &FarmId {
+ self.as_ref().id()
+ }
+
+ fn total_sectors_count(&self) -> SectorIndex {
+ self.as_ref().total_sectors_count()
+ }
+
+ fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static> {
+ self.as_ref().plotted_sectors()
+ }
+
+ fn piece_cache(&self) -> Arc<dyn PieceCache + 'static> {
+ self.as_ref().piece_cache()
+ }
+
+ fn plot_cache(&self) -> Arc<dyn PlotCache + 'static> {
+ self.as_ref().plot_cache()
+ }
+
+ fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
+ self.as_ref().piece_reader()
+ }
+
+ fn on_sector_update(
+ &self,
+ callback: HandlerFn<(SectorIndex, SectorUpdate)>,
+ ) -> Box<dyn HandlerId> {
+ self.as_ref().on_sector_update(callback)
+ }
+
+ fn on_farming_notification(
+ &self,
+ callback: HandlerFn<FarmingNotification>,
+ ) -> Box<dyn HandlerId> {
+ self.as_ref().on_farming_notification(callback)
+ }
+
+ fn on_solution(&self, callback: HandlerFn<SolutionResponse>) -> Box<dyn HandlerId> {
+ self.as_ref().on_solution(callback)
+ }
+
+ fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
+ (*self).run()
+ }
+ }
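This forwarding impl lets `Box<dyn Farm>` satisfy generic `Farm` bounds directly, so boxed trait objects can be handed to code that only knows about the trait. A small hypothetical sketch of the call sites it enables (assuming `FarmId` implements `Debug`):

```rust
// Hypothetical generic helper that only knows about the `Farm` trait.
fn log_farm<F: Farm>(farm: &F) {
    println!("farm {:?}: {} total sectors", farm.id(), farm.total_sectors_count());
}

// Thanks to the blanket impl above, a boxed trait object can be passed straight through.
fn log_boxed_farm(farm: Box<dyn Farm>) {
    log_farm(&farm);
}
```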
42 changes: 37 additions & 5 deletions crates/subspace-farmer/src/farmer_cache.rs
@@ -237,11 +237,42 @@
.map(
|(index, ((mut stored_pieces, mut free_offsets), new_cache))| {
run_future_in_dedicated_thread(
- move || async {
- let mut contents = new_cache.contents().await;
- stored_pieces.reserve(new_cache.max_num_elements());
-
- while let Some((offset, maybe_piece_index)) = contents.next().await {
+ move || async move {
+ // Hack with first collecting into `Option` with `Option::take()` call
+ // later is to satisfy compiler that gets confused about ownership
+ // otherwise
+ let mut maybe_contents = match new_cache.contents().await {
+ Ok(contents) => Some(contents),
+ Err(error) => {
+ warn!(%index, %error, "Failed to get cache contents");
+
+ None
+ }
+ };
+ let Some(mut contents) = maybe_contents.take() else {
+ drop(maybe_contents);
+
+ return PieceCacheState {
+ stored_pieces,
+ free_offsets,
+ backend: new_cache,
+ };
+ };
+
+ stored_pieces.reserve(new_cache.max_num_elements() as usize);
+
+ while let Some(maybe_element_details) = contents.next().await {
+ let (offset, maybe_piece_index) = match maybe_element_details {
+ Ok(element_details) => element_details,
+ Err(error) => {
+ warn!(
+ %index,
+ %error,
+ "Failed to get cache contents element details"
+ );
+ break;
+ }
+ };
match maybe_piece_index {
Some(piece_index) => {
stored_pieces.insert(
@@ -258,6 +289,7 @@
yield_now().await;
}

+ drop(maybe_contents);
drop(contents);

PieceCacheState {