diff --git a/consensus/src/dag/dag.rs b/consensus/src/dag/dag.rs
index b2ad4f9eb..d1356858b 100644
--- a/consensus/src/dag/dag.rs
+++ b/consensus/src/dag/dag.rs
@@ -6,12 +6,11 @@ use std::{array, mem};
 use futures_util::FutureExt;
 use rand::prelude::SliceRandom;
 use rand::SeedableRng;
-use tokio::sync::mpsc::UnboundedSender;
 use tycho_network::PeerId;
 
 use crate::dag::anchor_stage::AnchorStage;
 use crate::dag::DagRound;
-use crate::effects::{AltFormat, CurrentRoundContext, Effects};
+use crate::effects::{AltFormat, Effects, EngineContext};
 use crate::engine::MempoolConfig;
 use crate::intercom::PeerSchedule;
 use crate::models::{Digest, LinkField, Location, Point, PointId, Round, ValidPoint};
@@ -54,7 +53,7 @@ impl Dag {
         &mut self,
         next_round: Round,
         peer_schedule: &PeerSchedule,
-        effects: &Effects<CurrentRoundContext>,
+        effects: &Effects<EngineContext>,
     ) -> DagRound {
         let mut top = match self.rounds.last_key_value() {
             None => unreachable!("DAG cannot be empty if properly initialized"),
@@ -92,17 +91,12 @@ impl Dag {
     }
 
     /// result is in historical order
-    pub fn commit(
-        &mut self,
-        next_dag_round: DagRound,
-        committed: UnboundedSender<(Point, Vec<Point>)>,
-        effects: Effects<CurrentRoundContext>,
-    ) {
+    pub fn commit(&mut self, next_dag_round: DagRound) -> Vec<(Point, Vec<Point>)> {
         // finding the latest trigger must not take long, better try later
         // than wait long for some DagPoint::NotFound, slowing down whole Engine
-        let _parent_guard = effects.span().enter();
+        let mut ordered = Vec::new();
         let Some(latest_trigger) = Self::latest_trigger(&next_dag_round) else {
-            return;
+            return ordered;
         };
         let _span = tracing::error_span!(
             "commit trigger",
@@ -113,7 +107,6 @@ impl Dag {
         .entered();
         // when we have a valid trigger, its every point of it's subdag is validated successfully
         let mut anchor_stack = Self::anchor_stack(&latest_trigger, next_dag_round.clone());
-        let mut ordered = Vec::new();
         while let Some((anchor, anchor_round)) = anchor_stack.pop() {
             // Note every next "little anchor candidate that could" must have at least full dag depth
             // Note if sync is implemented as a second sub-graph - drop up to the last linked in chain
             let committed = Self::gather_uncommitted(&anchor.point, anchor_round);
             ordered.push((anchor.point, committed));
         }
-
-        effects.log_committed(&ordered);
-
-        for points in ordered {
-            committed
-                .send(points) // not recoverable
-                .expect("Failed to send anchor commit message tp mpsc channel");
-        }
+        ordered
     }
 
     fn latest_trigger(next_dag_round: &DagRound) -> Option<ValidPoint> {
diff --git a/consensus/src/dag/dag_point_future.rs b/consensus/src/dag/dag_point_future.rs
index 058d92836..b50bf4018 100644
--- a/consensus/src/dag/dag_point_future.rs
+++ b/consensus/src/dag/dag_point_future.rs
@@ -11,7 +11,7 @@ use tycho_util::sync::OnceTake;
 
 use crate::dag::dag_location::InclusionState;
 use crate::dag::{DagRound, Verifier};
-use crate::effects::{CurrentRoundContext, Effects, ValidateContext};
+use crate::effects::{DownloadContext, Effects, EngineContext, ValidateContext};
 use crate::intercom::Downloader;
 use crate::models::{DagPoint, Digest, Location, PointId};
 use crate::Point;
@@ -60,15 +60,15 @@ impl DagPointFuture {
         point: &Point,
         state: &InclusionState,
         downloader: &Downloader,
-        effects: &Effects<CurrentRoundContext>,
+        effects: &Effects<EngineContext>,
     ) -> Self {
         let downloader = downloader.clone();
-        let span = effects.span().clone();
+        let effects = Effects::<ValidateContext>::new(effects, point);
         let point_dag_round = point_dag_round.downgrade();
         let point = point.clone();
         let state = state.clone();
         DagPointFuture(DagPointFutureType::Broadcast(Shared::new(JoinTask::new(
-            Verifier::validate(point, point_dag_round, downloader, span)
+            Verifier::validate(point, point_dag_round, downloader, effects)
                 .inspect(move |dag_point| state.init(dag_point)),
         ))))
     }
@@ -82,7 +82,6 @@ impl DagPointFuture {
         effects: &Effects<ValidateContext>,
     ) -> Self {
         let downloader = downloader.clone();
-        let effects = effects.clone();
         let state = state.clone();
         let point_dag_round = point_dag_round.clone();
         let (dependents_tx, dependents_rx) = mpsc::unbounded_channel();
@@ -94,6 +93,7 @@ impl DagPointFuture {
             },
             digest: digest.clone(),
         };
+        let effects = Effects::<DownloadContext>::new(effects, &point_id);
         DagPointFuture(DagPointFutureType::Download {
             task: Shared::new(JoinTask::new(
                 downloader
diff --git a/consensus/src/dag/dag_round.rs b/consensus/src/dag/dag_round.rs
index 61c37954f..f12ac9e70 100644
--- a/consensus/src/dag/dag_round.rs
+++ b/consensus/src/dag/dag_round.rs
@@ -10,7 +10,7 @@ use tycho_util::FastDashMap;
 use crate::dag::anchor_stage::AnchorStage;
 use crate::dag::dag_location::{DagLocation, InclusionState};
 use crate::dag::dag_point_future::DagPointFuture;
-use crate::effects::{CurrentRoundContext, Effects, ValidateContext};
+use crate::effects::{Effects, EngineContext, ValidateContext};
 use crate::engine::MempoolConfig;
 use crate::intercom::{Downloader, PeerSchedule};
 use crate::models::{DagPoint, Digest, PeerCount, Point, Round, ValidPoint};
@@ -131,7 +131,7 @@ impl DagRound {
         &self,
         point: &Point,
         downloader: &Downloader,
-        effects: &Effects<CurrentRoundContext>,
+        effects: &Effects<EngineContext>,
     ) -> Option<BoxFuture<'static, InclusionState>> {
         let _guard = effects.span().enter();
         assert_eq!(
diff --git a/consensus/src/dag/verifier.rs b/consensus/src/dag/verifier.rs
index 198b7d5c2..a77369188 100644
--- a/consensus/src/dag/verifier.rs
+++ b/consensus/src/dag/verifier.rs
@@ -2,13 +2,14 @@ use std::sync::Arc;
 
 use futures_util::stream::FuturesUnordered;
 use futures_util::StreamExt;
-use tracing::{Instrument, Span};
+use tracing::Instrument;
+use tycho_util::metrics::HistogramGuard;
 use tycho_util::sync::rayon_run;
 
 use crate::dag::anchor_stage::AnchorStage;
 use crate::dag::dag_point_future::DagPointFuture;
 use crate::dag::{DagRound, WeakDagRound};
-use crate::effects::{AltFormat, Effects, EffectsContext, ValidateContext};
+use crate::effects::{Effects, ValidateContext};
 use crate::engine::MempoolConfig;
 use crate::intercom::{Downloader, PeerSchedule};
 use crate::models::{DagPoint, Link, LinkField, Location, PeerCount, Point, ValidPoint};
@@ -33,14 +34,17 @@ pub struct Verifier;
 impl Verifier {
     /// the first and mandatory check of any Point received no matter where from
     pub fn verify(point: &Point, peer_schedule: &PeerSchedule) -> Result<(), DagPoint> {
-        if !point.is_integrity_ok() {
+        let _task_duration = HistogramGuard::begin(ValidateContext::VERIFY_DURATION);
+        let result = if !point.is_integrity_ok() {
             Err(DagPoint::NotExists(Arc::new(point.id()))) // cannot use point body
         } else if !(point.is_well_formed() && Self::is_list_of_signers_ok(point, peer_schedule)) {
             // point links, etc. will not be used
             Err(DagPoint::Invalid(point.clone()))
         } else {
             Ok(())
-        }
+        };
+        ValidateContext::verified(&result);
+        result
     }
 
     /// must be called iff [`Self::verify`] succeeded
@@ -48,9 +52,9 @@ impl Verifier {
         point: Point,      // @ r+0
         r_0: WeakDagRound, // r+0
         downloader: Downloader,
-        parent_span: Span,
+        effects: Effects<ValidateContext>,
     ) -> DagPoint {
-        let effects = Effects::<ValidateContext>::new(&parent_span, &point);
+        let _task_duration = HistogramGuard::begin(ValidateContext::VALIDATE_DURATION);
         let span_guard = effects.span().enter();
 
         // for genesis point it's sufficient to be well-formed and pass integrity check,
@@ -62,7 +66,8 @@ impl Verifier {
         );
         let Some(r_0) = r_0.upgrade() else {
             tracing::info!("cannot (in)validate point, no round in local DAG");
-            return DagPoint::Suspicious(ValidPoint::new(point.clone()));
+            let dag_point = DagPoint::Suspicious(ValidPoint::new(point.clone()));
+            return ValidateContext::validated(dag_point);
         };
         assert_eq!(
             point.body().location.round,
@@ -90,12 +95,13 @@ impl Verifier {
             &dependencies,
         )) {
-            return DagPoint::Invalid(point.clone());
+            return ValidateContext::validated(DagPoint::Invalid(point.clone()));
         }
 
         let Some(r_1) = r_0.prev().upgrade() else {
             tracing::info!("cannot (in)validate point's 'includes', no round in local DAG");
-            return DagPoint::Suspicious(ValidPoint::new(point.clone()));
+            let dag_point = DagPoint::Suspicious(ValidPoint::new(point.clone()));
+            return ValidateContext::validated(dag_point);
         };
         Self::gather_deps(&point, &r_1, &downloader, &effects, &dependencies);
@@ -120,7 +126,7 @@ impl Verifier {
         let mut sig_checked = false;
         let mut deps_checked = None;
 
-        loop {
+        let dag_point = loop {
             tokio::select! {
                 is_sig_ok = &mut signatures_fut, if !sig_checked => if is_sig_ok {
                     match deps_checked {
@@ -140,7 +146,8 @@ impl Verifier {
                     }
                 }
             }
-        }
+        };
+        ValidateContext::validated(dag_point)
     }
 
     fn is_self_links_ok(
@@ -462,17 +469,34 @@ impl Verifier {
         true
     }
 }
-impl EffectsContext for ValidateContext {}
-
-impl Effects<ValidateContext> {
-    fn new(parent_span: &Span, point: &Point) -> Self {
-        Self::new_child(parent_span, || {
-            tracing::error_span!(
-                "validate",
-                author = display(point.body().location.author.alt()),
-                round = point.body().location.round.0,
-                digest = display(point.digest().alt()),
-            )
-        })
-    }
-}
+
+impl ValidateContext {
+    const VERIFY_DURATION: &'static str = "tycho_mempool_verifier_verify_duration";
+    const VALIDATE_DURATION: &'static str = "tycho_mempool_verifier_validate_duration";
+
+    fn verified(result: &Result<(), DagPoint>) {
+        if let Err(dag_point) = result {
+            Self::meter(dag_point, "tycho_mempool_verifier_verify");
+        };
+    }
+
+    fn validated(result: DagPoint) -> DagPoint {
+        Self::meter(&result, "tycho_mempool_verifier_validate");
+        result
+    }
+
+    fn meter(dag_point: &DagPoint, metric_name: &'static str) {
+        match dag_point {
+            DagPoint::Trusted(_) => {}
+            DagPoint::Suspicious(_) => {
+                metrics::counter!(metric_name, &[("kind", "suspicious")]).increment(1);
+            }
+            DagPoint::Invalid(_) => {
+                metrics::counter!(metric_name, &[("kind", "invalid")]).increment(1);
+            }
+            DagPoint::NotExists(_) => {
+                metrics::counter!(metric_name, &[("kind", "not_exists")]).increment(1);
+            }
+        };
+    }
+}
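Editor's note: `verify`/`validate` above use `tycho_util::metrics::HistogramGuard` as an RAII timer, so every early `return ValidateContext::validated(..)` path is still measured. A minimal self-contained sketch of that guard pattern (the real `HistogramGuard` in `tycho_util` may differ in details; only the `metrics` crate calls are taken from this patch):

```rust
use std::time::Instant;

/// Records elapsed time into a histogram when dropped, so a single
/// `let _guard = ...;` at the top of a function times every exit path.
struct DurationGuard {
    name: &'static str,
    started_at: Instant,
}

impl DurationGuard {
    fn begin(name: &'static str) -> Self {
        Self { name, started_at: Instant::now() }
    }
}

impl Drop for DurationGuard {
    fn drop(&mut self) {
        // `metrics` converts a `Duration` into seconds as f64
        metrics::histogram!(self.name).record(self.started_at.elapsed());
    }
}

fn main() {
    let _t = DurationGuard::begin("tycho_mempool_verifier_verify_duration");
    // ... work; the histogram is recorded on any return or panic-unwind
}
```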
diff --git a/consensus/src/effects/context.rs b/consensus/src/effects/context.rs
index 64ecbd9c1..6b6915396 100644
--- a/consensus/src/effects/context.rs
+++ b/consensus/src/effects/context.rs
@@ -1,44 +1,165 @@
-use std::marker::PhantomData;
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::Arc;
 
 use tracing::Span;
 
-/// All side effects are scoped to their context, then often (but not always) equals to module
+use crate::effects::AltFormat;
+use crate::models::{Digest, PointId, Round};
+use crate::Point;
+
+/// All side effects are scoped to their context, which often (but not always) corresponds to a module.
 pub trait EffectsContext {}
 
 #[derive(Clone)]
-pub struct Effects<CTX> {
+pub struct Effects<CTX: EffectsContext> {
     /// generally of `tracing::level::Error` as always visible
     span: Span,
-    // TODO metrics scoped by use site (context)
-    _metrics: PhantomData<CTX>,
+    /// Context fields must be private to not affect application logic.
+    context: CTX,
 }
 
-impl<CTX> Effects<CTX> {
-    pub fn new_root(span: Span) -> Self {
-        Self {
-            span,
-            _metrics: PhantomData,
-        }
-    }
+impl<CTX: EffectsContext> Effects<CTX> {
     /// forbids to create new effects outside the span tree
-    pub fn new_child<U, F>(parent_span: &Span, to_span: F) -> Effects<U>
+    fn new_child<U: EffectsContext, F>(&self, context: U, to_span: F) -> Effects<U>
     where
         F: FnOnce() -> Span,
-        U: EffectsContext,
     {
         Effects::<U> {
-            span: parent_span.in_scope(to_span),
-            _metrics: PhantomData,
+            span: self.span.in_scope(to_span),
+            context,
         }
     }
-    pub fn span(&self) -> &'_ Span {
+    fn ctx(&self) -> &CTX {
+        &self.context
+    }
+    pub fn span(&self) -> &Span {
         &self.span
     }
 }
 
-// used as type parameters in several modules (to create children)
-// while implementations are kept private to use site
-#[derive(Clone)]
-pub struct CurrentRoundContext;
+/// Root context for uninterrupted sequence of engine rounds
+pub struct ChainedRoundsContext;
+impl EffectsContext for ChainedRoundsContext {}
+impl Effects<ChainedRoundsContext> {
+    pub fn new(since: Round) -> Self {
+        Self {
+            span: tracing::error_span!("rounds", "since" = since.0),
+            context: ChainedRoundsContext,
+        }
+    }
+}
+
 #[derive(Clone)]
-pub struct ValidateContext;
+pub struct EngineContext {
+    current_round: Round,
+    download_max_depth: Arc<AtomicU32>,
+}
+impl EffectsContext for EngineContext {}
+impl Effects<EngineContext> {
+    pub fn new(parent: &Effects<ChainedRoundsContext>, current_round: Round) -> Self {
+        let new_context = EngineContext {
+            current_round,
+            download_max_depth: Default::default(),
+        };
+        parent.new_child(new_context, || {
+            tracing::error_span!("round", "current" = current_round.0)
+        })
+    }
+    pub fn depth(&self, round: Round) -> u32 {
+        self.context.current_round.0.saturating_sub(round.0)
+    }
+}
+
+pub struct CollectorContext;
+impl EffectsContext for CollectorContext {}
+
+impl Effects<CollectorContext> {
+    pub fn new(parent: &Effects<EngineContext>) -> Self {
+        parent.new_child(CollectorContext, || tracing::error_span!("collector"))
+    }
+}
+
+pub struct BroadcasterContext;
+impl EffectsContext for BroadcasterContext {}
+
+impl Effects<BroadcasterContext> {
+    pub fn new(parent: &Effects<EngineContext>, digest: &Digest) -> Self {
+        parent.new_child(BroadcasterContext, || {
+            tracing::error_span!("broadcaster", digest = display(digest.alt()))
+        })
+    }
+}
+
+pub struct DownloadContext {
+    current_round: Round,
+    download_max_depth: Arc<AtomicU32>,
+}
+impl EffectsContext for DownloadContext {}
+impl Effects<DownloadContext> {
+    pub fn new(parent: &Effects<ValidateContext>, point_id: &PointId) -> Self {
+        parent.new_child(parent.ctx().into(), || {
+            tracing::error_span!(
+                "download",
+                author = display(point_id.location.author.alt()),
+                round = point_id.location.round.0,
+                digest = display(point_id.digest.alt()),
+            )
+        })
+    }
+    // per round
+    pub fn download_max_depth(&self, round: Round) -> u32 {
+        let depth = self.context.current_round.0.saturating_sub(round.0);
+        let old = self
+            .context
+            .download_max_depth
+            .fetch_max(depth, Ordering::Relaxed);
+        depth.max(old)
+    }
+}
+
+pub struct ValidateContext {
+    current_round: Round,
+    download_max_depth: Arc<AtomicU32>,
+}
+impl EffectsContext for ValidateContext {}
+impl Effects<ValidateContext> {
+    pub fn new<CTX>(parent: &Effects<CTX>, point: &Point) -> Self
+    where
+        CTX: EffectsContext,
+        for<'a> &'a CTX: Into<ValidateContext>,
+    {
+        parent.new_child(parent.ctx().into(), || {
+            tracing::error_span!(
+                "validate",
+                author = display(point.body().location.author.alt()),
+                round = point.body().location.round.0,
+                digest = display(point.digest().alt()),
+            )
+        })
+    }
+}
+
+impl From<&EngineContext> for ValidateContext {
+    fn from(parent: &EngineContext) -> Self {
+        Self {
+            current_round: parent.current_round,
+            download_max_depth: parent.download_max_depth.clone(),
+        }
+    }
+}
+impl From<&DownloadContext> for ValidateContext {
+    fn from(parent: &DownloadContext) -> Self {
+        Self {
+            current_round: parent.current_round,
+            download_max_depth: parent.download_max_depth.clone(),
+        }
+    }
+}
+impl From<&ValidateContext> for DownloadContext {
+    fn from(parent: &ValidateContext) -> Self {
+        Self {
+            current_round: parent.current_round,
+            download_max_depth: parent.download_max_depth.clone(),
+        }
+    }
+}
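Editor's note: the key move in the rewritten `effects/context.rs` is that a child `Effects` is now derived from a parent *value* (span plus typed context), and the `for<'a> &'a CTX: Into<ValidateContext>` bound lets `Effects::<ValidateContext>::new` accept either an engine-round or a download parent, carrying `current_round`/`download_max_depth` down the chain. A self-contained toy of the same pattern (`Engine`/`Validate` stand in for the real contexts):

```rust
use tracing::Span;

trait EffectsContext {}

struct Effects<CTX: EffectsContext> {
    span: Span,
    context: CTX,
}

impl<CTX: EffectsContext> Effects<CTX> {
    // every child span is opened inside the parent span,
    // and the typed context travels alongside it
    fn new_child<U: EffectsContext>(&self, context: U, make: impl FnOnce() -> Span) -> Effects<U> {
        Effects { span: self.span.in_scope(make), context }
    }
}

struct Engine { round: u32 }
impl EffectsContext for Engine {}

struct Validate { round: u32 }
impl EffectsContext for Validate {}

// the borrow-conversion that the patch expresses as `From<&EngineContext> for ValidateContext`
impl From<&Engine> for Validate {
    fn from(e: &Engine) -> Self { Validate { round: e.round } }
}

fn main() {
    let root = Effects { span: tracing::error_span!("rounds"), context: Engine { round: 42 } };
    let child: Effects<Validate> =
        root.new_child((&root.context).into(), || tracing::error_span!("validate"));
    assert_eq!(child.context.round, 42); // context data flowed from parent to child
}
```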
diff --git a/consensus/src/engine/engine.rs b/consensus/src/engine/engine.rs
index 43b37ffe1..dfbe21b3d 100644
--- a/consensus/src/engine/engine.rs
+++ b/consensus/src/engine/engine.rs
@@ -1,22 +1,26 @@
 use std::iter;
+use std::ops::Neg;
 use std::sync::Arc;
+use std::time::{Duration, Instant};
 
 use everscale_crypto::ed25519::KeyPair;
 use itertools::Itertools;
 use tokio::sync::{mpsc, oneshot};
 use tokio::task::{JoinError, JoinHandle};
-use tracing::Span;
 use tycho_network::{DhtClient, OverlayService, PeerId};
+use tycho_util::metrics::HistogramGuard;
 
 use crate::dag::{Dag, DagRound, InclusionState, LastOwnPoint, Producer, Verifier, WeakDagRound};
-use crate::effects::{AltFormat, CurrentRoundContext, Effects, EffectsContext};
+use crate::effects::{
+    AltFormat, ChainedRoundsContext, CollectorContext, Effects, EngineContext, ValidateContext,
+};
 use crate::engine::input_buffer::InputBuffer;
 use crate::engine::MempoolConfig;
 use crate::intercom::{
     BroadcastFilter, Broadcaster, BroadcasterSignal, Collector, Dispatcher, Downloader,
     PeerSchedule, Responder,
 };
-use crate::models::{ConsensusRound, Link, Point, Round};
+use crate::models::{ConsensusRound, Link, Point, UnixTime};
 use crate::LogFlavor;
 
 pub struct Engine {
@@ -141,6 +145,7 @@ impl Engine {
         // may reference own point older than from a last round, as its payload may be not resend
         let mut last_own_point: Option<Arc<LastOwnPoint>> = None;
         loop {
+            let _round_duration = HistogramGuard::begin(EngineContext::ROUND_DURATION);
             let (prev_round_ok, current_dag_round, round_effects) = {
                 // treat atomic as lock - do not leak its value or repeat the `get()`
                 let consensus_round = self.consensus_round.get();
@@ -151,6 +156,9 @@ impl Engine {
                     consensus_round.0,
                     top_dag_round.round().0,
                 );
+                metrics::counter!(EngineContext::ROUNDS_SKIP)
+                    .increment((consensus_round.0 - top_dag_round.round().0) as _); // safe
+
                 // `true` if we collected enough dependencies and (optionally) signatures,
                 // so `next_dag_round` from the previous loop is the current now
                 let prev_round_ok = consensus_round == top_dag_round.round();
@@ -165,18 +173,19 @@ impl Engine {
                     // not greater than new `next_dag_round`, and any future round cannot exist.
                     let round_effects =
-                        Effects::<CurrentRoundContext>::new(&self.effects, consensus_round);
+                        Effects::<EngineContext>::new(&self.effects, consensus_round);
                     (prev_round_ok, top_dag_round, round_effects)
                 } else {
                     self.effects = Effects::<ChainedRoundsContext>::new(consensus_round);
                     let round_effects =
-                        Effects::<CurrentRoundContext>::new(&self.effects, consensus_round);
+                        Effects::<EngineContext>::new(&self.effects, consensus_round);
                     let current_dag_round =
                         self.dag
                             .fill_to_top(consensus_round, &self.peer_schedule, &round_effects);
                     (prev_round_ok, current_dag_round, round_effects)
                 }
             };
+            metrics::gauge!(EngineContext::CURRENT_ROUND).set(current_dag_round.round().0);
 
             let next_dag_round = self.dag.fill_to_top(
                 current_dag_round.round().next(),
@@ -218,12 +227,13 @@ impl Engine {
             let downloader = self.downloader.clone();
             async move {
                 if let Some(own_point) = own_point_fut.await.expect("new point producer") {
+                    EngineContext::own_point_metrics(&own_point);
                     let paranoid = Self::expect_own_trusted_point(
                         own_point_round,
                         own_point.clone(),
                         peer_schedule.clone(),
                         downloader,
-                        round_effects.span().clone(),
+                        round_effects.clone(),
                     );
                     let new_last_own_point = broadcaster
                         .run(
@@ -238,6 +248,7 @@ impl Engine {
                     paranoid.await.expect("verify own produced point");
                     (broadcaster, Some(new_last_own_point))
                 } else {
+                    metrics::counter!(EngineContext::PRODUCE_POINT_SKIP).increment(1);
                    collector_signal_rx.close();
                     bcaster_ready_tx.send(BroadcasterSignal::Ok).ok();
                     (broadcaster, None)
@@ -248,22 +259,38 @@ impl Engine {
             let commit_run = tokio::task::spawn_blocking({
                 let mut dag = self.dag;
                 let next_dag_round = next_dag_round.clone();
-                let committed = self.committed.clone();
+                let committed_tx = self.committed.clone();
                 let round_effects = round_effects.clone();
                 move || {
-                    dag.commit(next_dag_round, committed, round_effects);
+                    let task_start = Instant::now();
+                    let _guard = round_effects.span().enter();
+
+                    let committed = dag.commit(next_dag_round);
+
+                    round_effects.commit_metrics(&committed);
+                    round_effects.log_committed(&committed);
+
+                    if !committed.is_empty() {
+                        for points in committed {
+                            committed_tx
+                                .send(points) // not recoverable
+                                .expect("Failed to send anchor commit message to mpsc channel");
+                        }
+                        metrics::histogram!(EngineContext::COMMIT_DURATION)
+                            .record(task_start.elapsed());
+                    }
                     dag
                 }
             });
 
             let collector_run = tokio::spawn({
                 let mut collector = self.collector;
-                let round_effects = round_effects.clone();
+                let effects = Effects::<CollectorContext>::new(&round_effects);
                 let next_dag_round = next_dag_round.clone();
                 async move {
                     let next_round = collector
                         .run(
-                            round_effects,
+                            effects,
                             next_dag_round,
                             own_point_state_rx,
                             collector_signal_tx,
@@ -307,13 +334,14 @@ impl Engine {
     }
 
     fn produce(
-        round_effects: Effects<CurrentRoundContext>,
+        round_effects: Effects<EngineContext>,
         current_dag_round: DagRound,
         last_own_point: Option<Arc<LastOwnPoint>>,
         peer_schedule: PeerSchedule,
        own_point_state: oneshot::Sender<InclusionState>,
         input_buffer: InputBuffer,
     ) -> Option<Point> {
+        let task_start_time = Instant::now();
         if let Some(own_point) =
             Producer::new_point(&current_dag_round, last_own_point.as_deref(), &input_buffer)
         {
@@ -333,6 +361,8 @@ impl Engine {
                 round_effects.span(),
             );
             own_point_state.send(state).ok();
+            metrics::histogram!(EngineContext::PRODUCE_POINT_DURATION)
+                .record(task_start_time.elapsed());
             Some(own_point)
         } else {
             tracing::info!(parent: round_effects.span(), "will not produce point");
@@ -356,21 +386,26 @@ impl Engine {
         point: Point,
         peer_schedule: PeerSchedule,
         downloader: Downloader,
-        span: Span,
+        effects: Effects<EngineContext>,
     ) -> JoinHandle<()> {
         tokio::spawn(async move {
             if let Err(dag_point) = Verifier::verify(&point, &peer_schedule) {
-                let _guard = span.enter();
+                let _guard = effects.span().enter();
                 panic!(
                     "Failed to verify own point: {} {:?}",
                     dag_point.alt(),
                     point.id().alt()
                 )
             }
-            let dag_point =
-                Verifier::validate(point.clone(), point_round, downloader, span.clone()).await;
+            let dag_point = Verifier::validate(
+                point.clone(),
+                point_round,
+                downloader,
+                Effects::<ValidateContext>::new(&effects, &point),
+            )
+            .await;
             if dag_point.trusted().is_none() {
-                let _guard = span.enter();
+                let _guard = effects.span().enter();
                 panic!(
                     "Failed to validate own point: {} {:?}",
                     dag_point.alt(),
@@ -381,23 +416,72 @@ impl Engine {
         }
     }
 }
 
-struct ChainedRoundsContext;
-impl EffectsContext for ChainedRoundsContext {}
-impl Effects<ChainedRoundsContext> {
-    fn new(since: Round) -> Self {
-        Self::new_root(tracing::error_span!("rounds", "since" = since.0))
+impl EngineContext {
+    const CURRENT_ROUND: &'static str = "tycho_mempool_engine_current_round";
+    const ROUNDS_SKIP: &'static str = "tycho_mempool_engine_rounds_skipped";
+    const ROUND_DURATION: &'static str = "tycho_mempool_engine_round_duration";
+    const PRODUCE_POINT_SKIP: &'static str = "tycho_mempool_engine_produce_skipped";
+    const PRODUCE_POINT_DURATION: &'static str = "tycho_mempool_engine_produce_duration";
+    const COMMIT_DURATION: &'static str = "tycho_mempool_engine_commit_duration";
+
+    fn own_point_metrics(own_point: &Point) {
+        // FIXME all commented metrics need `gauge.set_max()` or `gauge.set_min()`,
+        //  or (better) should be accumulated per round as standalone values
+        metrics::counter!("tycho_mempool_points_produced").increment(1);
+
+        if let Some(_proof) = &own_point.body().proof {
+            // metrics::gauge!("tycho_mempool_point_evidence_count_min").set_min(proof.evidence.len() as f64);
+            // metrics::gauge!("tycho_mempool_point_evidence_count_max").set_max(proof.evidence.len() as f64);
+        } else {
+            metrics::counter!("tycho_mempool_points_no_proof_produced").increment(1);
+        }
+
+        // metrics::gauge!("tycho_mempool_point_includes_count_min")
+        //     .set_min(own_point.body().includes.len() as f64);
+        // metrics::gauge!("tycho_mempool_point_includes_count_max")
+        //     .set_max(own_point.body().includes.len() as f64);
+        // metrics::gauge!("tycho_mempool_point_witness_count_max")
+        //     .set_max(own_point.body().witness.len() as f64);
+        //
+        // metrics::gauge!("tycho_mempool_point_last_anchor_proof_rounds_ago")
+        //     .set_max(own_point.body().location.round.0 - own_point.anchor_round(LinkField::Proof).0);
+        // metrics::gauge!("tycho_mempool_point_last_anchor_trigger_rounds_ago")
+        //     .set_max(own_point.body().location.round.0 - own_point.anchor_round(LinkField::Trigger).0);
+
+        metrics::counter!("tycho_mempool_point_payload_count")
+            .increment(own_point.body().payload.len() as _);
+        metrics::counter!("tycho_mempool_point_payload_bytes").increment(
+            own_point
+                .body()
+                .payload
+                .iter()
+                .fold(0, |acc, bytes| acc + bytes.len()) as _,
+        );
     }
 }
 
-impl EffectsContext for CurrentRoundContext {}
-impl Effects<CurrentRoundContext> {
-    fn new(parent: &Effects<ChainedRoundsContext>, current: Round) -> Self {
-        Self::new_child(parent.span(), || {
-            tracing::error_span!("round", "current" = current.0)
-        })
+impl Effects<EngineContext> {
+    fn commit_metrics(&self, committed: &[(Point, Vec<Point>)]) {
+        if !committed.is_empty() {
+            metrics::counter!("tycho_mempool_commit_anchors").increment(committed.len() as _);
+        }
+        if let Some((first_anchor, _)) = committed.first() {
+            metrics::gauge!("tycho_mempool_commit_latency_rounds")
+                .set(self.depth(first_anchor.body().location.round));
+        }
+        if let Some((last_anchor, _)) = committed.last() {
+            let now = UnixTime::now().as_u64();
+            let anchor_time = last_anchor.body().time.as_u64();
+            let latency = if now >= anchor_time {
+                Duration::from_millis(now - anchor_time).as_secs_f64()
+            } else {
+                Duration::from_millis(anchor_time - now).as_secs_f64().neg()
+            };
+            metrics::histogram!("tycho_mempool_commit_anchor_time_latency").record(latency);
+        }
     }
 
-    pub(crate) fn log_committed(&self, committed: &[(Point, Vec<Point>)]) {
+    fn log_committed(&self, committed: &[(Point, Vec<Point>)]) {
         if !committed.is_empty()
             && MempoolConfig::LOG_FLAVOR == LogFlavor::Truncated
             && tracing::enabled!(tracing::Level::DEBUG)
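Editor's note: `commit_metrics` records a *signed* wall-clock latency: anchors normally lag behind local time, but a peer whose clock runs ahead of ours can yield a negative value, which is why `std::ops::Neg` is imported. A standalone sketch of exactly that computation (function name is illustrative):

```rust
use std::ops::Neg;
use std::time::Duration;

/// Signed seconds between `now` and `anchor_time` (both unix millis):
/// positive when the committed anchor lags behind wall clock,
/// negative when its author's clock ran ahead of ours.
fn anchor_time_latency(now: u64, anchor_time: u64) -> f64 {
    if now >= anchor_time {
        Duration::from_millis(now - anchor_time).as_secs_f64()
    } else {
        Duration::from_millis(anchor_time - now).as_secs_f64().neg()
    }
}

fn main() {
    assert_eq!(anchor_time_latency(2_000, 500), 1.5); // anchor is 1.5s old
    assert_eq!(anchor_time_latency(500, 2_000), -1.5); // peer clock ahead
}
```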
diff --git a/consensus/src/intercom/broadcast/broadcast_filter.rs b/consensus/src/intercom/broadcast/broadcast_filter.rs
index 34f7b26e8..b94595ac9 100644
--- a/consensus/src/intercom/broadcast/broadcast_filter.rs
+++ b/consensus/src/intercom/broadcast/broadcast_filter.rs
@@ -10,7 +10,7 @@ use tycho_util::FastDashMap;
 use super::dto::ConsensusEvent;
 use crate::dag::{DagRound, Verifier};
 use crate::dyn_event;
-use crate::effects::{AltFormat, CurrentRoundContext, Effects};
+use crate::effects::{AltFormat, Effects, EngineContext};
 use crate::engine::MempoolConfig;
 use crate::intercom::dto::PeerState;
 use crate::intercom::{Downloader, PeerSchedule};
@@ -44,7 +44,7 @@ impl BroadcastFilter {
         point: &Point,
         top_dag_round: &DagRound,
         downloader: &Downloader,
-        effects: &Effects<CurrentRoundContext>,
+        effects: &Effects<EngineContext>,
     ) {
         self.inner
             .add(sender, point, top_dag_round, downloader, effects);
@@ -54,7 +54,7 @@ impl BroadcastFilter {
         &self,
         top_dag_round: &DagRound,
         downloader: &Downloader,
-        round_effects: &Effects<CurrentRoundContext>,
+        round_effects: &Effects<EngineContext>,
     ) {
         self.inner
             .advance_round(top_dag_round, downloader, round_effects);
@@ -114,7 +114,7 @@ impl BroadcastFilterInner {
         point: &Point,
         top_dag_round: &DagRound,
         downloader: &Downloader,
-        effects: &Effects<CurrentRoundContext>,
+        effects: &Effects<EngineContext>,
     ) {
         // for any node @ r+0, its DAG always contains [r-DAG_DEPTH-N; r+1] rounds, where N>=2
@@ -243,7 +243,7 @@ impl BroadcastFilterInner {
         &self,
         top_dag_round: &DagRound,
         downloader: &Downloader,
-        effects: &Effects<CurrentRoundContext>,
+        effects: &Effects<EngineContext>,
     ) {
         // concurrent callers may break historical order of messages in channel
         // until new_round is channeled, and Collector can cope with it;
@@ -301,7 +301,7 @@ impl BroadcastFilterInner {
         point_round: &DagRound,
         point: &Point,
         downloader: &Downloader,
-        effects: &Effects<CurrentRoundContext>,
+        effects: &Effects<EngineContext>,
     ) {
         // this may be not the first insert into DAG - in case some node received it earlier,
         // and we've already received its point that references current broadcast
diff --git a/consensus/src/intercom/broadcast/broadcaster.rs b/consensus/src/intercom/broadcast/broadcaster.rs
index 3d924b5ab..8bf8fa3ec 100644
--- a/consensus/src/intercom/broadcast/broadcaster.rs
+++ b/consensus/src/intercom/broadcast/broadcaster.rs
@@ -13,11 +13,11 @@ use tycho_util::{FastHashMap, FastHashSet};
 
 use crate::dag::LastOwnPoint;
 use crate::dyn_event;
-use crate::effects::{AltFormat, CurrentRoundContext, Effects, EffectsContext};
+use crate::effects::{AltFormat, BroadcasterContext, Effects, EngineContext};
 use crate::intercom::broadcast::collector::CollectorSignal;
 use crate::intercom::broadcast::utils::QueryResponses;
 use crate::intercom::dto::{BroadcastResponse, PeerState, SignatureResponse};
-use crate::intercom::{Dispatcher, PeerSchedule};
+use crate::intercom::{Dispatcher, PeerSchedule, QueryKind};
 use crate::models::{Digest, PeerCount, Point, Signature};
 
 #[derive(Copy, Clone, Debug)]
@@ -42,7 +42,7 @@ impl Broadcaster {
     }
     pub async fn run(
         &mut self,
-        round_effects: &Effects<CurrentRoundContext>,
+        round_effects: &Effects<EngineContext>,
         point: &Point,
         peer_schedule: &PeerSchedule,
         bcaster_signal: oneshot::Sender<BroadcasterSignal>,
@@ -115,11 +115,11 @@ struct BroadcasterTask {
     rejections: FastHashSet<PeerId>,
     signatures: FastHashMap<PeerId, Signature>,
 
-    bcast_request: tycho_network::Request,
+    bcast_request: QueryKind,
     bcast_current: QueryResponses<BroadcastResponse>,
     bcast_outdated: QueryResponses<BroadcastResponse>,
 
-    sig_request: tycho_network::Request,
+    sig_request: QueryKind,
     sig_peers: FastHashSet<PeerId>,
     sig_current: FuturesUnordered<BoxFuture<'static, (PeerId, Result<SignatureResponse>)>>,
 }
@@ -359,14 +359,3 @@ impl BroadcasterTask {
         }
     }
 }
-
-struct BroadcasterContext;
-impl EffectsContext for BroadcasterContext {}
-
-impl Effects<BroadcasterContext> {
-    fn new(parent: &Effects<CurrentRoundContext>, digest: &Digest) -> Self {
-        Self::new_child(parent.span(), || {
-            tracing::error_span!("broadcaster", digest = display(digest.alt()))
-        })
-    }
-}
diff --git a/consensus/src/intercom/broadcast/collector.rs b/consensus/src/intercom/broadcast/collector.rs
index c9b722b29..76a7a2e90 100644
--- a/consensus/src/intercom/broadcast/collector.rs
+++ b/consensus/src/intercom/broadcast/collector.rs
@@ -10,7 +10,7 @@ use tycho_util::FastHashSet;
 
 use crate::dag::{DagRound, InclusionState};
 use crate::dyn_event;
-use crate::effects::{AltFormat, CurrentRoundContext, Effects, EffectsContext};
+use crate::effects::{AltFormat, CollectorContext, Effects};
 use crate::engine::MempoolConfig;
 use crate::intercom::broadcast::dto::ConsensusEvent;
 use crate::intercom::BroadcasterSignal;
@@ -50,13 +50,12 @@ impl Collector {
 
     pub async fn run(
         &mut self,
-        round_effects: Effects<CurrentRoundContext>,
+        effects: Effects<CollectorContext>,
         next_dag_round: DagRound, // r+1
         own_point_state: oneshot::Receiver<InclusionState>,
         collector_signal: mpsc::UnboundedSender<CollectorSignal>,
         bcaster_signal: oneshot::Receiver<BroadcasterSignal>,
     ) -> Round {
-        let effects = Effects::<CollectorContext>::new(&round_effects);
         let span_guard = effects.span().clone().entered();
 
         let current_dag_round = next_dag_round
@@ -375,12 +374,3 @@ impl CollectorTask {
         );
     }
 }
-
-struct CollectorContext;
-impl EffectsContext for CollectorContext {}
-
-impl Effects<CollectorContext> {
-    fn new(parent: &Effects<CurrentRoundContext>) -> Self {
-        Self::new_child(parent.span(), || tracing::error_span!("collector"))
-    }
-}
diff --git a/consensus/src/intercom/broadcast/signer.rs b/consensus/src/intercom/broadcast/signer.rs
index 958832936..92844369a 100644
--- a/consensus/src/intercom/broadcast/signer.rs
+++ b/consensus/src/intercom/broadcast/signer.rs
@@ -1,7 +1,7 @@
 use tycho_network::PeerId;
 
 use crate::dag::DagRound;
-use crate::effects::{AltFormat, CurrentRoundContext, Effects};
+use crate::effects::{AltFormat, Effects, EngineContext};
 use crate::intercom::dto::{SignatureRejectedReason, SignatureResponse};
 use crate::models::Round;
 use crate::{dyn_event, MempoolConfig};
@@ -12,7 +12,7 @@ impl Signer {
         round: Round,
         author: &PeerId,
         next_dag_round: &DagRound,
-        effects: &Effects<CurrentRoundContext>,
+        effects: &Effects<EngineContext>,
     ) -> SignatureResponse {
         let response = Self::make_signature_response(round, author, next_dag_round);
         let level = match response {
diff --git a/consensus/src/intercom/core/dispatcher.rs b/consensus/src/intercom/core/dispatcher.rs
index ba8bc9f48..95b243844 100644
--- a/consensus/src/intercom/core/dispatcher.rs
+++ b/consensus/src/intercom/core/dispatcher.rs
@@ -2,6 +2,7 @@ use anyhow::Result;
 use futures_util::future::BoxFuture;
 use futures_util::FutureExt;
 use tycho_network::{DhtClient, Network, OverlayId, OverlayService, PeerId, PrivateOverlay};
+use tycho_util::metrics::HistogramGuard;
 
 use crate::intercom::core::dto::{MPQuery, MPResponse};
 use crate::intercom::core::responder::Responder;
@@ -38,31 +39,33 @@ impl Dispatcher {
         (this, private_overlay)
     }
 
-    pub fn point_by_id_request(id: &PointId) -> tycho_network::Request {
-        (&MPQuery::PointById(id.clone())).into()
+    pub fn broadcast_request(point: &Point) -> QueryKind {
+        QueryKind::Broadcast((&MPQuery::Broadcast(point.clone())).into())
     }
 
-    pub fn signature_request(round: Round) -> tycho_network::Request {
-        (&MPQuery::Signature(round)).into()
+    pub fn signature_request(round: Round) -> QueryKind {
+        QueryKind::Signature((&MPQuery::Signature(round)).into())
     }
 
-    pub fn broadcast_request(point: &Point) -> tycho_network::Request {
-        (&MPQuery::Broadcast(point.clone())).into()
+    pub fn point_by_id_request(id: &PointId) -> QueryKind {
+        QueryKind::PointById((&MPQuery::PointById(id.clone())).into())
     }
 
     pub fn query<T>(
         &self,
         peer_id: &PeerId,
-        request: &tycho_network::Request,
+        request: &QueryKind,
     ) -> BoxFuture<'static, (PeerId, Result<T>)>
     where
         T: TryFrom<MPResponse, Error = anyhow::Error>,
     {
         let peer_id = *peer_id;
-        let request = request.clone();
+        let metric_name = request.metric_name();
+        let request = request.data().clone();
         let overlay = self.overlay.clone();
         let network = self.network.clone();
         async move {
+            let _task_duration = HistogramGuard::begin(metric_name);
             overlay
                 .query(&network, &peer_id, request)
                 .map(move |response| {
@@ -76,3 +79,26 @@ impl Dispatcher {
             .boxed()
     }
 }
+
+pub enum QueryKind {
+    Broadcast(tycho_network::Request),
+    Signature(tycho_network::Request),
+    PointById(tycho_network::Request),
+}
+
+impl QueryKind {
+    fn data(&self) -> &tycho_network::Request {
+        match self {
+            QueryKind::Broadcast(data)
+            | QueryKind::Signature(data)
+            | QueryKind::PointById(data) => data,
+        }
+    }
+    fn metric_name(&self) -> &'static str {
+        match self {
+            QueryKind::Broadcast(_) => "tycho_mempool_broadcast_query_dispatcher_duration",
+            QueryKind::Signature(_) => "tycho_mempool_signature_query_dispatcher_duration",
+            QueryKind::PointById(_) => "tycho_mempool_download_query_dispatcher_duration",
+        }
+    }
+}
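Editor's note: `QueryKind` bundles the prepared `tycho_network::Request` with its kind, so the single generic `query` path picks the matching duration histogram and callers cannot mislabel a request. A self-contained toy of the same design (`Request` stands in for `tycho_network::Request`, metric names shortened):

```rust
// Each request variant knows the histogram it should be timed under,
// so the one generic send path cannot mix up metric names.
struct Request(Vec<u8>);

enum QueryKind {
    Broadcast(Request),
    Signature(Request),
    PointById(Request),
}

impl QueryKind {
    fn data(&self) -> &Request {
        match self {
            QueryKind::Broadcast(data)
            | QueryKind::Signature(data)
            | QueryKind::PointById(data) => data,
        }
    }
    fn metric_name(&self) -> &'static str {
        match self {
            QueryKind::Broadcast(_) => "broadcast_duration",
            QueryKind::Signature(_) => "signature_duration",
            QueryKind::PointById(_) => "download_duration",
        }
    }
}

fn send(kind: &QueryKind) {
    let _timer = kind.metric_name(); // begin a duration guard here in real code
    let _payload = kind.data(); // ...and dispatch the wire bytes
}

fn main() {
    let q = QueryKind::Signature(Request(vec![1, 2, 3]));
    assert_eq!(q.metric_name(), "signature_duration");
    send(&q);
}
```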
diff --git a/consensus/src/intercom/core/responder.rs b/consensus/src/intercom/core/responder.rs
index 4477085f9..86f34be83 100644
--- a/consensus/src/intercom/core/responder.rs
+++ b/consensus/src/intercom/core/responder.rs
@@ -1,10 +1,11 @@
 use std::sync::Arc;
+use std::time::{Duration, Instant};
 
 use arc_swap::ArcSwapOption;
 use tycho_network::{Response, Service, ServiceRequest};
 
 use crate::dag::DagRound;
-use crate::effects::{AltFormat, CurrentRoundContext, Effects};
+use crate::effects::{AltFormat, Effects, EngineContext};
 use crate::intercom::broadcast::Signer;
 use crate::intercom::core::dto::{MPQuery, MPResponse};
 use crate::intercom::dto::{PointByIdResponse, SignatureResponse};
@@ -18,7 +19,7 @@ struct ResponderInner {
     broadcast_filter: BroadcastFilter,
     top_dag_round: DagRound,
     downloader: Downloader,
-    effects: Effects<CurrentRoundContext>,
+    effects: Effects<EngineContext>,
 }
 
 impl Responder {
@@ -27,7 +28,7 @@ impl Responder {
         broadcast_filter: &BroadcastFilter,
         top_dag_round: &DagRound,
         downloader: &Downloader,
-        round_effects: &Effects<CurrentRoundContext>,
+        round_effects: &Effects<EngineContext>,
     ) {
         broadcast_filter.advance_round(top_dag_round, downloader, round_effects);
         self.0.store(Some(Arc::new(ResponderInner {
@@ -63,6 +64,7 @@ impl Service for Responder {
 
 impl Responder {
     fn handle_query(&self, req: &ServiceRequest) -> Option<Response> {
+        let task_start = Instant::now();
         let peer_id = req.metadata.peer_id;
         let body = MPQuery::try_from(req)
             .inspect_err(|e| {
@@ -74,7 +76,7 @@ impl Responder {
             })
             .ok()?; // malformed request is a reason to ignore it
         let inner = self.0.load_full();
-        let response = match body {
+        let mp_response = match body {
             MPQuery::Broadcast(point) => {
                 match inner {
                     None => {} // do nothing: sender has retry loop via signature request
@@ -104,7 +106,31 @@ impl Responder {
                 ),
             }),
         };
-        Some(Response::try_from(&response).expect("should serialize own response"))
+        let response = Response::try_from(&mp_response).expect("should serialize own response");
+
+        EngineContext::response_metrics(&mp_response, task_start.elapsed());
+
+        Some(response)
+    }
+}
+
+impl EngineContext {
+    fn response_metrics(mp_response: &MPResponse, elapsed: Duration) {
+        let metric_name = match mp_response {
+            MPResponse::Broadcast => "tycho_mempool_broadcast_query_responder_duration",
+            MPResponse::Signature(SignatureResponse::NoPoint | SignatureResponse::TryLater) => {
+                "tycho_mempool_signature_query_responder_pong_duration"
+            }
+            MPResponse::Signature(
+                SignatureResponse::Signature(_) | SignatureResponse::Rejected(_),
+            ) => "tycho_mempool_signature_query_responder_data_duration",
+            MPResponse::PointById(PointByIdResponse(Some(_))) => {
+                "tycho_mempool_download_query_responder_some_duration"
+            }
+            MPResponse::PointById(PointByIdResponse(None)) => {
+                "tycho_mempool_download_query_responder_none_duration"
+            }
+        };
+        metrics::histogram!(metric_name).record(elapsed);
     }
 }
-
-// ResponderContext is meaningless without metrics
diff --git a/consensus/src/intercom/dependency/downloader.rs b/consensus/src/intercom/dependency/downloader.rs
index ec088c377..9eb23e12b 100644
--- a/consensus/src/intercom/dependency/downloader.rs
+++ b/consensus/src/intercom/dependency/downloader.rs
@@ -9,13 +9,14 @@ use tokio::sync::broadcast::error::RecvError;
 use tokio::sync::{broadcast, mpsc, oneshot};
 use tracing::Instrument;
 use tycho_network::PeerId;
+use tycho_util::metrics::HistogramGuard;
 use tycho_util::FastHashMap;
 
 use crate::dag::{DagRound, Verifier};
-use crate::effects::{AltFormat, Effects, EffectsContext, ValidateContext};
+use crate::effects::{AltFormat, DownloadContext, Effects, ValidateContext};
 use crate::engine::MempoolConfig;
 use crate::intercom::dto::{PeerState, PointByIdResponse};
-use crate::intercom::{Dispatcher, PeerSchedule};
+use crate::intercom::{Dispatcher, PeerSchedule, QueryKind};
 use crate::models::{DagPoint, PeerCount, PointId};
 use crate::{dyn_event, Point};
@@ -32,7 +33,7 @@ struct DownloaderInner {
 #[derive(Debug)]
 struct PeerStatus {
     state: PeerState,
-    failed_attempts: usize,
+    failed_queries: usize,
     /// `true` for peers that depend on current point, i.e. included it directly;
     /// requests are made without waiting for next attempt;
     /// entries are never deleted, because they may be not resolved at the moment of insertion
@@ -64,9 +65,10 @@ impl Downloader {
         point_dag_round_strong: DagRound,
         dependers: mpsc::UnboundedReceiver<PeerId>,
         verified_broadcast: oneshot::Receiver<Point>,
-        parent_effects: Effects<ValidateContext>,
+        effects: Effects<DownloadContext>,
     ) -> DagPoint {
-        let effects = Effects::<DownloadContext>::new(&parent_effects, &point_id);
+        let _task_duration = HistogramGuard::begin(DownloadContext::TASK_DURATION);
+        metrics::counter!(DownloadContext::TASK_COUNT).increment(1);
         let span_guard = effects.span().enter();
         assert_eq!(
             point_id.location.round,
@@ -87,7 +89,7 @@ impl Downloader {
             .map(|(peer_id, state)| {
                 (*peer_id, PeerStatus {
                     state: *state,
-                    failed_attempts: 0,
+                    failed_queries: 0,
                     is_depender: peer_id == point_id.location.author,
                     is_in_flight: false,
                 })
@@ -98,7 +100,7 @@ impl Downloader {
             Entry::Vacant(vacant) if author_state == PeerState::Resolved => {
                 vacant.insert(PeerStatus {
                     state: author_state,
-                    failed_attempts: 0,
+                    failed_queries: 0,
                     is_depender: true, // as author is a depender, its 'NotFound' is not reliable
                     is_in_flight: false,
                 });
@@ -110,7 +112,7 @@ impl Downloader {
         drop(point_dag_round_strong);
         drop(span_guard);
 
-        let downloaded = DownloadTask {
+        let mut task = DownloadTask {
             parent: self.clone(),
             request: Dispatcher::point_by_id_request(&point_id),
             point_id: point_id.clone(),
@@ -123,10 +125,13 @@ impl Downloader {
             downloading: FuturesUnordered::new(),
             attempt: 0,
             skip_next_attempt: false,
-        }
-        .run(verified_broadcast)
-        .instrument(effects.span().clone())
-        .await;
+        };
+        let downloaded = task
+            .run(verified_broadcast)
+            .instrument(effects.span().clone())
+            .await;
+
+        effects.meter(&task);
 
         match downloaded {
             None => DagPoint::NotExists(Arc::new(point_id)),
@@ -140,7 +145,7 @@ impl Downloader {
                     point.clone(),
                     point_dag_round,
                     self.clone(),
-                    effects.span().clone(),
+                    Effects::<ValidateContext>::new(&effects, &point),
                 )
                 // this is the only `await` in the task, that resolves the download
                 .await;
@@ -166,7 +171,7 @@ impl Downloader {
 
 struct DownloadTask {
     parent: Downloader,
-    request: tycho_network::Request,
+    request: QueryKind,
     point_id: PointId,
 
     peer_count: PeerCount,
@@ -222,7 +227,7 @@ impl DownloadTask {
                 !state.is_in_flight
                     && state.state == PeerState::Resolved
                     // do not re-download immediately if already requested
-                    && state.failed_attempts == 0
+                    && state.failed_queries == 0
             }
             _ => false, // either already marked or requested and removed, no panic
         };
@@ -254,7 +259,7 @@ impl DownloadTask {
                 *peer_id,
                 (
                     // try every peer, until all are tried the same amount of times
-                    status.failed_attempts,
+                    status.failed_queries,
                     // try mandatory peers before others each loop
                     u8::from(!status.is_depender),
                     // randomise within group
@@ -305,7 +310,8 @@ impl DownloadTask {
             .get_mut(peer_id)
             .unwrap_or_else(|| panic!("Coding error: peer not in map {}", peer_id.alt()));
         status.is_in_flight = false;
-        status.failed_attempts += 1;
+        status.failed_queries = status.failed_queries.saturating_add(1);
+        metrics::counter!(DownloadContext::FAILED_QUERY).increment(1);
         tracing::warn!(
             peer = display(peer_id.alt()),
             error = display(network_err),
@@ -330,18 +336,19 @@ impl DownloadTask {
                     // if points are persisted in storage - it's a ban;
                     // else - peer evicted this point from its cache, as the point
                     // is at least DAG_DEPTH rounds older than current consensus round
-                    self.unreliable_peers += 1;
-                    self.reliably_not_found += 1; // FIXME remove this line when storage is ready
+                    self.unreliable_peers = self.unreliable_peers.saturating_add(1);
+                    // FIXME remove next line when storage is ready
+                    self.reliably_not_found = self.reliably_not_found.saturating_add(1);
                     tracing::warn!(peer = display(peer_id.alt()), "must have returned");
                 } else {
-                    self.reliably_not_found += 1;
+                    self.reliably_not_found = self.reliably_not_found.saturating_add(1);
                     tracing::debug!(peer = display(peer_id.alt()), "didn't return");
                 }
                 None
             }
             PointByIdResponse(Some(point)) if point.id() != self.point_id => {
                 // it's a ban
-                self.unreliable_peers += 1;
+                self.unreliable_peers = self.unreliable_peers.saturating_add(1);
                 tracing::error!(
                     peer_id = display(peer_id.alt()),
                     author = display(point.body().location.author.alt()),
@@ -355,7 +362,7 @@ impl DownloadTask {
                 match Verifier::verify(&point, &self.parent.inner.peer_schedule) {
                     Err(dag_point) => {
                         // reliable peer won't return unverifiable point
-                        self.unreliable_peers += 1;
+                        self.unreliable_peers = self.unreliable_peers.saturating_add(1);
                         assert!(
                             dag_point.valid().is_none(),
                             "Coding error: verify() cannot result into a valid point"
@@ -400,7 +407,7 @@ impl DownloadTask {
             self.undone_peers.entry(peer_id).and_modify(|status| {
                 is_suitable = !status.is_in_flight
                     && status.is_depender
-                    && status.failed_attempts == 0
+                    && status.failed_queries == 0
                     && status.state == PeerState::Unknown
                     && new == PeerState::Resolved;
                 status.state = new;
@@ -418,17 +425,21 @@ impl DownloadTask {
         }
     }
 }
-struct DownloadContext;
-impl EffectsContext for DownloadContext {}
+impl DownloadContext {
+    const TASK_COUNT: &'static str = "tycho_mempool_download_task_count";
+    const TASK_DURATION: &'static str = "tycho_mempool_download_task_duration";
+    const FAILED_QUERY: &'static str = "tycho_mempool_download_query_failed_count";
+}
 
 impl Effects<DownloadContext> {
-    fn new(parent: &Effects<ValidateContext>, point_id: &PointId) -> Self {
-        Self::new_child(parent.span(), || {
-            tracing::error_span!(
-                "download",
-                author = display(point_id.location.author.alt()),
-                round = point_id.location.round.0,
-                digest = display(point_id.digest.alt()),
-            )
-        })
+    fn meter(&self, task: &DownloadTask) {
+        metrics::gauge!("tycho_mempool_download_depth_rounds")
+            .set(self.download_max_depth(task.point_id.location.round));
+        metrics::counter!("tycho_mempool_download_not_found_responses")
+            .increment(task.reliably_not_found as _);
+
+        metrics::counter!("tycho_mempool_download_aborted_on_exit_count")
+            .increment(task.downloading.len() as _);
+        // metrics::histogram!("tycho_mempool_download_unreliable_responses")
+        //     .set(task.unreliable_peers);
     }
 }
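Editor's note: the `download_max_depth` gauge fed by `meter` above relies on `AtomicU32::fetch_max` (see `effects/context.rs`), which returns the *previous* maximum; `depth.max(old)` therefore yields the running maximum including the current observation, without any lock shared between concurrent download tasks. A runnable sketch of that pattern:

```rust
use std::sync::atomic::{AtomicU32, Ordering};

/// Lock-free running maximum: `fetch_max` stores the new max and returns
/// the old one, so the caller combines both to learn the current max.
fn observe_depth(max_depth: &AtomicU32, depth: u32) -> u32 {
    let old = max_depth.fetch_max(depth, Ordering::Relaxed);
    depth.max(old)
}

fn main() {
    let max_depth = AtomicU32::new(0);
    assert_eq!(observe_depth(&max_depth, 3), 3);
    assert_eq!(observe_depth(&max_depth, 1), 3); // 1 did not raise the max
    assert_eq!(max_depth.load(Ordering::Relaxed), 3);
}
```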
diff --git a/consensus/src/intercom/dependency/uploader.rs b/consensus/src/intercom/dependency/uploader.rs
index 17a47db4f..a26f6ec29 100644
--- a/consensus/src/intercom/dependency/uploader.rs
+++ b/consensus/src/intercom/dependency/uploader.rs
@@ -3,7 +3,7 @@ use tycho_network::PeerId;
 
 use crate::dag::DagRound;
 use crate::dyn_event;
-use crate::effects::{AltFormat, CurrentRoundContext, Effects};
+use crate::effects::{AltFormat, Effects, EngineContext};
 use crate::intercom::dto::PointByIdResponse;
 use crate::models::PointId;
 
@@ -14,7 +14,7 @@ impl Uploader {
         peer_id: &PeerId,
         point_id: &PointId,
         top_dag_round: &DagRound,
-        effects: &Effects<CurrentRoundContext>,
+        effects: &Effects<EngineContext>,
     ) -> PointByIdResponse {
         let dag_round = if point_id.location.round > top_dag_round.round() {
             None
diff --git a/scripts/gen-dashboard.py b/scripts/gen-dashboard.py
index c5b017e6c..bd4d4cff8 100644
--- a/scripts/gen-dashboard.py
+++ b/scripts/gen-dashboard.py
@@ -151,7 +151,7 @@ def create_heatmap_quantile_panel(
 def create_row(name, metrics) -> RowPanel:
     layout = Layout(name)
     for i in range(0, len(metrics), 2):
-        chunk = metrics[i : i + 2]
+        chunk = metrics[i: i + 2]
         layout.row(chunk)
     return layout.row_panel
 
@@ -793,6 +793,176 @@ def collator_do_collate() -> RowPanel:
     return create_row("Collator Do Collate", metrics)
 
 
+def mempool() -> RowPanel:
+    metrics = [
+        create_gauge_panel(
+            "tycho_mempool_last_anchor_round",
+            "Adapter: last anchor round",
+        ),
+        create_gauge_panel(
+            "tycho_mempool_engine_current_round",
+            "Engine: current round",
+        ),
+        # == Mempool adapter == #
+        create_counter_panel(
+            "tycho_mempool_externals_count_total",
+            "Adapter: unique externals count",
+        ),
+        create_counter_panel(
+            "tycho_mempool_externals_bytes_total",
+            "Adapter: unique externals size",
+            unit_format=UNITS.BYTES,
+        ),
+        create_counter_panel(
+            "tycho_mempool_duplicates_count_total",
+            "Adapter: removed duplicate externals count",
+        ),
+        create_counter_panel(
+            "tycho_mempool_duplicates_bytes_total",
+            "Adapter: removed duplicate externals size",
+            unit_format=UNITS.BYTES,
+        ),
+        # == Engine own point == #
+        create_counter_panel(
+            "tycho_mempool_point_payload_count",
+            "Engine: points payload count",
+        ),
+        create_counter_panel(
+            "tycho_mempool_point_payload_bytes",
+            "Engine: points payload size",
+            unit_format=UNITS.BYTES,
+        ),
+        create_counter_panel(
+            "tycho_mempool_points_produced",
+            "Engine: produced points (total)",
+        ),
+        create_counter_panel(
+            "tycho_mempool_points_no_proof_produced",
+            "Engine: produced points without proof",
+        ),
+        # == Engine == #
+        create_counter_panel(
+            "tycho_mempool_engine_rounds_skipped",
+            "Engine: skipped rounds",
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_engine_round_duration",
+            "Engine: round duration",
+        ),
+        create_counter_panel(
+            "tycho_mempool_engine_produce_skipped",
+            "Engine: produce point skipped",
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_engine_produce_duration",
+            "Engine: produce point task duration",
+        ),
+        # == Engine commit == #
+        create_counter_panel(
+            "tycho_mempool_commit_anchors",
+            "Engine: committed anchors",
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_engine_commit_duration",
+            "Engine: commit duration",
+        ),
+        # FIXME next one needs max value over collection period, but no `gauge.set_max()`
+        create_gauge_panel(
+            "tycho_mempool_commit_latency_rounds",
+            "Engine: committed anchor rounds latency (max over batch) #fixme",
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_commit_anchor_time_latency",
+            "Engine: committed anchor time latency (min over batch)",
+        ),
+    ]
+    return create_row("Mempool", metrics)
+
+
+def mempool_components() -> RowPanel:
+    metrics = [
+        # == Verifier == #
+        create_counter_panel(
+            "tycho_mempool_verifier_verify",
+            "Verifier: verify() errors",
+            labels=['kind=~"kind"'],
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_verifier_verify_duration",
+            "Verifier: verify() point structure and author's sig",
+        ),
+        create_counter_panel(
+            "tycho_mempool_verifier_validate",
+            "Verifier: validate() errors and warnings",
+            labels=['kind=~"kind"'],
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_verifier_validate_duration",
+            "Verifier: validate() point dependencies in DAG and all-1 sigs",
+        ),
+        # == Download tasks - multiple per round == #
+        create_counter_panel(
+            "tycho_mempool_download_task_count",
+            "Downloader: tasks (unique point id)",
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_download_task_duration",
+            "Downloader: tasks duration",
+        ),
+        # FIXME next one needs max value over collection period, but no `gauge.set_max()`
+        create_gauge_panel(
+            "tycho_mempool_download_depth_rounds",
+            "Downloader: point depth (max rounds from current) #fixme",
+        ),
+        create_counter_panel(
+            "tycho_mempool_download_not_found_responses",
+            "Downloader: received None in response",
+        ),
+        create_counter_panel(
+            "tycho_mempool_download_aborted_on_exit_count",
+            "Downloader: queries aborted (on task completion)",
+        ),
+        create_counter_panel(
+            "tycho_mempool_download_query_failed_count",
+            "Downloader: queries network error",
+        ),
+        # == Network tasks - multiple per round == #
+        create_heatmap_panel(
+            "tycho_mempool_broadcast_query_dispatcher_duration",
+            "Dispatcher: Broadcast send",
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_broadcast_query_responder_duration",
+            "Responder: Broadcast accept",
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_signature_query_dispatcher_duration",
+            "Dispatcher: Signature request",
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_download_query_dispatcher_duration",
+            "Dispatcher: Download request",
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_signature_query_responder_data_duration",
+            "Responder: Signature send: send ready or sign or reject",
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_signature_query_responder_pong_duration",
+            "Responder: Signature send: no point or try later",
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_download_query_responder_some_duration",
+            "Responder: Download send: Some(point)",
+        ),
+        create_heatmap_panel(
+            "tycho_mempool_download_query_responder_none_duration",
+            "Responder: Download send: None",
+        ),
+    ]
+    return create_row("Mempool components", metrics)
+
+
 def collator_execution_manager() -> RowPanel:
     metrics = [
         create_heatmap_panel(
@@ -862,6 +1032,8 @@ def templates() -> Templating:
         collator_do_collate(),
         collator_finalize_block(),
         collator_execution_manager(),
+        mempool(),
+        mempool_components(),
         net_conn_manager(),
         net_request_handler(),
         net_peer(),