From 152a8ec9199b8b407cb0873b1af8d1db27dc621c Mon Sep 17 00:00:00 2001 From: Kirill Mikheev Date: Mon, 1 Jul 2024 16:47:16 +0300 Subject: [PATCH] fix(consensus): metrics --- consensus/src/dag/verifier.rs | 41 +++++------ consensus/src/engine/engine.rs | 70 +++++++++++-------- consensus/src/intercom/core/dispatcher.rs | 6 +- consensus/src/intercom/core/responder.rs | 10 +-- .../src/intercom/dependency/downloader.rs | 26 ++++--- scripts/gen-dashboard.py | 41 ++++++----- 6 files changed, 106 insertions(+), 88 deletions(-) diff --git a/consensus/src/dag/verifier.rs b/consensus/src/dag/verifier.rs index a77369188..539b1b98c 100644 --- a/consensus/src/dag/verifier.rs +++ b/consensus/src/dag/verifier.rs @@ -471,32 +471,33 @@ impl Verifier { } impl ValidateContext { - const VERIFY_DURATION: &'static str = "tycho_mempool_verifier_verify_duration"; - const VALIDATE_DURATION: &'static str = "tycho_mempool_verifier_validate_duration"; + const VERIFY_DURATION: &'static str = "tycho_mempool_verifier_verify_time"; + const VALIDATE_DURATION: &'static str = "tycho_mempool_verifier_validate_time"; + + const ALL_LABELS: [(&'static str, &'static str); 3] = [ + ("kind", "not_exists"), + ("kind", "invalid"), + ("kind", "suspicious"), + ]; fn verified(result: &Result<(), DagPoint>) { - if let Err(dag_point) = result { - Self::meter(dag_point, "tycho_mempool_verifier_verify"); + let (labels, count) = match result { + Err(DagPoint::NotExists(_)) => (&Self::ALL_LABELS[0..=0], 1), + Err(DagPoint::Invalid(_)) => (&Self::ALL_LABELS[1..=1], 1), + Ok(_) => (&Self::ALL_LABELS[0..=1], 0), + _ => unreachable!("unexpected"), }; + metrics::counter!("tycho_mempool_verifier_verify", labels).increment(count); } fn validated(result: DagPoint) -> DagPoint { - Self::meter(&result, "tycho_mempool_verifier_validate"); - result - } - - fn meter(dag_point: &DagPoint, metric_name: &'static str) { - match dag_point { - DagPoint::Trusted(_) => {} - DagPoint::Suspicious(_) => { - metrics::counter!(metric_name, &[("kind", "suspicious")]).increment(1); - } - DagPoint::Invalid(_) => { - metrics::counter!(metric_name, &[("kind", "invalid")]).increment(1); - } - DagPoint::NotExists(_) => { - metrics::counter!(metric_name, &[("kind", "not_exists")]).increment(1); - } + let (labels, count) = match result { + DagPoint::NotExists(_) => (&Self::ALL_LABELS[0..=0], 1), + DagPoint::Invalid(_) => (&Self::ALL_LABELS[1..=1], 1), + DagPoint::Suspicious(_) => (&Self::ALL_LABELS[2..=2], 1), + DagPoint::Trusted(_) => (&Self::ALL_LABELS[..], 0), }; + metrics::counter!("tycho_mempool_verifier_validate", labels).increment(count); + result } } diff --git a/consensus/src/engine/engine.rs b/consensus/src/engine/engine.rs index dfbe21b3d..f692bd131 100644 --- a/consensus/src/engine/engine.rs +++ b/consensus/src/engine/engine.rs @@ -226,8 +226,10 @@ impl Engine { let mut broadcaster = self.broadcaster; let downloader = self.downloader.clone(); async move { - if let Some(own_point) = own_point_fut.await.expect("new point producer") { - EngineContext::own_point_metrics(&own_point); + let own_point = own_point_fut.await.expect("new point producer"); + EngineContext::own_point_metrics(own_point.as_ref()); + + if let Some(own_point) = own_point { let paranoid = Self::expect_own_trusted_point( own_point_round, own_point.clone(), @@ -248,7 +250,6 @@ impl Engine { paranoid.await.expect("verify own produced point"); (broadcaster, Some(new_last_own_point)) } else { - metrics::counter!(EngineContext::PRODUCE_POINT_SKIP).increment(1); collector_signal_rx.close(); bcaster_ready_tx.send(BroadcasterSignal::Ok).ok(); (broadcaster, None) @@ -419,22 +420,44 @@ impl Engine { impl EngineContext { const CURRENT_ROUND: &'static str = "tycho_mempool_engine_current_round"; const ROUNDS_SKIP: &'static str = "tycho_mempool_engine_rounds_skipped"; - const ROUND_DURATION: &'static str = "tycho_mempool_engine_round_duration"; - const PRODUCE_POINT_SKIP: &'static str = "tycho_mempool_engine_produce_skipped"; - const PRODUCE_POINT_DURATION: &'static str = "tycho_mempool_engine_produce_duration"; - const COMMIT_DURATION: &'static str = "tycho_mempool_engine_commit_duration"; + const ROUND_DURATION: &'static str = "tycho_mempool_engine_round_time"; + const PRODUCE_POINT_DURATION: &'static str = "tycho_mempool_engine_produce_time"; + const COMMIT_DURATION: &'static str = "tycho_mempool_engine_commit_time"; + + fn own_point_metrics(own_point: Option<&Point>) { + // refresh counters with zeros every round + metrics::counter!("tycho_mempool_engine_produce_skipped") + .increment(own_point.is_none() as _); + metrics::counter!("tycho_mempool_points_produced").increment(own_point.is_some() as _); + + let proof = own_point.and_then(|point| point.body().proof.as_ref()); + metrics::counter!("tycho_mempool_points_no_proof_produced").increment(proof.is_none() as _); + + metrics::counter!("tycho_mempool_point_payload_count") + .increment(own_point.map_or(0, |point| point.body().payload.len() as _)); + let payload_bytes = own_point.map(|point| { + point + .body() + .payload + .iter() + .fold(0, |acc, bytes| acc + bytes.len()) as _ + }); + metrics::counter!("tycho_mempool_point_payload_bytes") + .increment(payload_bytes.unwrap_or_default()); - fn own_point_metrics(own_point: &Point) { // FIXME all commented metrics needs `gauge.set_max()` or `gauge.set_min()`, // or (better) should be accumulated per round as standalone values - metrics::counter!("tycho_mempool_points_produced").increment(1); - if let Some(_proof) = &own_point.body().proof { - // metrics::gauge!("tycho_mempool_point_evidence_count_min").set_min(proof.evidence.len() as f64); - // metrics::gauge!("tycho_mempool_point_evidence_count_max").set_max(proof.evidence.len() as f64); - } else { - metrics::counter!("tycho_mempool_points_no_proof_produced").increment(1); - } + // let Some(own_point) = own_point else { + // return; + // }; + + // if let Some(_proof) = &own_point.body().proof { + // metrics::gauge!("tycho_mempool_point_evidence_count_min") + // .set_min(proof.evidence.len() as f64); + // metrics::gauge!("tycho_mempool_point_evidence_count_max") + // .set_max(proof.evidence.len() as f64); + // } // metrics::gauge!("tycho_mempool_point_includes_count_min") // .set_min(own_point.body().includes.len() as f64); @@ -447,24 +470,13 @@ impl EngineContext { // .set_max(own_point.body().location.round.0 - own_point.anchor_round(LinkField::Proof).0); // metrics::gauge!("tycho_mempool_point_last_anchor_trigger_rounds_ago") // .set_max(own_point.body().location.round.0 - own_point.anchor_round(LinkField::Trigger).0); - - metrics::counter!("tycho_mempool_point_payload_count") - .increment(own_point.body().payload.len() as _); - metrics::counter!("tycho_mempool_point_payload_bytes").increment( - own_point - .body() - .payload - .iter() - .fold(0, |acc, bytes| acc + bytes.len()) as _, - ); } } impl Effects { fn commit_metrics(&self, committed: &[(Point, Vec)]) { - if !committed.is_empty() { - metrics::counter!("tycho_mempool_commit_anchors").increment(committed.len() as _); - } + metrics::counter!("tycho_mempool_commit_anchors").increment(committed.len() as _); + if let Some((first_anchor, _)) = committed.first() { metrics::gauge!("tycho_mempool_commit_latency_rounds") .set(self.depth(first_anchor.body().location.round)); @@ -477,7 +489,7 @@ impl Effects { } else { Duration::from_millis(anchor_time - now).as_secs_f64().neg() }; - metrics::histogram!("tycho_mempool_commit_anchor_time_latency").record(latency); + metrics::histogram!("tycho_mempool_commit_anchor_latency_time").record(latency); } } diff --git a/consensus/src/intercom/core/dispatcher.rs b/consensus/src/intercom/core/dispatcher.rs index 95b243844..cd3c71170 100644 --- a/consensus/src/intercom/core/dispatcher.rs +++ b/consensus/src/intercom/core/dispatcher.rs @@ -96,9 +96,9 @@ impl QueryKind { } fn metric_name(&self) -> &'static str { match self { - QueryKind::Broadcast(_) => "tycho_mempool_broadcast_query_dispatcher_duration", - QueryKind::Signature(_) => "tycho_mempool_signature_query_dispatcher_duration", - QueryKind::PointById(_) => "tycho_mempool_download_query_dispatcher_duration", + QueryKind::Broadcast(_) => "tycho_mempool_broadcast_query_dispatcher_time", + QueryKind::Signature(_) => "tycho_mempool_signature_query_dispatcher_time", + QueryKind::PointById(_) => "tycho_mempool_download_query_dispatcher_time", } } } diff --git a/consensus/src/intercom/core/responder.rs b/consensus/src/intercom/core/responder.rs index 86f34be83..4859f557f 100644 --- a/consensus/src/intercom/core/responder.rs +++ b/consensus/src/intercom/core/responder.rs @@ -117,18 +117,18 @@ impl Responder { impl EngineContext { fn response_metrics(mp_response: &MPResponse, elapsed: Duration) { let metric_name = match mp_response { - MPResponse::Broadcast => "tycho_mempool_broadcast_query_responder_duration", + MPResponse::Broadcast => "tycho_mempool_broadcast_query_responder_time", MPResponse::Signature(SignatureResponse::NoPoint | SignatureResponse::TryLater) => { - "tycho_mempool_signature_query_responder_pong_duration" + "tycho_mempool_signature_query_responder_pong_time" } MPResponse::Signature( SignatureResponse::Signature(_) | SignatureResponse::Rejected(_), - ) => "tycho_mempool_signature_query_responder_data_duration", + ) => "tycho_mempool_signature_query_responder_data_time", MPResponse::PointById(PointByIdResponse(Some(_))) => { - "tycho_mempool_download_query_responder_some_duration" + "tycho_mempool_download_query_responder_some_time" } MPResponse::PointById(PointByIdResponse(None)) => { - "tycho_mempool_download_query_responder_none_duration" + "tycho_mempool_download_query_responder_none_time" } }; metrics::histogram!(metric_name).record(elapsed); diff --git a/consensus/src/intercom/dependency/downloader.rs b/consensus/src/intercom/dependency/downloader.rs index 9eb23e12b..2e47e435d 100644 --- a/consensus/src/intercom/dependency/downloader.rs +++ b/consensus/src/intercom/dependency/downloader.rs @@ -68,7 +68,7 @@ impl Downloader { effects: Effects, ) -> DagPoint { let _task_duration = HistogramGuard::begin(DownloadContext::TASK_DURATION); - metrics::counter!(DownloadContext::TASK_COUNT).increment(1); + effects.meter_start(&point_id); let span_guard = effects.span().enter(); assert_eq!( point_id.location.round, @@ -131,7 +131,7 @@ impl Downloader { .instrument(effects.span().clone()) .await; - effects.meter(&task); + DownloadContext::meter_task(&task); match downloaded { None => DagPoint::NotExists(Arc::new(point_id)), @@ -426,20 +426,26 @@ impl DownloadTask { } } impl DownloadContext { - const TASK_COUNT: &'static str = "tycho_mempool_download_task_count"; - const TASK_DURATION: &'static str = "tycho_mempool_download_task_duration"; + const TASK_DURATION: &'static str = "tycho_mempool_download_task_time"; const FAILED_QUERY: &'static str = "tycho_mempool_download_query_failed_count"; -} -impl Effects { - fn meter(&self, task: &DownloadTask) { - metrics::gauge!("tycho_mempool_download_depth_rounds") - .set(self.download_max_depth(task.point_id.location.round)); + + fn meter_task(task: &DownloadTask) { metrics::counter!("tycho_mempool_download_not_found_responses") .increment(task.reliably_not_found as _); - metrics::counter!("tycho_mempool_download_aborted_on_exit_count") .increment(task.downloading.len() as _); // metrics::histogram!("tycho_mempool_download_unreliable_responses") // .set(task.unreliable_peers); } } +impl Effects { + fn meter_start(&self, point_id: &PointId) { + metrics::counter!("tycho_mempool_download_task_count").increment(1); + + metrics::counter!(DownloadContext::FAILED_QUERY).increment(0); // refresh + + // FIXME not guaranteed to show the latest value as rounds advance, but better than nothing + metrics::gauge!("tycho_mempool_download_depth_rounds") + .set(self.download_max_depth(point_id.location.round)); + } +} diff --git a/scripts/gen-dashboard.py b/scripts/gen-dashboard.py index bd4d4cff8..91c575801 100644 --- a/scripts/gen-dashboard.py +++ b/scripts/gen-dashboard.py @@ -834,7 +834,7 @@ def mempool() -> RowPanel: ), create_counter_panel( "tycho_mempool_points_produced", - "Engine: produced points (total)", + "Engine: produced points", ), create_counter_panel( "tycho_mempool_points_no_proof_produced", @@ -846,15 +846,15 @@ def mempool() -> RowPanel: "Engine: skipped rounds", ), create_heatmap_panel( - "tycho_mempool_engine_round_duration", + "tycho_mempool_engine_round_time", "Engine: round duration", ), create_counter_panel( "tycho_mempool_engine_produce_skipped", - "Engine: produce point skipped", + "Engine: points to produce skipped", ), create_heatmap_panel( - "tycho_mempool_engine_produce_duration", + "tycho_mempool_engine_produce_time", "Engine: produce point task duration", ), # == Engine commit == # @@ -863,16 +863,15 @@ def mempool() -> RowPanel: "Engine: committed anchors", ), create_heatmap_panel( - "tycho_mempool_engine_commit_duration", + "tycho_mempool_engine_commit_time", "Engine: commit duration", ), - # FIXME next one needs max value over collection period, but no `gauge.set_max()` create_gauge_panel( "tycho_mempool_commit_latency_rounds", - "Engine: committed anchor rounds latency (max over batch) #fixme", + "Engine: committed anchor rounds latency (max over batch)", ), create_heatmap_panel( - "tycho_mempool_commit_anchor_time_latency", + "tycho_mempool_commit_anchor_latency_time", "Engine: committed anchor time latency (min over batch)", ) ] @@ -885,19 +884,19 @@ def mempool_components() -> RowPanel: create_counter_panel( "tycho_mempool_verifier_verify", "Verifier: verify() errors", - labels=['kind=~"kind"'], + labels=['kind=~"$kind"'], ), create_heatmap_panel( - "tycho_mempool_verifier_verify_duration", + "tycho_mempool_verifier_verify_time", "Verifier: verify() point structure and author's sig", ), create_counter_panel( "tycho_mempool_verifier_validate", "Verifier: validate() errors and warnings", - labels=['kind=~"kind"'], + labels=['kind=~"$kind"'], ), create_heatmap_panel( - "tycho_mempool_verifier_validate_duration", + "tycho_mempool_verifier_validate_time", "Verifier: validate() point dependencies in DAG and all-1 sigs" ), # == Download tasks - multiple per round == # @@ -906,7 +905,7 @@ def mempool_components() -> RowPanel: "Downloader: tasks (unique point id)", ), create_heatmap_panel( - "tycho_mempool_download_task_duration", + "tycho_mempool_download_task_time", "Downloader: tasks duration" ), # FIXME next one needs max value over collection period, but no `gauge.set_max()` @@ -928,35 +927,35 @@ def mempool_components() -> RowPanel: ), # == Network tasks - multiple per round == # create_heatmap_panel( - "tycho_mempool_broadcast_query_dispatcher_duration", + "tycho_mempool_broadcast_query_dispatcher_time", "Dispatcher: Broadcast send" ), create_heatmap_panel( - "tycho_mempool_broadcast_query_responder_duration", + "tycho_mempool_broadcast_query_responder_time", "Responder: Broadcast accept" ), create_heatmap_panel( - "tycho_mempool_signature_query_dispatcher_duration", + "tycho_mempool_signature_query_dispatcher_time", "Dispatcher: Signature request" ), create_heatmap_panel( - "tycho_mempool_download_query_dispatcher_duration", + "tycho_mempool_download_query_dispatcher_time", "Dispatcher: Download request" ), create_heatmap_panel( - "tycho_mempool_signature_query_responder_data_duration", + "tycho_mempool_signature_query_responder_data_time", "Responder: Signature send: send ready or sign or reject" ), create_heatmap_panel( - "tycho_mempool_signature_query_responder_pong_duration", + "tycho_mempool_signature_query_responder_pong_time", "Responder: Signature send: no point or try later" ), create_heatmap_panel( - "tycho_mempool_download_query_responder_some_duration", + "tycho_mempool_download_query_responder_some_time", "Responder: Download send: Some(point)" ), create_heatmap_panel( - "tycho_mempool_download_query_responder_none_duration", + "tycho_mempool_download_query_responder_none_time", "Responder: Download send: None" ), ]