Merge pull request #2920 from subspace/farmer-mertrics-improvements
Farmer metrics improvements
nazar-pc authored Jul 15, 2024
2 parents a40b375 + 7b1d504 commit ba0bc41
Showing 15 changed files with 628 additions and 343 deletions.
@@ -110,7 +110,7 @@ where
)
.await
.map_err(|error| anyhow!("Failed to connect to NATS server: {error}"))?;
let mut registry = Registry::default();
let mut registry = Registry::with_prefix("subspace_farmer");

let mut tasks = FuturesUnordered::new();

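The first hunk swaps `Registry::default()` for `Registry::with_prefix("subspace_farmer")`, so every metric registered further down is exported under a common `subspace_farmer_` name prefix. A minimal sketch of that behaviour with prometheus_client (the gauge name and value here are illustrative, not taken from the farmer code):

```rust
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;

fn main() {
    // Prefixed registry: metric names are rendered as `subspace_farmer_<name>`.
    let mut registry = Registry::with_prefix("subspace_farmer");

    // Illustrative gauge; the real metrics are registered by the farmer components.
    let example: Gauge = Gauge::default();
    registry.register("example", "Illustrative gauge", example.clone());
    example.set(1);

    let mut output = String::new();
    encode(&mut output, &registry).unwrap();
    // `output` now contains a `subspace_farmer_example 1` sample.
    println!("{output}");
}
```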
@@ -136,7 +136,8 @@ pub(super) async fn controller(
let peer_id = keypair.public().to_peer_id();
let instance = peer_id.to_string();

let (farmer_cache, farmer_cache_worker) = FarmerCache::new(node_client.clone(), peer_id);
let (farmer_cache, farmer_cache_worker) =
FarmerCache::new(node_client.clone(), peer_id, Some(registry));

// TODO: Metrics

@@ -8,6 +8,7 @@ use bytesize::ByteSize;
use clap::Parser;
use futures::stream::FuturesUnordered;
use futures::{select, FutureExt, StreamExt};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::fs;
use std::future::Future;
@@ -25,7 +26,6 @@ use subspace_farmer::cluster::plotter::ClusterPlotter;
use subspace_farmer::farm::Farm;
use subspace_farmer::node_client::caching_proxy_node_client::CachingProxyNodeClient;
use subspace_farmer::node_client::NodeClient;
use subspace_farmer::single_disk_farm::metrics::SingleDiskFarmMetrics;
use subspace_farmer::single_disk_farm::{
SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions,
};
@@ -241,7 +241,7 @@ where
let faster_read_sector_record_chunks_mode_barrier =
Arc::new(Barrier::new(disk_farms.len()));
let faster_read_sector_record_chunks_mode_concurrency = Arc::new(Semaphore::new(1));
let metrics = &SingleDiskFarmMetrics::new(registry);
let registry = &Mutex::new(registry);

let mut farms = Vec::with_capacity(disk_farms.len());
let mut farms_stream = disk_farms
@@ -283,7 +283,7 @@ where
.read_sector_record_chunks_mode,
faster_read_sector_record_chunks_mode_barrier,
faster_read_sector_record_chunks_mode_concurrency,
metrics: Some(metrics.clone()),
registry: Some(registry),
create,
},
farm_index,
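In this file the per-farm `SingleDiskFarmMetrics` value is no longer built up front; instead the mutable registry is wrapped in a `parking_lot::Mutex` and handed to each farm as `registry: Some(registry)`, so farms that are initialised concurrently can each register their own metrics. A rough sketch of that sharing pattern with hypothetical helper and metric names (the real registration happens inside `SingleDiskFarm`):

```rust
use parking_lot::Mutex;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;

/// Hypothetical helper: a concurrently-initialised component briefly locks the
/// shared registry reference to register its metrics, then keeps only cheap
/// metric handles.
fn register_component_metrics(registry: &Mutex<&mut Registry>, name: &str) -> Gauge {
    let gauge = Gauge::default();
    registry.lock().register(
        format!("{name}_example"),
        "Illustrative per-component gauge",
        gauge.clone(),
    );
    gauge
}

fn main() {
    let mut registry = Registry::with_prefix("subspace_farmer");
    // Shared the same way the farmer shares it with every `SingleDiskFarm`.
    let registry = &Mutex::new(&mut registry);

    let first = register_component_metrics(registry, "first_farm");
    let second = register_component_metrics(registry, "second_farm");
    first.set(1);
    second.set(2);
}
```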
@@ -83,7 +83,7 @@ pub(super) struct PlotterArgs {

pub(super) async fn plotter<PosTableLegacy, PosTable>(
nats_client: NatsClient,
_registry: &mut Registry,
registry: &mut Registry,
plotter_args: PlotterArgs,
) -> anyhow::Result<Pin<Box<dyn Future<Output = anyhow::Result<()>>>>>
where
@@ -169,6 +169,7 @@ where
Arc::clone(&global_mutex),
kzg.clone(),
erasure_coding.clone(),
Some(registry),
));
let modern_cpu_plotter = Arc::new(CpuPlotter::<_, PosTable>::new(
piece_getter.clone(),
@@ -178,10 +179,9 @@ where
Arc::clone(&global_mutex),
kzg.clone(),
erasure_coding.clone(),
Some(registry),
));

// TODO: Metrics

Ok(Box::pin(async move {
select! {
result = plotter_service(&nats_client, &legacy_cpu_plotter, false).fuse() => {
19 changes: 10 additions & 9 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
@@ -31,7 +31,6 @@ use subspace_farmer::node_client::rpc_node_client::RpcNodeClient;
use subspace_farmer::node_client::NodeClient;
use subspace_farmer::plotter::cpu::CpuPlotter;
use subspace_farmer::single_disk_farm::identity::Identity;
use subspace_farmer::single_disk_farm::metrics::SingleDiskFarmMetrics;
use subspace_farmer::single_disk_farm::{
SingleDiskFarm, SingleDiskFarmError, SingleDiskFarmOptions,
};
@@ -339,12 +338,12 @@ where
let keypair = derive_libp2p_keypair(identity.secret_key());
let peer_id = keypair.public().to_peer_id();

let (farmer_cache, farmer_cache_worker) = FarmerCache::new(node_client.clone(), peer_id);

// Metrics
let mut prometheus_metrics_registry = Registry::default();
let mut registry = Registry::with_prefix("subspace_farmer");
let should_start_prometheus_server = !prometheus_listen_on.is_empty();

let (farmer_cache, farmer_cache_worker) =
FarmerCache::new(node_client.clone(), peer_id, Some(&mut registry));

let node_client = CachingProxyNodeClient::new(node_client)
.await
.map_err(|error| anyhow!("Failed to create caching proxy node client: {error}"))?;
@@ -364,7 +363,7 @@ where
Arc::downgrade(&plotted_pieces),
node_client.clone(),
farmer_cache.clone(),
should_start_prometheus_server.then_some(&mut prometheus_metrics_registry),
should_start_prometheus_server.then_some(&mut registry),
)
.map_err(|error| anyhow!("Failed to configure networking: {error}"))?
};
@@ -507,6 +506,7 @@ where
Arc::clone(&global_mutex),
kzg.clone(),
erasure_coding.clone(),
Some(&mut registry),
));
let modern_cpu_plotter = Arc::new(CpuPlotter::<_, PosTable>::new(
piece_getter.clone(),
@@ -516,6 +516,7 @@ where
Arc::clone(&global_mutex),
kzg.clone(),
erasure_coding.clone(),
Some(&mut registry),
));

let (farms, plotting_delay_senders) = {
@@ -526,7 +527,7 @@ where
let (plotting_delay_senders, plotting_delay_receivers) = (0..disk_farms.len())
.map(|_| oneshot::channel())
.unzip::<_, _, Vec<_>, Vec<_>>();
let metrics = &SingleDiskFarmMetrics::new(&mut prometheus_metrics_registry);
let registry = &Mutex::new(&mut registry);

let mut farms = Vec::with_capacity(disk_farms.len());
let mut farms_stream = disk_farms
@@ -568,7 +569,7 @@ where
.read_sector_record_chunks_mode,
faster_read_sector_record_chunks_mode_barrier,
faster_read_sector_record_chunks_mode_concurrency,
metrics: Some(metrics.clone()),
registry: Some(registry),
create,
},
farm_index,
@@ -750,7 +751,7 @@ where
let _prometheus_worker = if should_start_prometheus_server {
let prometheus_task = start_prometheus_metrics_server(
prometheus_listen_on,
RegistryAdapter::PrometheusClient(prometheus_metrics_registry),
RegistryAdapter::PrometheusClient(registry),
)?;

let join_handle = tokio::spawn(prometheus_task);
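Throughout this binary the registry is now threaded as `Option<&mut Registry>`: components receive `Some(&mut registry)` (or `should_start_prometheus_server.then_some(&mut registry)`) and simply skip metric registration when given `None`. A small illustrative sketch of that calling convention (the component and metric names are made up):

```rust
use prometheus_client::metrics::counter::Counter;
use prometheus_client::registry::Registry;

/// Hypothetical component that only creates metrics when a registry is supplied.
struct Component {
    requests: Option<Counter>,
}

impl Component {
    fn new(registry: Option<&mut Registry>) -> Self {
        let requests = registry.map(|registry| {
            let requests = Counter::default();
            registry.register("requests", "Illustrative request counter", requests.clone());
            requests
        });
        Self { requests }
    }

    fn handle_request(&self) {
        // Counting is a no-op when metrics were never set up.
        if let Some(requests) = &self.requests {
            requests.inc();
        }
    }
}

fn main() {
    let mut registry = Registry::with_prefix("subspace_farmer");
    let should_start_prometheus_server = true; // e.g. `!prometheus_listen_on.is_empty()`

    // Same pattern as the farmer: metrics exist only if the server will be started.
    let component = Component::new(should_start_prometheus_server.then_some(&mut registry));
    component.handle_request();
}
```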
9 changes: 6 additions & 3 deletions crates/subspace-farmer/src/farm.rs
@@ -228,15 +228,18 @@ pub enum ProvingResult {
/// Managed to prove within time limit, but node rejected solution, likely due to timeout on its
/// end
Rejected,
/// Proving failed altogether
Failed,
}

impl fmt::Display for ProvingResult {
#[inline]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(match self {
ProvingResult::Success => "Success",
ProvingResult::Timeout => "Timeout",
ProvingResult::Rejected => "Rejected",
Self::Success => "Success",
Self::Timeout => "Timeout",
Self::Rejected => "Rejected",
Self::Failed => "Failed",
})
}
}
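The new `ProvingResult::Failed` variant lets the farm report provings that failed outright, distinct from timeouts and node rejections, and the `Display` impl turns each outcome into a stable string. One plausible way such strings could feed a labelled counter with prometheus_client (an illustration only, not the actual `SingleDiskFarmMetrics` code):

```rust
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::registry::Registry;

fn main() {
    let mut registry = Registry::with_prefix("subspace_farmer");

    // One counter per proving outcome, keyed by a `result` label.
    let proving_results = Family::<Vec<(String, String)>, Counter>::default();
    registry.register(
        "proving_results",
        "Illustrative proving results by outcome",
        proving_results.clone(),
    );

    // In the farmer, the label value would come from `ProvingResult`'s `Display`
    // impl, i.e. "Success", "Timeout", "Rejected" or "Failed".
    let result = "Failed".to_string();
    proving_results
        .get_or_create(&vec![("result".to_string(), result)])
        .inc();
}
```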
61 changes: 59 additions & 2 deletions crates/subspace-farmer/src/farmer_cache.rs
@@ -3,16 +3,19 @@
//! Farmer cache is a container that orchestrates a bunch of piece and plot caches that together
//! persist pieces in a way that is easy to retrieve comparing to decoding pieces from plots.

mod metrics;
#[cfg(test)]
mod tests;

use crate::farm::{MaybePieceStoredResult, PieceCache, PieceCacheId, PieceCacheOffset, PlotCache};
use crate::farmer_cache::metrics::FarmerCacheMetrics;
use crate::node_client::NodeClient;
use crate::utils::run_future_in_dedicated_thread;
use async_lock::RwLock as AsyncRwLock;
use event_listener_primitives::{Bag, HandlerId};
use futures::stream::{FuturesOrdered, FuturesUnordered};
use futures::{select, stream, FutureExt, StreamExt};
use prometheus_client::registry::Registry;
use rayon::prelude::*;
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -84,6 +87,7 @@ where
plot_caches: Arc<PlotCaches>,
handlers: Arc<Handlers>,
worker_receiver: Option<mpsc::Receiver<WorkerCommand>>,
metrics: Option<Arc<FarmerCacheMetrics>>,
}

impl<NC> FarmerCacheWorker<NC>
@@ -233,6 +237,10 @@ where

debug!("Collecting pieces that were in the cache before");

if let Some(metrics) = &self.metrics {
metrics.piece_cache_capacity_total.set(0);
metrics.piece_cache_capacity_used.set(0);
}
// Build cache state of all backends
let maybe_caches_futures = stored_pieces
.into_iter()
@@ -241,6 +249,11 @@
.enumerate()
.map(
|(index, ((mut stored_pieces, mut free_offsets), new_cache))| {
if let Some(metrics) = &self.metrics {
metrics
.piece_cache_capacity_total
.inc_by(new_cache.max_num_elements() as i64);
}
run_future_in_dedicated_thread(
move || async move {
// Hack with first collecting into `Option` with `Option::take()` call
@@ -411,6 +424,14 @@ where
});
});

if let Some(metrics) = &self.metrics {
for cache in &mut caches {
metrics
.piece_cache_capacity_used
.inc_by(cache.stored_pieces.len() as i64);
}
}

// Store whatever correct pieces are immediately available after restart
self.piece_caches.write().await.clone_from(&caches);

@@ -488,6 +509,9 @@ where
);
return false;
}
if let Some(metrics) = &self.metrics {
metrics.piece_cache_capacity_used.inc();
}
cache
.stored_pieces
.insert(RecordKey::from(piece_index.to_multihash()), offset);
@@ -798,6 +822,9 @@ where
%offset,
"Successfully stored piece in cache"
);
if let Some(metrics) = &self.metrics {
metrics.piece_cache_capacity_used.inc();
}
cache.stored_pieces.insert(record_key, offset);
}
return;
@@ -905,14 +932,19 @@ pub struct FarmerCache {
handlers: Arc<Handlers>,
// We do not want to increase capacity unnecessarily on clone
worker_sender: Arc<mpsc::Sender<WorkerCommand>>,
metrics: Option<Arc<FarmerCacheMetrics>>,
}

impl FarmerCache {
/// Create new piece cache instance and corresponding worker.
///
/// NOTE: Returned future is async, but does blocking operations and should be running in
/// dedicated thread.
pub fn new<NC>(node_client: NC, peer_id: PeerId) -> (Self, FarmerCacheWorker<NC>)
pub fn new<NC>(
node_client: NC,
peer_id: PeerId,
registry: Option<&mut Registry>,
) -> (Self, FarmerCacheWorker<NC>)
where
NC: NodeClient,
{
@@ -924,13 +956,15 @@ impl FarmerCache {
caches: AsyncRwLock::default(),
next_plot_cache: AtomicUsize::new(0),
});
let metrics = registry.map(|registry| Arc::new(FarmerCacheMetrics::new(registry)));

let instance = Self {
peer_id,
piece_caches: Arc::clone(&caches),
plot_caches: Arc::clone(&plot_caches),
handlers: Arc::clone(&handlers),
worker_sender: Arc::new(worker_sender),
metrics: metrics.clone(),
};
let worker = FarmerCacheWorker {
peer_id,
@@ -939,6 +973,7 @@ where
plot_caches,
handlers,
worker_receiver: Some(worker_receiver),
metrics,
};

(instance, worker)
@@ -952,7 +987,20 @@ impl FarmerCache {
};
match cache.backend.read_piece(offset).await {
Ok(maybe_piece) => {
return maybe_piece.map(|(_piece_index, piece)| piece);
return match maybe_piece {
Some((_piece_index, piece)) => {
if let Some(metrics) = &self.metrics {
metrics.cache_hit.inc();
}
Some(piece)
}
None => {
if let Some(metrics) = &self.metrics {
metrics.cache_miss.inc();
}
None
}
};
}
Err(error) => {
error!(
@@ -971,17 +1019,26 @@
trace!(%error, "Failed to send ForgetKey command to worker");
}

if let Some(metrics) = &self.metrics {
metrics.cache_error.inc();
}
return None;
}
}
}

for cache in self.plot_caches.caches.read().await.iter() {
if let Ok(Some(piece)) = cache.read_piece(&key).await {
if let Some(metrics) = &self.metrics {
metrics.cache_hit.inc();
}
return Some(piece);
}
}

if let Some(metrics) = &self.metrics {
metrics.cache_miss.inc();
}
None
}

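The new `farmer_cache/metrics.rs` module itself is not shown above, but the call sites pin down its surface: counters for `cache_hit`, `cache_miss` and `cache_error`, plus gauges for `piece_cache_capacity_total` and `piece_cache_capacity_used` (the capacity metrics are `set`/`inc_by`'d with `i64` values). A sketch of a struct with that shape, assuming prometheus_client counters and gauges and a hypothetical sub-registry prefix; the real module may differ in naming and help texts:

```rust
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;

/// Inferred shape of `FarmerCacheMetrics`; field names match the call sites in the
/// diff, everything else (help strings, sub-registry prefix) is an assumption.
pub(super) struct FarmerCacheMetrics {
    pub(super) cache_hit: Counter,
    pub(super) cache_miss: Counter,
    pub(super) cache_error: Counter,
    pub(super) piece_cache_capacity_total: Gauge,
    pub(super) piece_cache_capacity_used: Gauge,
}

impl FarmerCacheMetrics {
    pub(super) fn new(registry: &mut Registry) -> Self {
        // Hypothetical prefix; metric names gain a `subspace_farmer_farmer_cache_` prefix.
        let sub_registry = registry.sub_registry_with_prefix("farmer_cache");

        let cache_hit = Counter::default();
        sub_registry.register("cache_hit", "Piece cache hits", cache_hit.clone());

        let cache_miss = Counter::default();
        sub_registry.register("cache_miss", "Piece cache misses", cache_miss.clone());

        let cache_error = Counter::default();
        sub_registry.register("cache_error", "Piece cache read errors", cache_error.clone());

        let piece_cache_capacity_total = Gauge::default();
        sub_registry.register(
            "piece_cache_capacity_total",
            "Piece cache capacity, in pieces",
            piece_cache_capacity_total.clone(),
        );

        let piece_cache_capacity_used = Gauge::default();
        sub_registry.register(
            "piece_cache_capacity_used",
            "Piece cache used capacity, in pieces",
            piece_cache_capacity_used.clone(),
        );

        Self {
            cache_hit,
            cache_miss,
            cache_error,
            piece_cache_capacity_total,
            piece_cache_capacity_used,
        }
    }
}
```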