diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs index f8ba604361..a44ee180ca 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/farmer.rs @@ -1,12 +1,13 @@ -use crate::commands::shared::metrics::{FarmerMetrics, SectorState}; +//! Metrics specific for single disk farm + use crate::commands::shared::DiskFarm; use anyhow::anyhow; use async_lock::Mutex as AsyncMutex; use backoff::ExponentialBackoff; use bytesize::ByteSize; use clap::Parser; -use futures::stream::{FuturesOrdered, FuturesUnordered}; -use futures::{select, FutureExt, StreamExt, TryStreamExt}; +use futures::stream::FuturesUnordered; +use futures::{select, FutureExt, StreamExt}; use prometheus_client::registry::Registry; use std::fs; use std::future::Future; @@ -21,11 +22,10 @@ use subspace_farmer::cluster::controller::ClusterNodeClient; use subspace_farmer::cluster::farmer::farmer_service; use subspace_farmer::cluster::nats_client::NatsClient; use subspace_farmer::cluster::plotter::ClusterPlotter; -use subspace_farmer::farm::{ - Farm, FarmingNotification, SectorExpirationDetails, SectorPlottingDetails, SectorUpdate, -}; +use subspace_farmer::farm::Farm; use subspace_farmer::node_client::caching_proxy_node_client::CachingProxyNodeClient; use subspace_farmer::node_client::NodeClient; +use subspace_farmer::single_disk_farm::metrics::SingleDiskFarmMetrics; use subspace_farmer::single_disk_farm::{ SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions, }; @@ -186,8 +186,6 @@ where .await .map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?; - let farmer_metrics = FarmerMetrics::new(registry); - let kzg = Kzg::new(embedded_kzg_settings()); let erasure_coding = ErasureCoding::new( NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) @@ -243,6 +241,7 @@ where let faster_read_sector_record_chunks_mode_barrier = Arc::new(Barrier::new(disk_farms.len())); let faster_read_sector_record_chunks_mode_concurrency = Arc::new(Semaphore::new(1)); + let metrics = &SingleDiskFarmMetrics::new(registry); let mut farms = Vec::with_capacity(disk_farms.len()); let mut farms_stream = disk_farms @@ -284,6 +283,7 @@ where .read_sector_record_chunks_mode, faster_read_sector_record_chunks_mode_barrier, faster_read_sector_record_chunks_mode_concurrency, + metrics: Some(metrics.clone()), create, }, farm_index, @@ -354,31 +354,6 @@ where .collect::>() }; - let total_and_plotted_sectors = farms - .iter() - .enumerate() - .map(|(farm_index, farm)| async move { - let total_sector_count = farm.total_sectors_count(); - let mut plotted_sectors_count = 0; - let plotted_sectors = farm.plotted_sectors(); - let mut plotted_sectors = plotted_sectors.get().await.map_err(|error| { - anyhow!("Failed to get plotted sectors for farm {farm_index}: {error}") - })?; - while let Some(plotted_sector_result) = plotted_sectors.next().await { - plotted_sectors_count += 1; - plotted_sector_result.map_err(|error| { - anyhow!( - "Failed reading plotted sector on startup for farm {farm_index}: {error}" - ) - })?; - } - - anyhow::Ok((total_sector_count, plotted_sectors_count)) - }) - .collect::>() - .try_collect::>() - .await?; - let mut farmer_services = (0..service_instances.get()) .map(|index| { AsyncJoinOnDrop::new( @@ -404,93 +379,7 @@ where let mut farms_stream = (0u8..) .zip(farms) - .zip(total_and_plotted_sectors) - .map(|((farm_index, farm), sector_counts)| { - let (total_sector_count, plotted_sectors_count) = sector_counts; - farmer_metrics.update_sectors_total( - farm.id(), - total_sector_count - plotted_sectors_count, - SectorState::NotPlotted, - ); - farmer_metrics.update_sectors_total( - farm.id(), - plotted_sectors_count, - SectorState::Plotted, - ); - farm.on_sector_update(Arc::new({ - let farm_id = *farm.id(); - let farmer_metrics = farmer_metrics.clone(); - - move |(_sector_index, sector_state)| match sector_state { - SectorUpdate::Plotting(SectorPlottingDetails::Starting { .. }) => { - farmer_metrics.sector_plotting.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Downloading) => { - farmer_metrics.sector_downloading.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)) => { - farmer_metrics.observe_sector_downloading_time(&farm_id, time); - farmer_metrics.sector_downloaded.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Encoding) => { - farmer_metrics.sector_encoding.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)) => { - farmer_metrics.observe_sector_encoding_time(&farm_id, time); - farmer_metrics.sector_encoded.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Writing) => { - farmer_metrics.sector_writing.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Written(time)) => { - farmer_metrics.observe_sector_writing_time(&farm_id, time); - farmer_metrics.sector_written.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Finished { time, .. }) => { - farmer_metrics.observe_sector_plotting_time(&farm_id, time); - farmer_metrics.sector_plotted.inc(); - farmer_metrics.update_sector_state(&farm_id, SectorState::Plotted); - } - SectorUpdate::Plotting(SectorPlottingDetails::Error(_)) => { - farmer_metrics.sector_plotting_error.inc(); - } - SectorUpdate::Expiration(SectorExpirationDetails::AboutToExpire) => { - farmer_metrics.update_sector_state(&farm_id, SectorState::AboutToExpire); - } - SectorUpdate::Expiration(SectorExpirationDetails::Expired) => { - farmer_metrics.update_sector_state(&farm_id, SectorState::Expired); - } - SectorUpdate::Expiration(SectorExpirationDetails::Determined { .. }) => { - // Not interested in here - } - } - })) - .detach(); - - farm.on_farming_notification(Arc::new({ - let farm_id = *farm.id(); - let farmer_metrics = farmer_metrics.clone(); - - move |farming_notification| match farming_notification { - FarmingNotification::Auditing(auditing_details) => { - farmer_metrics.observe_auditing_time(&farm_id, &auditing_details.time); - } - FarmingNotification::Proving(proving_details) => { - farmer_metrics.observe_proving_time( - &farm_id, - &proving_details.time, - proving_details.result, - ); - } - FarmingNotification::NonFatalError(error) => { - farmer_metrics.note_farming_error(&farm_id, error); - } - } - })) - .detach(); - - farm.run().map(move |result| (farm_index, result)) - }) + .map(|(farm_index, farm)| farm.run().map(move |result| (farm_index, result))) .collect::>(); let mut farm_errors = Vec::new(); diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index a800fdb348..464c8aa677 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -1,4 +1,3 @@ -use crate::commands::shared::metrics::{FarmerMetrics, SectorState}; use crate::commands::shared::network::{configure_network, NetworkArgs}; use crate::commands::shared::{derive_libp2p_keypair, DiskFarm, PlottingThreadPriority}; use crate::utils::shutdown_signal; @@ -23,10 +22,7 @@ use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::{PublicKey, Record}; use subspace_erasure_coding::ErasureCoding; use subspace_farmer::farm::plotted_pieces::PlottedPieces; -use subspace_farmer::farm::{ - FarmingNotification, PlottedSectors, SectorExpirationDetails, SectorPlottingDetails, - SectorUpdate, -}; +use subspace_farmer::farm::{PlottedSectors, SectorPlottingDetails, SectorUpdate}; use subspace_farmer::farmer_cache::FarmerCache; use subspace_farmer::farmer_piece_getter::piece_validator::SegmentCommitmentPieceValidator; use subspace_farmer::farmer_piece_getter::{DsnCacheRetryPolicy, FarmerPieceGetter}; @@ -35,6 +31,7 @@ use subspace_farmer::node_client::rpc_node_client::RpcNodeClient; use subspace_farmer::node_client::NodeClient; use subspace_farmer::plotter::cpu::CpuPlotter; use subspace_farmer::single_disk_farm::identity::Identity; +use subspace_farmer::single_disk_farm::metrics::SingleDiskFarmMetrics; use subspace_farmer::single_disk_farm::{ SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions, }; @@ -44,7 +41,6 @@ use subspace_farmer::utils::{ recommended_number_of_farming_threads, run_future_in_dedicated_thread, thread_pool_core_indices, AsyncJoinOnDrop, }; -use subspace_farmer_components::plotting::PlottedSector; use subspace_farmer_components::reading::ReadSectorRecordChunksMode; use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; use subspace_networking::utils::piece_provider::PieceProvider; @@ -347,7 +343,6 @@ where // Metrics let mut prometheus_metrics_registry = Registry::default(); - let farmer_metrics = FarmerMetrics::new(&mut prometheus_metrics_registry); let should_start_prometheus_server = !prometheus_listen_on.is_empty(); let node_client = CachingProxyNodeClient::new(node_client) @@ -374,18 +369,6 @@ where .map_err(|error| anyhow!("Failed to configure networking: {error}"))? }; - let _prometheus_worker = if should_start_prometheus_server { - let prometheus_task = start_prometheus_metrics_server( - prometheus_listen_on, - RegistryAdapter::PrometheusClient(prometheus_metrics_registry), - )?; - - let join_handle = tokio::spawn(prometheus_task); - Some(AsyncJoinOnDrop::new(join_handle, true)) - } else { - None - }; - let kzg = Kzg::new(embedded_kzg_settings()); let erasure_coding = ErasureCoding::new( NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize) @@ -543,6 +526,7 @@ where let (plotting_delay_senders, plotting_delay_receivers) = (0..disk_farms.len()) .map(|_| oneshot::channel()) .unzip::<_, _, Vec<_>, Vec<_>>(); + let metrics = &SingleDiskFarmMetrics::new(&mut prometheus_metrics_registry); let mut farms = Vec::with_capacity(disk_farms.len()); let mut farms_stream = disk_farms @@ -584,6 +568,7 @@ where .read_sector_record_chunks_mode, faster_read_sector_record_chunks_mode_barrier, faster_read_sector_record_chunks_mode_concurrency, + metrics: Some(metrics.clone()), create, }, farm_index, @@ -698,8 +683,6 @@ where info!("Collecting already plotted pieces (this will take some time)..."); // Collect already plotted pieces - let mut total_and_plotted_sectors = Vec::with_capacity(farms.len()); - for (farm_index, farm) in farms.iter().enumerate() { let mut plotted_pieces = plotted_pieces.write().await; let farm_index = farm_index.try_into().map_err(|_error| { @@ -711,15 +694,12 @@ where plotted_pieces.add_farm(farm_index, Arc::new(farm.piece_reader())); - let total_sectors_count = farm.total_sectors_count(); - let mut plotted_sectors_count = 0; let plotted_sectors = farm.plotted_sectors(); let mut plotted_sectors = plotted_sectors.get().await.map_err(|error| { anyhow!("Failed to get plotted sectors for farm {farm_index}: {error}") })?; while let Some(plotted_sector_result) = plotted_sectors.next().await { - plotted_sectors_count += 1; plotted_pieces.add_sector( farm_index, &plotted_sector_result.map_err(|error| { @@ -729,119 +709,32 @@ where })?, ) } - - total_and_plotted_sectors.push((total_sectors_count, plotted_sectors_count)); } info!("Finished collecting already plotted pieces successfully"); let mut farms_stream = (0u8..) .zip(farms) - .zip(total_and_plotted_sectors) - .map(|((farm_index, farm), sector_counts)| { + .map(|(farm_index, farm)| { let plotted_pieces = Arc::clone(&plotted_pieces); let span = info_span!("", %farm_index); - // Collect newly plotted pieces - let on_plotted_sector_callback = - move |plotted_sector: &PlottedSector, - maybe_old_plotted_sector: &Option| { + farm.on_sector_update(Arc::new(move |(_sector_index, sector_state)| { + // Collect newly plotted pieces + if let SectorUpdate::Plotting(SectorPlottingDetails::Finished { + plotted_sector, + old_plotted_sector, + time: _, + }) = sector_state + { let _span_guard = span.enter(); - { - let mut plotted_pieces = plotted_pieces.write_blocking(); - - if let Some(old_plotted_sector) = &maybe_old_plotted_sector { - plotted_pieces.delete_sector(farm_index, old_plotted_sector); - } - plotted_pieces.add_sector(farm_index, plotted_sector); - } - }; - - let (total_sector_count, plotted_sectors_count) = sector_counts; - farmer_metrics.update_sectors_total( - farm.id(), - total_sector_count - plotted_sectors_count, - SectorState::NotPlotted, - ); - farmer_metrics.update_sectors_total( - farm.id(), - plotted_sectors_count, - SectorState::Plotted, - ); - farm.on_sector_update(Arc::new({ - let farm_id = *farm.id(); - let farmer_metrics = farmer_metrics.clone(); + let mut plotted_pieces = plotted_pieces.write_blocking(); - move |(_sector_index, sector_state)| match sector_state { - SectorUpdate::Plotting(SectorPlottingDetails::Starting { .. }) => { - farmer_metrics.sector_plotting.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Downloading) => { - farmer_metrics.sector_downloading.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)) => { - farmer_metrics.observe_sector_downloading_time(&farm_id, time); - farmer_metrics.sector_downloaded.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Encoding) => { - farmer_metrics.sector_encoding.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)) => { - farmer_metrics.observe_sector_encoding_time(&farm_id, time); - farmer_metrics.sector_encoded.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Writing) => { - farmer_metrics.sector_writing.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Written(time)) => { - farmer_metrics.observe_sector_writing_time(&farm_id, time); - farmer_metrics.sector_written.inc(); - } - SectorUpdate::Plotting(SectorPlottingDetails::Finished { - plotted_sector, - old_plotted_sector, - time, - }) => { - on_plotted_sector_callback(plotted_sector, old_plotted_sector); - farmer_metrics.observe_sector_plotting_time(&farm_id, time); - farmer_metrics.sector_plotted.inc(); - farmer_metrics.update_sector_state(&farm_id, SectorState::Plotted); - } - SectorUpdate::Plotting(SectorPlottingDetails::Error(_)) => { - farmer_metrics.sector_plotting_error.inc(); - } - SectorUpdate::Expiration(SectorExpirationDetails::AboutToExpire) => { - farmer_metrics.update_sector_state(&farm_id, SectorState::AboutToExpire); - } - SectorUpdate::Expiration(SectorExpirationDetails::Expired) => { - farmer_metrics.update_sector_state(&farm_id, SectorState::Expired); - } - SectorUpdate::Expiration(SectorExpirationDetails::Determined { .. }) => { - // Not interested in here - } - } - })) - .detach(); - - farm.on_farming_notification(Arc::new({ - let farm_id = *farm.id(); - let farmer_metrics = farmer_metrics.clone(); - - move |farming_notification| match farming_notification { - FarmingNotification::Auditing(auditing_details) => { - farmer_metrics.observe_auditing_time(&farm_id, &auditing_details.time); - } - FarmingNotification::Proving(proving_details) => { - farmer_metrics.observe_proving_time( - &farm_id, - &proving_details.time, - proving_details.result, - ); - } - FarmingNotification::NonFatalError(error) => { - farmer_metrics.note_farming_error(&farm_id, error); + if let Some(old_plotted_sector) = &old_plotted_sector { + plotted_pieces.delete_sector(farm_index, old_plotted_sector); } + plotted_pieces.add_sector(farm_index, plotted_sector); } })) .detach(); @@ -854,6 +747,18 @@ where // event handlers drop(plotted_pieces); + let _prometheus_worker = if should_start_prometheus_server { + let prometheus_task = start_prometheus_metrics_server( + prometheus_listen_on, + RegistryAdapter::PrometheusClient(prometheus_metrics_registry), + )?; + + let join_handle = tokio::spawn(prometheus_task); + Some(AsyncJoinOnDrop::new(join_handle, true)) + } else { + None + }; + let mut farm_errors = Vec::new(); let farm_fut = run_future_in_dedicated_thread( diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs index 8cfa727031..4991479320 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs @@ -1,4 +1,3 @@ -pub(super) mod metrics; pub(super) mod network; use bytesize::ByteSize; diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index 6d50e70ce6..f0dc157883 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -6,6 +6,7 @@ pub mod farming; pub mod identity; +pub mod metrics; pub mod piece_cache; pub mod piece_reader; pub mod plot_cache; @@ -17,7 +18,7 @@ pub mod unbuffered_io_file_windows; use crate::disk_piece_cache::{DiskPieceCache, DiskPieceCacheError}; use crate::farm::{ Farm, FarmId, FarmingError, FarmingNotification, HandlerFn, PieceReader, PlottedSectors, - SectorUpdate, + SectorExpirationDetails, SectorPlottingDetails, SectorUpdate, }; use crate::node_client::NodeClient; use crate::plotter::Plotter; @@ -26,6 +27,7 @@ use crate::single_disk_farm::farming::{ farming, slot_notification_forwarder, FarmingOptions, PlotAudit, }; use crate::single_disk_farm::identity::{Identity, IdentityError}; +use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics}; use crate::single_disk_farm::piece_cache::SingleDiskPieceCache; use crate::single_disk_farm::piece_reader::DiskPieceReader; use crate::single_disk_farm::plot_cache::DiskPlotCache; @@ -339,6 +341,8 @@ where pub faster_read_sector_record_chunks_mode_barrier: Arc, /// Limit concurrency of internal benchmarking between different farms pub faster_read_sector_record_chunks_mode_concurrency: Arc, + /// Single disk farm metrics + pub metrics: Option, /// Whether to create a farm if it doesn't yet exist pub create: bool, } @@ -824,6 +828,7 @@ impl SingleDiskFarm { read_sector_record_chunks_mode, faster_read_sector_record_chunks_mode_barrier, faster_read_sector_record_chunks_mode_concurrency, + metrics, .. } = options; @@ -1208,6 +1213,10 @@ impl SingleDiskFarm { _single_disk_farm_info_lock: single_disk_farm_info_lock, }; + if let Some(metrics) = metrics { + farm.register_metrics(metrics); + } + Ok(farm) } @@ -1561,6 +1570,86 @@ impl SingleDiskFarm { }) } + fn register_metrics(&self, metrics: SingleDiskFarmMetrics) { + let farm_id = *self.id(); + + let total_sector_count = self.total_sectors_count; + let plotted_sectors_count = self.sectors_metadata.read_blocking().len() as SectorIndex; + metrics.update_sectors_total( + &farm_id, + total_sector_count - plotted_sectors_count, + SectorState::NotPlotted, + ); + metrics.update_sectors_total(&farm_id, plotted_sectors_count, SectorState::Plotted); + self.on_sector_update(Arc::new({ + let metrics = metrics.clone(); + + move |(_sector_index, sector_state)| match sector_state { + SectorUpdate::Plotting(SectorPlottingDetails::Starting { .. }) => { + metrics.sector_plotting.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Downloading) => { + metrics.sector_downloading.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)) => { + metrics.observe_sector_downloading_time(&farm_id, time); + metrics.sector_downloaded.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Encoding) => { + metrics.sector_encoding.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)) => { + metrics.observe_sector_encoding_time(&farm_id, time); + metrics.sector_encoded.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Writing) => { + metrics.sector_writing.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Written(time)) => { + metrics.observe_sector_writing_time(&farm_id, time); + metrics.sector_written.inc(); + } + SectorUpdate::Plotting(SectorPlottingDetails::Finished { time, .. }) => { + metrics.observe_sector_plotting_time(&farm_id, time); + metrics.sector_plotted.inc(); + metrics.update_sector_state(&farm_id, SectorState::Plotted); + } + SectorUpdate::Plotting(SectorPlottingDetails::Error(_)) => { + metrics.sector_plotting_error.inc(); + } + SectorUpdate::Expiration(SectorExpirationDetails::AboutToExpire) => { + metrics.update_sector_state(&farm_id, SectorState::AboutToExpire); + } + SectorUpdate::Expiration(SectorExpirationDetails::Expired) => { + metrics.update_sector_state(&farm_id, SectorState::Expired); + } + SectorUpdate::Expiration(SectorExpirationDetails::Determined { .. }) => { + // Not interested in here + } + } + })) + .detach(); + + self.on_farming_notification(Arc::new( + move |farming_notification| match farming_notification { + FarmingNotification::Auditing(auditing_details) => { + metrics.observe_auditing_time(&farm_id, &auditing_details.time); + } + FarmingNotification::Proving(proving_details) => { + metrics.observe_proving_time( + &farm_id, + &proving_details.time, + proving_details.result, + ); + } + FarmingNotification::NonFatalError(error) => { + metrics.note_farming_error(&farm_id, error); + } + }, + )) + .detach(); + } + /// Collect summary of single disk farm for presentational purposes pub fn collect_summary(directory: PathBuf) -> SingleDiskFarmSummary { let single_disk_farm_info = match SingleDiskFarmInfo::load_from(&directory) { diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/metrics.rs b/crates/subspace-farmer/src/single_disk_farm/metrics.rs similarity index 87% rename from crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/metrics.rs rename to crates/subspace-farmer/src/single_disk_farm/metrics.rs index 2cc6caa718..020899fd83 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/metrics.rs +++ b/crates/subspace-farmer/src/single_disk_farm/metrics.rs @@ -1,3 +1,6 @@ +//! Metrics for single disk farm + +use crate::farm::{FarmId, FarmingError, ProvingResult}; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; use prometheus_client::metrics::gauge::Gauge; @@ -7,10 +10,9 @@ use std::fmt; use std::sync::atomic::{AtomicI64, AtomicU64}; use std::time::Duration; use subspace_core_primitives::SectorIndex; -use subspace_farmer::farm::{FarmId, FarmingError, ProvingResult}; #[derive(Debug, Copy, Clone)] -pub(in super::super) enum SectorState { +pub(super) enum SectorState { NotPlotted, Plotted, AboutToExpire, @@ -29,8 +31,9 @@ impl fmt::Display for SectorState { } } +/// Metrics for single disk farm #[derive(Debug, Clone)] -pub(in super::super) struct FarmerMetrics { +pub struct SingleDiskFarmMetrics { auditing_time: Family, Histogram>, proving_time: Family, Histogram>, farming_errors: Family, Counter>, @@ -39,19 +42,21 @@ pub(in super::super) struct FarmerMetrics { sector_writing_time: Family, Histogram>, sector_plotting_time: Family, Histogram>, sectors_total: Family, Gauge>, - pub(in super::super) sector_downloading: Counter, - pub(in super::super) sector_downloaded: Counter, - pub(in super::super) sector_encoding: Counter, - pub(in super::super) sector_encoded: Counter, - pub(in super::super) sector_writing: Counter, - pub(in super::super) sector_written: Counter, - pub(in super::super) sector_plotting: Counter, - pub(in super::super) sector_plotted: Counter, - pub(in super::super) sector_plotting_error: Counter, + pub(super) sector_downloading: Counter, + pub(super) sector_downloaded: Counter, + pub(super) sector_encoding: Counter, + pub(super) sector_encoded: Counter, + pub(super) sector_writing: Counter, + pub(super) sector_written: Counter, + pub(super) sector_plotting: Counter, + pub(super) sector_plotted: Counter, + pub(super) sector_plotting_error: Counter, } -impl FarmerMetrics { - pub(in super::super) fn new(registry: &mut Registry) -> Self { +impl SingleDiskFarmMetrics { + /// Create new instance (note that a single instance must be created and cloned instead of + /// creating multiple separate instances for different farm) + pub fn new(registry: &mut Registry) -> Self { let sub_registry = registry.sub_registry_with_prefix("subspace_farmer"); let auditing_time = Family::<_, _>::new_with_constructor(|| { @@ -239,13 +244,13 @@ impl FarmerMetrics { } } - pub(in super::super) fn observe_auditing_time(&self, farm_id: &FarmId, time: &Duration) { + pub(super) fn observe_auditing_time(&self, farm_id: &FarmId, time: &Duration) { self.auditing_time .get_or_create(&vec![("farm_id".to_string(), farm_id.to_string())]) .observe(time.as_secs_f64()); } - pub(in super::super) fn observe_proving_time( + pub(super) fn observe_proving_time( &self, farm_id: &FarmId, time: &Duration, @@ -259,7 +264,7 @@ impl FarmerMetrics { .observe(time.as_secs_f64()); } - pub(in super::super) fn note_farming_error(&self, farm_id: &FarmId, error: &FarmingError) { + pub(super) fn note_farming_error(&self, farm_id: &FarmId, error: &FarmingError) { self.farming_errors .get_or_create(&vec![ ("farm_id".to_string(), farm_id.to_string()), @@ -268,7 +273,7 @@ impl FarmerMetrics { .inc(); } - pub(in super::super) fn update_sectors_total( + pub(super) fn update_sectors_total( &self, farm_id: &FarmId, sectors: SectorIndex, @@ -282,7 +287,7 @@ impl FarmerMetrics { .set(i64::from(sectors)); } - pub(in super::super) fn update_sector_state(&self, farm_id: &FarmId, state: SectorState) { + pub(super) fn update_sector_state(&self, farm_id: &FarmId, state: SectorState) { self.sectors_total .get_or_create(&vec![ ("farm_id".to_string(), farm_id.to_string()), @@ -337,29 +342,25 @@ impl FarmerMetrics { } } - pub(in super::super) fn observe_sector_downloading_time( - &self, - farm_id: &FarmId, - time: &Duration, - ) { + pub(super) fn observe_sector_downloading_time(&self, farm_id: &FarmId, time: &Duration) { self.sector_downloading_time .get_or_create(&vec![("farm_id".to_string(), farm_id.to_string())]) .observe(time.as_secs_f64()); } - pub(in super::super) fn observe_sector_encoding_time(&self, farm_id: &FarmId, time: &Duration) { + pub(super) fn observe_sector_encoding_time(&self, farm_id: &FarmId, time: &Duration) { self.sector_encoding_time .get_or_create(&vec![("farm_id".to_string(), farm_id.to_string())]) .observe(time.as_secs_f64()); } - pub(in super::super) fn observe_sector_writing_time(&self, farm_id: &FarmId, time: &Duration) { + pub(super) fn observe_sector_writing_time(&self, farm_id: &FarmId, time: &Duration) { self.sector_writing_time .get_or_create(&vec![("farm_id".to_string(), farm_id.to_string())]) .observe(time.as_secs_f64()); } - pub(in super::super) fn observe_sector_plotting_time(&self, farm_id: &FarmId, time: &Duration) { + pub(super) fn observe_sector_plotting_time(&self, farm_id: &FarmId, time: &Duration) { self.sector_plotting_time .get_or_create(&vec![("farm_id".to_string(), farm_id.to_string())]) .observe(time.as_secs_f64());