Skip to content

Commit

Permalink
Extract slot_notification_forwarder function such that logging can …
Browse files Browse the repository at this point in the history
…be controller for farming in a nicer way
  • Loading branch information
nazar-pc committed Oct 3, 2023
1 parent c957995 commit 3f99ac1
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 22 deletions.
25 changes: 4 additions & 21 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::identity::{Identity, IdentityError};
use crate::node_client::NodeClient;
use crate::reward_signing::reward_signing;
pub use crate::single_disk_farm::farming::FarmingError;
use crate::single_disk_farm::farming::{farming, FarmingOptions};
use crate::single_disk_farm::farming::{farming, slot_notification_forwarder, FarmingOptions};
use crate::single_disk_farm::piece_cache::{DiskPieceCache, DiskPieceCacheError};
use crate::single_disk_farm::piece_reader::PieceReader;
pub use crate::single_disk_farm::plotting::PlottingError;
Expand Down Expand Up @@ -937,32 +937,15 @@ impl SingleDiskFarm {
};
tasks.push(Box::pin(plotting_scheduler(plotting_scheduler_options)));

let (mut slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0);
let (slot_info_forwarder_sender, slot_info_forwarder_receiver) = mpsc::channel(0);

tasks.push(Box::pin({
let node_client = node_client.clone();

async move {
info!("Subscribing to slot info notifications");

let mut slot_info_notifications = node_client
.subscribe_slot_info()
slot_notification_forwarder(&node_client, slot_info_forwarder_sender)
.await
.map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;

while let Some(slot_info) = slot_info_notifications.next().await {
debug!(?slot_info, "New slot");

let slot = slot_info.slot_number;

// Error means farmer is still solving for previous slot, which is too late and
// we need to skip this slot
if slot_info_forwarder_sender.try_send(slot_info).is_err() {
debug!(%slot, "Slow farming, skipping slot");
}
}

Ok(())
.map_err(BackgroundTaskError::Farming)
}
}));

Expand Down
31 changes: 30 additions & 1 deletion crates/subspace-farmer/src/single_disk_farm/farming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use subspace_farmer_components::{proving, ReadAt};
use subspace_proof_of_space::{Table, TableGenerator};
use subspace_rpc_primitives::{SlotInfo, SolutionResponse};
use thiserror::Error;
use tracing::{debug, error, trace, warn};
use tracing::{debug, error, info, trace, warn};

/// Self-imposed limit for number of solutions that farmer will not go over per challenge.
///
Expand Down Expand Up @@ -65,6 +65,35 @@ pub enum FarmingError {
FailedToCreateThreadPool(#[from] ThreadPoolBuildError),
}

pub(super) async fn slot_notification_forwarder<NC>(
node_client: &NC,
mut slot_info_forwarder_sender: mpsc::Sender<SlotInfo>,
) -> Result<(), FarmingError>
where
NC: NodeClient,
{
info!("Subscribing to slot info notifications");

let mut slot_info_notifications = node_client
.subscribe_slot_info()
.await
.map_err(|error| FarmingError::FailedToSubscribeSlotInfo { error })?;

while let Some(slot_info) = slot_info_notifications.next().await {
debug!(?slot_info, "New slot");

let slot = slot_info.slot_number;

// Error means farmer is still solving for previous slot, which is too late and
// we need to skip this slot
if slot_info_forwarder_sender.try_send(slot_info).is_err() {
debug!(%slot, "Slow farming, skipping slot");
}
}

Ok(())
}

pub(super) struct FarmingOptions<'a, NC> {
pub(super) public_key: PublicKey,
pub(super) reward_address: PublicKey,
Expand Down

0 comments on commit 3f99ac1

Please sign in to comment.