Skip to content

Commit

Permalink
Merge pull request #2912 from subspace/refactor-farmer-metrics
Browse files Browse the repository at this point in the history
Refactor farmer metrics
  • Loading branch information
nazar-pc committed Jul 12, 2024
2 parents ef0ae2e + 05c9ea9 commit 58040d5
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 273 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::commands::shared::metrics::{FarmerMetrics, SectorState};
//! Metrics specific for single disk farm

use crate::commands::shared::DiskFarm;
use anyhow::anyhow;
use async_lock::Mutex as AsyncMutex;
use backoff::ExponentialBackoff;
use bytesize::ByteSize;
use clap::Parser;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, FutureExt, StreamExt, TryStreamExt};
use futures::stream::FuturesUnordered;
use futures::{select, FutureExt, StreamExt};
use prometheus_client::registry::Registry;
use std::fs;
use std::future::Future;
Expand All @@ -21,11 +22,10 @@ use subspace_farmer::cluster::controller::ClusterNodeClient;
use subspace_farmer::cluster::farmer::farmer_service;
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::cluster::plotter::ClusterPlotter;
use subspace_farmer::farm::{
Farm, FarmingNotification, SectorExpirationDetails, SectorPlottingDetails, SectorUpdate,
};
use subspace_farmer::farm::Farm;
use subspace_farmer::node_client::caching_proxy_node_client::CachingProxyNodeClient;
use subspace_farmer::node_client::NodeClient;
use subspace_farmer::single_disk_farm::metrics::SingleDiskFarmMetrics;
use subspace_farmer::single_disk_farm::{
SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions,
};
Expand Down Expand Up @@ -186,8 +186,6 @@ where
.await
.map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?;

let farmer_metrics = FarmerMetrics::new(registry);

let kzg = Kzg::new(embedded_kzg_settings());
let erasure_coding = ErasureCoding::new(
NonZeroUsize::new(Record::NUM_S_BUCKETS.next_power_of_two().ilog2() as usize)
Expand Down Expand Up @@ -243,6 +241,7 @@ where
let faster_read_sector_record_chunks_mode_barrier =
Arc::new(Barrier::new(disk_farms.len()));
let faster_read_sector_record_chunks_mode_concurrency = Arc::new(Semaphore::new(1));
let metrics = &SingleDiskFarmMetrics::new(registry);

let mut farms = Vec::with_capacity(disk_farms.len());
let mut farms_stream = disk_farms
Expand Down Expand Up @@ -284,6 +283,7 @@ where
.read_sector_record_chunks_mode,
faster_read_sector_record_chunks_mode_barrier,
faster_read_sector_record_chunks_mode_concurrency,
metrics: Some(metrics.clone()),
create,
},
farm_index,
Expand Down Expand Up @@ -354,31 +354,6 @@ where
.collect::<Vec<_>>()
};

let total_and_plotted_sectors = farms
.iter()
.enumerate()
.map(|(farm_index, farm)| async move {
let total_sector_count = farm.total_sectors_count();
let mut plotted_sectors_count = 0;
let plotted_sectors = farm.plotted_sectors();
let mut plotted_sectors = plotted_sectors.get().await.map_err(|error| {
anyhow!("Failed to get plotted sectors for farm {farm_index}: {error}")
})?;
while let Some(plotted_sector_result) = plotted_sectors.next().await {
plotted_sectors_count += 1;
plotted_sector_result.map_err(|error| {
anyhow!(
"Failed reading plotted sector on startup for farm {farm_index}: {error}"
)
})?;
}

anyhow::Ok((total_sector_count, plotted_sectors_count))
})
.collect::<FuturesOrdered<_>>()
.try_collect::<Vec<_>>()
.await?;

let mut farmer_services = (0..service_instances.get())
.map(|index| {
AsyncJoinOnDrop::new(
Expand All @@ -404,93 +379,7 @@ where

let mut farms_stream = (0u8..)
.zip(farms)
.zip(total_and_plotted_sectors)
.map(|((farm_index, farm), sector_counts)| {
let (total_sector_count, plotted_sectors_count) = sector_counts;
farmer_metrics.update_sectors_total(
farm.id(),
total_sector_count - plotted_sectors_count,
SectorState::NotPlotted,
);
farmer_metrics.update_sectors_total(
farm.id(),
plotted_sectors_count,
SectorState::Plotted,
);
farm.on_sector_update(Arc::new({
let farm_id = *farm.id();
let farmer_metrics = farmer_metrics.clone();

move |(_sector_index, sector_state)| match sector_state {
SectorUpdate::Plotting(SectorPlottingDetails::Starting { .. }) => {
farmer_metrics.sector_plotting.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Downloading) => {
farmer_metrics.sector_downloading.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Downloaded(time)) => {
farmer_metrics.observe_sector_downloading_time(&farm_id, time);
farmer_metrics.sector_downloaded.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Encoding) => {
farmer_metrics.sector_encoding.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Encoded(time)) => {
farmer_metrics.observe_sector_encoding_time(&farm_id, time);
farmer_metrics.sector_encoded.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Writing) => {
farmer_metrics.sector_writing.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Written(time)) => {
farmer_metrics.observe_sector_writing_time(&farm_id, time);
farmer_metrics.sector_written.inc();
}
SectorUpdate::Plotting(SectorPlottingDetails::Finished { time, .. }) => {
farmer_metrics.observe_sector_plotting_time(&farm_id, time);
farmer_metrics.sector_plotted.inc();
farmer_metrics.update_sector_state(&farm_id, SectorState::Plotted);
}
SectorUpdate::Plotting(SectorPlottingDetails::Error(_)) => {
farmer_metrics.sector_plotting_error.inc();
}
SectorUpdate::Expiration(SectorExpirationDetails::AboutToExpire) => {
farmer_metrics.update_sector_state(&farm_id, SectorState::AboutToExpire);
}
SectorUpdate::Expiration(SectorExpirationDetails::Expired) => {
farmer_metrics.update_sector_state(&farm_id, SectorState::Expired);
}
SectorUpdate::Expiration(SectorExpirationDetails::Determined { .. }) => {
// Not interested in here
}
}
}))
.detach();

farm.on_farming_notification(Arc::new({
let farm_id = *farm.id();
let farmer_metrics = farmer_metrics.clone();

move |farming_notification| match farming_notification {
FarmingNotification::Auditing(auditing_details) => {
farmer_metrics.observe_auditing_time(&farm_id, &auditing_details.time);
}
FarmingNotification::Proving(proving_details) => {
farmer_metrics.observe_proving_time(
&farm_id,
&proving_details.time,
proving_details.result,
);
}
FarmingNotification::NonFatalError(error) => {
farmer_metrics.note_farming_error(&farm_id, error);
}
}
}))
.detach();

farm.run().map(move |result| (farm_index, result))
})
.map(|(farm_index, farm)| farm.run().map(move |result| (farm_index, result)))
.collect::<FuturesUnordered<_>>();

let mut farm_errors = Vec::new();
Expand Down
Loading

0 comments on commit 58040d5

Please sign in to comment.