Skip to content

Commit

Permalink
fix(consensus): metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo committed Jul 1, 2024
1 parent 8cae58a commit 152a8ec
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 88 deletions.
41 changes: 21 additions & 20 deletions consensus/src/dag/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,32 +471,33 @@ impl Verifier {
}

impl ValidateContext {
const VERIFY_DURATION: &'static str = "tycho_mempool_verifier_verify_duration";
const VALIDATE_DURATION: &'static str = "tycho_mempool_verifier_validate_duration";
const VERIFY_DURATION: &'static str = "tycho_mempool_verifier_verify_time";
const VALIDATE_DURATION: &'static str = "tycho_mempool_verifier_validate_time";

const ALL_LABELS: [(&'static str, &'static str); 3] = [
("kind", "not_exists"),
("kind", "invalid"),
("kind", "suspicious"),
];

fn verified(result: &Result<(), DagPoint>) {
if let Err(dag_point) = result {
Self::meter(dag_point, "tycho_mempool_verifier_verify");
let (labels, count) = match result {
Err(DagPoint::NotExists(_)) => (&Self::ALL_LABELS[0..=0], 1),
Err(DagPoint::Invalid(_)) => (&Self::ALL_LABELS[1..=1], 1),
Ok(_) => (&Self::ALL_LABELS[0..=1], 0),
_ => unreachable!("unexpected"),
};
metrics::counter!("tycho_mempool_verifier_verify", labels).increment(count);
}

fn validated(result: DagPoint) -> DagPoint {
Self::meter(&result, "tycho_mempool_verifier_validate");
result
}

fn meter(dag_point: &DagPoint, metric_name: &'static str) {
match dag_point {
DagPoint::Trusted(_) => {}
DagPoint::Suspicious(_) => {
metrics::counter!(metric_name, &[("kind", "suspicious")]).increment(1);
}
DagPoint::Invalid(_) => {
metrics::counter!(metric_name, &[("kind", "invalid")]).increment(1);
}
DagPoint::NotExists(_) => {
metrics::counter!(metric_name, &[("kind", "not_exists")]).increment(1);
}
let (labels, count) = match result {
DagPoint::NotExists(_) => (&Self::ALL_LABELS[0..=0], 1),
DagPoint::Invalid(_) => (&Self::ALL_LABELS[1..=1], 1),
DagPoint::Suspicious(_) => (&Self::ALL_LABELS[2..=2], 1),
DagPoint::Trusted(_) => (&Self::ALL_LABELS[..], 0),
};
metrics::counter!("tycho_mempool_verifier_validate", labels).increment(count);
result
}
}
70 changes: 41 additions & 29 deletions consensus/src/engine/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,10 @@ impl Engine {
let mut broadcaster = self.broadcaster;
let downloader = self.downloader.clone();
async move {
if let Some(own_point) = own_point_fut.await.expect("new point producer") {
EngineContext::own_point_metrics(&own_point);
let own_point = own_point_fut.await.expect("new point producer");
EngineContext::own_point_metrics(own_point.as_ref());

if let Some(own_point) = own_point {
let paranoid = Self::expect_own_trusted_point(
own_point_round,
own_point.clone(),
Expand All @@ -248,7 +250,6 @@ impl Engine {
paranoid.await.expect("verify own produced point");
(broadcaster, Some(new_last_own_point))
} else {
metrics::counter!(EngineContext::PRODUCE_POINT_SKIP).increment(1);
collector_signal_rx.close();
bcaster_ready_tx.send(BroadcasterSignal::Ok).ok();
(broadcaster, None)
Expand Down Expand Up @@ -419,22 +420,44 @@ impl Engine {
impl EngineContext {
const CURRENT_ROUND: &'static str = "tycho_mempool_engine_current_round";
const ROUNDS_SKIP: &'static str = "tycho_mempool_engine_rounds_skipped";
const ROUND_DURATION: &'static str = "tycho_mempool_engine_round_duration";
const PRODUCE_POINT_SKIP: &'static str = "tycho_mempool_engine_produce_skipped";
const PRODUCE_POINT_DURATION: &'static str = "tycho_mempool_engine_produce_duration";
const COMMIT_DURATION: &'static str = "tycho_mempool_engine_commit_duration";
const ROUND_DURATION: &'static str = "tycho_mempool_engine_round_time";
const PRODUCE_POINT_DURATION: &'static str = "tycho_mempool_engine_produce_time";
const COMMIT_DURATION: &'static str = "tycho_mempool_engine_commit_time";

fn own_point_metrics(own_point: Option<&Point>) {
// refresh counters with zeros every round
metrics::counter!("tycho_mempool_engine_produce_skipped")
.increment(own_point.is_none() as _);
metrics::counter!("tycho_mempool_points_produced").increment(own_point.is_some() as _);

let proof = own_point.and_then(|point| point.body().proof.as_ref());
metrics::counter!("tycho_mempool_points_no_proof_produced").increment(proof.is_none() as _);

metrics::counter!("tycho_mempool_point_payload_count")
.increment(own_point.map_or(0, |point| point.body().payload.len() as _));
let payload_bytes = own_point.map(|point| {
point
.body()
.payload
.iter()
.fold(0, |acc, bytes| acc + bytes.len()) as _
});
metrics::counter!("tycho_mempool_point_payload_bytes")
.increment(payload_bytes.unwrap_or_default());

fn own_point_metrics(own_point: &Point) {
// FIXME all commented metrics needs `gauge.set_max()` or `gauge.set_min()`,
// or (better) should be accumulated per round as standalone values
metrics::counter!("tycho_mempool_points_produced").increment(1);

if let Some(_proof) = &own_point.body().proof {
// metrics::gauge!("tycho_mempool_point_evidence_count_min").set_min(proof.evidence.len() as f64);
// metrics::gauge!("tycho_mempool_point_evidence_count_max").set_max(proof.evidence.len() as f64);
} else {
metrics::counter!("tycho_mempool_points_no_proof_produced").increment(1);
}
// let Some(own_point) = own_point else {
// return;
// };

// if let Some(_proof) = &own_point.body().proof {
// metrics::gauge!("tycho_mempool_point_evidence_count_min")
// .set_min(proof.evidence.len() as f64);
// metrics::gauge!("tycho_mempool_point_evidence_count_max")
// .set_max(proof.evidence.len() as f64);
// }

// metrics::gauge!("tycho_mempool_point_includes_count_min")
// .set_min(own_point.body().includes.len() as f64);
Expand All @@ -447,24 +470,13 @@ impl EngineContext {
// .set_max(own_point.body().location.round.0 - own_point.anchor_round(LinkField::Proof).0);
// metrics::gauge!("tycho_mempool_point_last_anchor_trigger_rounds_ago")
// .set_max(own_point.body().location.round.0 - own_point.anchor_round(LinkField::Trigger).0);

metrics::counter!("tycho_mempool_point_payload_count")
.increment(own_point.body().payload.len() as _);
metrics::counter!("tycho_mempool_point_payload_bytes").increment(
own_point
.body()
.payload
.iter()
.fold(0, |acc, bytes| acc + bytes.len()) as _,
);
}
}

impl Effects<EngineContext> {
fn commit_metrics(&self, committed: &[(Point, Vec<Point>)]) {
if !committed.is_empty() {
metrics::counter!("tycho_mempool_commit_anchors").increment(committed.len() as _);
}
metrics::counter!("tycho_mempool_commit_anchors").increment(committed.len() as _);

if let Some((first_anchor, _)) = committed.first() {
metrics::gauge!("tycho_mempool_commit_latency_rounds")
.set(self.depth(first_anchor.body().location.round));
Expand All @@ -477,7 +489,7 @@ impl Effects<EngineContext> {
} else {
Duration::from_millis(anchor_time - now).as_secs_f64().neg()
};
metrics::histogram!("tycho_mempool_commit_anchor_time_latency").record(latency);
metrics::histogram!("tycho_mempool_commit_anchor_latency_time").record(latency);
}
}

Expand Down
6 changes: 3 additions & 3 deletions consensus/src/intercom/core/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ impl QueryKind {
}
fn metric_name(&self) -> &'static str {
match self {
QueryKind::Broadcast(_) => "tycho_mempool_broadcast_query_dispatcher_duration",
QueryKind::Signature(_) => "tycho_mempool_signature_query_dispatcher_duration",
QueryKind::PointById(_) => "tycho_mempool_download_query_dispatcher_duration",
QueryKind::Broadcast(_) => "tycho_mempool_broadcast_query_dispatcher_time",
QueryKind::Signature(_) => "tycho_mempool_signature_query_dispatcher_time",
QueryKind::PointById(_) => "tycho_mempool_download_query_dispatcher_time",
}
}
}
10 changes: 5 additions & 5 deletions consensus/src/intercom/core/responder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,18 @@ impl Responder {
impl EngineContext {
fn response_metrics(mp_response: &MPResponse, elapsed: Duration) {
let metric_name = match mp_response {
MPResponse::Broadcast => "tycho_mempool_broadcast_query_responder_duration",
MPResponse::Broadcast => "tycho_mempool_broadcast_query_responder_time",
MPResponse::Signature(SignatureResponse::NoPoint | SignatureResponse::TryLater) => {
"tycho_mempool_signature_query_responder_pong_duration"
"tycho_mempool_signature_query_responder_pong_time"
}
MPResponse::Signature(
SignatureResponse::Signature(_) | SignatureResponse::Rejected(_),
) => "tycho_mempool_signature_query_responder_data_duration",
) => "tycho_mempool_signature_query_responder_data_time",
MPResponse::PointById(PointByIdResponse(Some(_))) => {
"tycho_mempool_download_query_responder_some_duration"
"tycho_mempool_download_query_responder_some_time"
}
MPResponse::PointById(PointByIdResponse(None)) => {
"tycho_mempool_download_query_responder_none_duration"
"tycho_mempool_download_query_responder_none_time"
}
};
metrics::histogram!(metric_name).record(elapsed);
Expand Down
26 changes: 16 additions & 10 deletions consensus/src/intercom/dependency/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Downloader {
effects: Effects<DownloadContext>,
) -> DagPoint {
let _task_duration = HistogramGuard::begin(DownloadContext::TASK_DURATION);
metrics::counter!(DownloadContext::TASK_COUNT).increment(1);
effects.meter_start(&point_id);
let span_guard = effects.span().enter();
assert_eq!(
point_id.location.round,
Expand Down Expand Up @@ -131,7 +131,7 @@ impl Downloader {
.instrument(effects.span().clone())
.await;

effects.meter(&task);
DownloadContext::meter_task(&task);

match downloaded {
None => DagPoint::NotExists(Arc::new(point_id)),
Expand Down Expand Up @@ -426,20 +426,26 @@ impl DownloadTask {
}
}
impl DownloadContext {
const TASK_COUNT: &'static str = "tycho_mempool_download_task_count";
const TASK_DURATION: &'static str = "tycho_mempool_download_task_duration";
const TASK_DURATION: &'static str = "tycho_mempool_download_task_time";
const FAILED_QUERY: &'static str = "tycho_mempool_download_query_failed_count";
}
impl Effects<DownloadContext> {
fn meter(&self, task: &DownloadTask) {
metrics::gauge!("tycho_mempool_download_depth_rounds")
.set(self.download_max_depth(task.point_id.location.round));

fn meter_task(task: &DownloadTask) {
metrics::counter!("tycho_mempool_download_not_found_responses")
.increment(task.reliably_not_found as _);

metrics::counter!("tycho_mempool_download_aborted_on_exit_count")
.increment(task.downloading.len() as _);
// metrics::histogram!("tycho_mempool_download_unreliable_responses")
// .set(task.unreliable_peers);
}
}
impl Effects<DownloadContext> {
fn meter_start(&self, point_id: &PointId) {
metrics::counter!("tycho_mempool_download_task_count").increment(1);

metrics::counter!(DownloadContext::FAILED_QUERY).increment(0); // refresh

// FIXME not guaranteed to show the latest value as rounds advance, but better than nothing
metrics::gauge!("tycho_mempool_download_depth_rounds")
.set(self.download_max_depth(point_id.location.round));
}
}
41 changes: 20 additions & 21 deletions scripts/gen-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ def mempool() -> RowPanel:
),
create_counter_panel(
"tycho_mempool_points_produced",
"Engine: produced points (total)",
"Engine: produced points",
),
create_counter_panel(
"tycho_mempool_points_no_proof_produced",
Expand All @@ -846,15 +846,15 @@ def mempool() -> RowPanel:
"Engine: skipped rounds",
),
create_heatmap_panel(
"tycho_mempool_engine_round_duration",
"tycho_mempool_engine_round_time",
"Engine: round duration",
),
create_counter_panel(
"tycho_mempool_engine_produce_skipped",
"Engine: produce point skipped",
"Engine: points to produce skipped",
),
create_heatmap_panel(
"tycho_mempool_engine_produce_duration",
"tycho_mempool_engine_produce_time",
"Engine: produce point task duration",
),
# == Engine commit == #
Expand All @@ -863,16 +863,15 @@ def mempool() -> RowPanel:
"Engine: committed anchors",
),
create_heatmap_panel(
"tycho_mempool_engine_commit_duration",
"tycho_mempool_engine_commit_time",
"Engine: commit duration",
),
# FIXME next one needs max value over collection period, but no `gauge.set_max()`
create_gauge_panel(
"tycho_mempool_commit_latency_rounds",
"Engine: committed anchor rounds latency (max over batch) #fixme",
"Engine: committed anchor rounds latency (max over batch)",
),
create_heatmap_panel(
"tycho_mempool_commit_anchor_time_latency",
"tycho_mempool_commit_anchor_latency_time",
"Engine: committed anchor time latency (min over batch)",
)
]
Expand All @@ -885,19 +884,19 @@ def mempool_components() -> RowPanel:
create_counter_panel(
"tycho_mempool_verifier_verify",
"Verifier: verify() errors",
labels=['kind=~"kind"'],
labels=['kind=~"$kind"'],
),
create_heatmap_panel(
"tycho_mempool_verifier_verify_duration",
"tycho_mempool_verifier_verify_time",
"Verifier: verify() point structure and author's sig",
),
create_counter_panel(
"tycho_mempool_verifier_validate",
"Verifier: validate() errors and warnings",
labels=['kind=~"kind"'],
labels=['kind=~"$kind"'],
),
create_heatmap_panel(
"tycho_mempool_verifier_validate_duration",
"tycho_mempool_verifier_validate_time",
"Verifier: validate() point dependencies in DAG and all-1 sigs"
),
# == Download tasks - multiple per round == #
Expand All @@ -906,7 +905,7 @@ def mempool_components() -> RowPanel:
"Downloader: tasks (unique point id)",
),
create_heatmap_panel(
"tycho_mempool_download_task_duration",
"tycho_mempool_download_task_time",
"Downloader: tasks duration"
),
# FIXME next one needs max value over collection period, but no `gauge.set_max()`
Expand All @@ -928,35 +927,35 @@ def mempool_components() -> RowPanel:
),
# == Network tasks - multiple per round == #
create_heatmap_panel(
"tycho_mempool_broadcast_query_dispatcher_duration",
"tycho_mempool_broadcast_query_dispatcher_time",
"Dispatcher: Broadcast send"
),
create_heatmap_panel(
"tycho_mempool_broadcast_query_responder_duration",
"tycho_mempool_broadcast_query_responder_time",
"Responder: Broadcast accept"
),
create_heatmap_panel(
"tycho_mempool_signature_query_dispatcher_duration",
"tycho_mempool_signature_query_dispatcher_time",
"Dispatcher: Signature request"
),
create_heatmap_panel(
"tycho_mempool_download_query_dispatcher_duration",
"tycho_mempool_download_query_dispatcher_time",
"Dispatcher: Download request"
),
create_heatmap_panel(
"tycho_mempool_signature_query_responder_data_duration",
"tycho_mempool_signature_query_responder_data_time",
"Responder: Signature send: send ready or sign or reject"
),
create_heatmap_panel(
"tycho_mempool_signature_query_responder_pong_duration",
"tycho_mempool_signature_query_responder_pong_time",
"Responder: Signature send: no point or try later"
),
create_heatmap_panel(
"tycho_mempool_download_query_responder_some_duration",
"tycho_mempool_download_query_responder_some_time",
"Responder: Download send: Some(point)"
),
create_heatmap_panel(
"tycho_mempool_download_query_responder_none_duration",
"tycho_mempool_download_query_responder_none_time",
"Responder: Download send: None"
),
]
Expand Down

0 comments on commit 152a8ec

Please sign in to comment.