Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use anyhow more widely in farmer code #3128

Merged
merged 1 commit into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/subspace-farmer-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ include = [
bench = false

[dependencies]
anyhow = "1.0.89"
async-lock = "3.4.0"
async-trait = "0.1.83"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
Expand Down
16 changes: 3 additions & 13 deletions crates/subspace-farmer-components/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use async_trait::async_trait;
use parity_scale_codec::{Decode, Encode};
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::error::Error;
use std::fs::File;
use std::future::Future;
use std::io;
Expand All @@ -40,31 +39,22 @@ use subspace_core_primitives::segments::{ArchivedHistorySegment, HistorySize};
#[async_trait]
pub trait PieceGetter {
/// Get piece by index
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>>;
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;
}

#[async_trait]
impl<T> PieceGetter for Arc<T>
where
T: PieceGetter + Send + Sync,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
self.as_ref().get_piece(piece_index).await
}
}

#[async_trait]
impl PieceGetter for ArchivedHistorySegment {
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let position = usize::try_from(u64::from(piece_index))?;

Ok(self.pieces().nth(position))
Expand Down
19 changes: 9 additions & 10 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub enum PlottingError {
#[error("Records encoder error: {error}")]
RecordsEncoderError {
/// Lower-level error
error: Box<dyn std::error::Error + Send + Sync + 'static>,
error: anyhow::Error,
},
/// Bad sector output size
#[error("Bad sector output size: provided {provided}, expected {expected}")]
Expand Down Expand Up @@ -97,7 +97,7 @@ pub enum PlottingError {
/// Piece index
piece_index: PieceIndex,
/// Lower-level error
error: Box<dyn std::error::Error + Send + Sync + 'static>,
error: anyhow::Error,
},
/// Failed to acquire permit
#[error("Failed to acquire permit: {error}")]
Expand Down Expand Up @@ -338,7 +338,7 @@ pub trait RecordsEncoder {
sector_id: &SectorId,
records: &mut [Record],
abort_early: &AtomicBool,
) -> Result<SectorContentsMap, Box<dyn std::error::Error + Send + Sync + 'static>>;
) -> anyhow::Result<SectorContentsMap>;
}

/// CPU implementation of [`RecordsEncoder`]
Expand All @@ -361,24 +361,23 @@ where
sector_id: &SectorId,
records: &mut [Record],
abort_early: &AtomicBool,
) -> Result<SectorContentsMap, Box<dyn std::error::Error + Send + Sync + 'static>> {
) -> anyhow::Result<SectorContentsMap> {
if self.erasure_coding.max_shards() < Record::NUM_S_BUCKETS {
return Err(format!(
return Err(anyhow::anyhow!(
"Invalid erasure coding instance: {} shards needed, {} supported",
Record::NUM_S_BUCKETS,
self.erasure_coding.max_shards()
)
.into());
));
}

if self.table_generators.is_empty() {
return Err("No table generators".into());
return Err(anyhow::anyhow!("No table generators"));
}

let pieces_in_sector = records
.len()
.try_into()
.map_err(|error| format!("Failed to convert pieces in sector: {error}"))?;
.map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);

{
Expand Down Expand Up @@ -751,7 +750,7 @@ async fn download_sector_internal<PG: PieceGetter>(
let _permit = match recovery_semaphore.acquire().await {
Ok(permit) => permit,
Err(error) => {
let error = format!("Recovery semaphore was closed: {error}").into();
let error = anyhow::anyhow!("Recovery semaphore was closed: {error}");
return Err(PlottingError::FailedToRetrievePiece { piece_index, error });
}
};
Expand Down
30 changes: 13 additions & 17 deletions crates/subspace-farmer/src/cluster/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ use crate::cluster::nats_client::{
};
use crate::farm::{PieceCacheId, PieceCacheOffset};
use crate::farmer_cache::FarmerCache;
use crate::node_client::{Error as NodeClientError, NodeClient};
use crate::node_client::NodeClient;
use anyhow::anyhow;
use async_lock::Semaphore;
use async_nats::HeaderValue;
use async_trait::async_trait;
use futures::{select, FutureExt, Stream, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use std::error::Error;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::Arc;
Expand Down Expand Up @@ -171,10 +170,7 @@ pub struct ClusterPieceGetter {

#[async_trait]
impl PieceGetter for ClusterPieceGetter {
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let _guard = self.request_semaphore.acquire().await;

if let Some((piece_cache_id, piece_cache_offset)) = self
Expand Down Expand Up @@ -286,16 +282,17 @@ impl ClusterNodeClient {

#[async_trait]
impl NodeClient for ClusterNodeClient {
async fn farmer_app_info(&self) -> Result<FarmerAppInfo, NodeClientError> {
async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
Ok(self
.nats_client
.request(&ClusterControllerFarmerAppInfoRequest, None)
.await??)
.await?
.map_err(anyhow::Error::msg)?)
}

async fn subscribe_slot_info(
&self,
) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, NodeClientError> {
) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
let subscription = self
.nats_client
.subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
Expand Down Expand Up @@ -328,7 +325,7 @@ impl NodeClient for ClusterNodeClient {
async fn submit_solution_response(
&self,
solution_response: SolutionResponse,
) -> Result<(), NodeClientError> {
) -> anyhow::Result<()> {
let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
Ok(self
.nats_client
Expand All @@ -341,8 +338,7 @@ impl NodeClient for ClusterNodeClient {

async fn subscribe_reward_signing(
&self,
) -> Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>, NodeClientError>
{
) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
let subscription = self
.nats_client
.subscribe_to_broadcasts::<ClusterControllerRewardSigningBroadcast>(None, None)
Expand All @@ -356,7 +352,7 @@ impl NodeClient for ClusterNodeClient {
async fn submit_reward_signature(
&self,
reward_signature: RewardSignatureResponse,
) -> Result<(), NodeClientError> {
) -> anyhow::Result<()> {
Ok(self
.nats_client
.notification(
Expand All @@ -368,7 +364,7 @@ impl NodeClient for ClusterNodeClient {

async fn subscribe_archived_segment_headers(
&self,
) -> Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>, NodeClientError> {
) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
let subscription = self
.nats_client
.subscribe_to_broadcasts::<ClusterControllerArchivedSegmentHeaderBroadcast>(None, None)
Expand Down Expand Up @@ -401,7 +397,7 @@ impl NodeClient for ClusterNodeClient {
async fn segment_headers(
&self,
segment_indices: Vec<SegmentIndex>,
) -> Result<Vec<Option<SegmentHeader>>, NodeClientError> {
) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
Ok(self
.nats_client
.request(
Expand All @@ -411,7 +407,7 @@ impl NodeClient for ClusterNodeClient {
.await?)
}

async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, NodeClientError> {
async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
Ok(self
.nats_client
.request(&ClusterControllerPieceRequest { piece_index }, None)
Expand All @@ -421,7 +417,7 @@ impl NodeClient for ClusterNodeClient {
async fn acknowledge_archived_segment_header(
&self,
_segment_index: SegmentIndex,
) -> Result<(), NodeClientError> {
) -> anyhow::Result<()> {
// Acknowledgement is unnecessary/unsupported
Ok(())
}
Expand Down
5 changes: 2 additions & 3 deletions crates/subspace-farmer/src/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
//! way). This crate provides a few of such implementations, but more can be created externally as
//! well if needed without modifying the library itself.

use crate::node_client;
use async_trait::async_trait;
use derive_more::{Display, From};
use futures::Stream;
Expand Down Expand Up @@ -278,13 +277,13 @@ pub enum FarmingError {
#[error("Failed to subscribe to slot info notifications: {error}")]
FailedToSubscribeSlotInfo {
/// Lower-level error
error: node_client::Error,
error: anyhow::Error,
},
/// Failed to retrieve farmer info
#[error("Failed to retrieve farmer info: {error}")]
FailedToGetFarmerInfo {
/// Lower-level error
error: node_client::Error,
error: anyhow::Error,
},
/// Slot info notification stream ended
#[error("Slot info notification stream ended")]
Expand Down
25 changes: 11 additions & 14 deletions crates/subspace-farmer/src/farmer_cache/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use crate::disk_piece_cache::DiskPieceCache;
use crate::farmer_cache::{decode_piece_index_from_record_key, FarmerCache};
use crate::node_client::{Error, NodeClient};
use crate::node_client::NodeClient;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, Stream, StreamExt};
Expand Down Expand Up @@ -44,7 +44,7 @@ struct MockNodeClient {

#[async_trait]
impl NodeClient for MockNodeClient {
async fn farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
// Most of these values make no sense, but they are not used by piece cache anyway
Ok(FarmerAppInfo {
genesis_hash: [0; 32],
Expand All @@ -68,33 +68,33 @@ impl NodeClient for MockNodeClient {

async fn subscribe_slot_info(
&self,
) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, Error> {
) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
unimplemented!()
}

async fn submit_solution_response(
&self,
_solution_response: SolutionResponse,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
unimplemented!()
}

async fn subscribe_reward_signing(
&self,
) -> Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>, Error> {
) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
unimplemented!()
}

async fn submit_reward_signature(
&self,
_reward_signature: RewardSignatureResponse,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
unimplemented!()
}

async fn subscribe_archived_segment_headers(
&self,
) -> Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>, Error> {
) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
let (tx, rx) = oneshot::channel();
self.archived_segment_headers_stream_request_sender
.clone()
Expand All @@ -109,11 +109,11 @@ impl NodeClient for MockNodeClient {
async fn segment_headers(
&self,
_segment_indexes: Vec<SegmentIndex>,
) -> Result<Vec<Option<SegmentHeader>>, Error> {
) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
unimplemented!()
}

async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
Ok(Some(
self.pieces
.lock()
Expand All @@ -130,7 +130,7 @@ impl NodeClient for MockNodeClient {
async fn acknowledge_archived_segment_header(
&self,
segment_index: SegmentIndex,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
self.acknowledge_archived_segment_header_sender
.clone()
.send(segment_index)
Expand All @@ -147,10 +147,7 @@ struct MockPieceGetter {

#[async_trait]
impl PieceGetter for MockPieceGetter {
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn std::error::Error + Send + Sync + 'static>> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
Ok(Some(
self.pieces
.lock()
Expand Down
11 changes: 2 additions & 9 deletions crates/subspace-farmer/src/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use backoff::future::retry;
use backoff::ExponentialBackoff;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use std::hash::Hash;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -383,10 +382,7 @@ where
PV: PieceValidator + Send + 'static,
NC: NodeClient,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let _guard = self.inner.request_semaphore.acquire().await;

match InProgressPiece::new(piece_index, &self.inner.in_progress_pieces) {
Expand Down Expand Up @@ -453,10 +449,7 @@ where
PV: PieceValidator + Send + 'static,
NC: NodeClient,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let Some(piece_getter) = self.upgrade() else {
debug!("Farmer piece getter upgrade didn't succeed");
return Ok(None);
Expand Down
Loading
Loading