Skip to content

Commit

Permalink
feat(consensus): add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo committed Jun 30, 2024
1 parent 8f59f34 commit 8cae58a
Show file tree
Hide file tree
Showing 15 changed files with 616 additions and 187 deletions.
26 changes: 6 additions & 20 deletions consensus/src/dag/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,11 @@ use std::{array, mem};
use futures_util::FutureExt;
use rand::prelude::SliceRandom;
use rand::SeedableRng;
use tokio::sync::mpsc::UnboundedSender;
use tycho_network::PeerId;

use crate::dag::anchor_stage::AnchorStage;
use crate::dag::DagRound;
use crate::effects::{AltFormat, CurrentRoundContext, Effects};
use crate::effects::{AltFormat, Effects, EngineContext};
use crate::engine::MempoolConfig;
use crate::intercom::PeerSchedule;
use crate::models::{Digest, LinkField, Location, Point, PointId, Round, ValidPoint};
Expand Down Expand Up @@ -54,7 +53,7 @@ impl Dag {
&mut self,
next_round: Round,
peer_schedule: &PeerSchedule,
effects: &Effects<CurrentRoundContext>,
effects: &Effects<EngineContext>,
) -> DagRound {
let mut top = match self.rounds.last_key_value() {
None => unreachable!("DAG cannot be empty if properly initialized"),
Expand Down Expand Up @@ -92,17 +91,12 @@ impl Dag {
}

/// result is in historical order
pub fn commit(
&mut self,
next_dag_round: DagRound,
committed: UnboundedSender<(Point, Vec<Point>)>,
effects: Effects<CurrentRoundContext>,
) {
pub fn commit(&mut self, next_dag_round: DagRound) -> Vec<(Point, Vec<Point>)> {
// finding the latest trigger must not take long, better try later
// than wait long for some DagPoint::NotFound, slowing down whole Engine
let _parent_guard = effects.span().enter();
let mut ordered = Vec::new();
let Some(latest_trigger) = Self::latest_trigger(&next_dag_round) else {
return;
return ordered;
};
let _span = tracing::error_span!(
"commit trigger",
Expand All @@ -113,22 +107,14 @@ impl Dag {
.entered();
// when we have a valid trigger, its every point of it's subdag is validated successfully
let mut anchor_stack = Self::anchor_stack(&latest_trigger, next_dag_round.clone());
let mut ordered = Vec::new();
while let Some((anchor, anchor_round)) = anchor_stack.pop() {
// Note every next "little anchor candidate that could" must have at least full dag depth
// Note if sync is implemented as a second sub-graph - drop up to the last linked in chain
self.drop_tail(anchor.point.body().location.round);
let committed = Self::gather_uncommitted(&anchor.point, anchor_round);
ordered.push((anchor.point, committed));
}

effects.log_committed(&ordered);

for points in ordered {
committed
.send(points) // not recoverable
.expect("Failed to send anchor commit message tp mpsc channel");
}
ordered
}

fn latest_trigger(next_dag_round: &DagRound) -> Option<ValidPoint> {
Expand Down
10 changes: 5 additions & 5 deletions consensus/src/dag/dag_point_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tycho_util::sync::OnceTake;

use crate::dag::dag_location::InclusionState;
use crate::dag::{DagRound, Verifier};
use crate::effects::{CurrentRoundContext, Effects, ValidateContext};
use crate::effects::{DownloadContext, Effects, EngineContext, ValidateContext};
use crate::intercom::Downloader;
use crate::models::{DagPoint, Digest, Location, PointId};
use crate::Point;
Expand Down Expand Up @@ -60,15 +60,15 @@ impl DagPointFuture {
point: &Point,
state: &InclusionState,
downloader: &Downloader,
effects: &Effects<CurrentRoundContext>,
effects: &Effects<EngineContext>,
) -> Self {
let downloader = downloader.clone();
let span = effects.span().clone();
let effects = Effects::<ValidateContext>::new(effects, point);
let point_dag_round = point_dag_round.downgrade();
let point = point.clone();
let state = state.clone();
DagPointFuture(DagPointFutureType::Broadcast(Shared::new(JoinTask::new(
Verifier::validate(point, point_dag_round, downloader, span)
Verifier::validate(point, point_dag_round, downloader, effects)
.inspect(move |dag_point| state.init(dag_point)),
))))
}
Expand All @@ -82,7 +82,6 @@ impl DagPointFuture {
effects: &Effects<ValidateContext>,
) -> Self {
let downloader = downloader.clone();
let effects = effects.clone();
let state = state.clone();
let point_dag_round = point_dag_round.clone();
let (dependents_tx, dependents_rx) = mpsc::unbounded_channel();
Expand All @@ -94,6 +93,7 @@ impl DagPointFuture {
},
digest: digest.clone(),
};
let effects = Effects::<DownloadContext>::new(effects, &point_id);
DagPointFuture(DagPointFutureType::Download {
task: Shared::new(JoinTask::new(
downloader
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/dag/dag_round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use tycho_util::FastDashMap;
use crate::dag::anchor_stage::AnchorStage;
use crate::dag::dag_location::{DagLocation, InclusionState};
use crate::dag::dag_point_future::DagPointFuture;
use crate::effects::{CurrentRoundContext, Effects, ValidateContext};
use crate::effects::{Effects, EngineContext, ValidateContext};
use crate::engine::MempoolConfig;
use crate::intercom::{Downloader, PeerSchedule};
use crate::models::{DagPoint, Digest, PeerCount, Point, Round, ValidPoint};
Expand Down Expand Up @@ -131,7 +131,7 @@ impl DagRound {
&self,
point: &Point,
downloader: &Downloader,
effects: &Effects<CurrentRoundContext>,
effects: &Effects<EngineContext>,
) -> Option<BoxFuture<'static, InclusionState>> {
let _guard = effects.span().enter();
assert_eq!(
Expand Down
70 changes: 47 additions & 23 deletions consensus/src/dag/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use std::sync::Arc;

use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use tracing::{Instrument, Span};
use tracing::Instrument;
use tycho_util::metrics::HistogramGuard;
use tycho_util::sync::rayon_run;

use crate::dag::anchor_stage::AnchorStage;
use crate::dag::dag_point_future::DagPointFuture;
use crate::dag::{DagRound, WeakDagRound};
use crate::effects::{AltFormat, Effects, EffectsContext, ValidateContext};
use crate::effects::{Effects, ValidateContext};
use crate::engine::MempoolConfig;
use crate::intercom::{Downloader, PeerSchedule};
use crate::models::{DagPoint, Link, LinkField, Location, PeerCount, Point, ValidPoint};
Expand All @@ -33,24 +34,27 @@ pub struct Verifier;
impl Verifier {
/// the first and mandatory check of any Point received no matter where from
pub fn verify(point: &Point, peer_schedule: &PeerSchedule) -> Result<(), DagPoint> {
if !point.is_integrity_ok() {
let _task_duration = HistogramGuard::begin(ValidateContext::VERIFY_DURATION);
let result = if !point.is_integrity_ok() {
Err(DagPoint::NotExists(Arc::new(point.id()))) // cannot use point body
} else if !(point.is_well_formed() && Self::is_list_of_signers_ok(point, peer_schedule)) {
// point links, etc. will not be used
Err(DagPoint::Invalid(point.clone()))
} else {
Ok(())
}
};
ValidateContext::verified(&result);
result
}

/// must be called iff [`Self::verify`] succeeded
pub async fn validate(
point: Point, // @ r+0
r_0: WeakDagRound, // r+0
downloader: Downloader,
parent_span: Span,
effects: Effects<ValidateContext>,
) -> DagPoint {
let effects = Effects::<ValidateContext>::new(&parent_span, &point);
let _task_duration = HistogramGuard::begin(ValidateContext::VALIDATE_DURATION);
let span_guard = effects.span().enter();

// for genesis point it's sufficient to be well-formed and pass integrity check,
Expand All @@ -62,7 +66,8 @@ impl Verifier {
);
let Some(r_0) = r_0.upgrade() else {
tracing::info!("cannot (in)validate point, no round in local DAG");
return DagPoint::Suspicious(ValidPoint::new(point.clone()));
let dag_point = DagPoint::Suspicious(ValidPoint::new(point.clone()));
return ValidateContext::validated(dag_point);
};
assert_eq!(
point.body().location.round,
Expand Down Expand Up @@ -90,12 +95,13 @@ impl Verifier {
&dependencies,
))
{
return DagPoint::Invalid(point.clone());
return ValidateContext::validated(DagPoint::Invalid(point.clone()));
}

let Some(r_1) = r_0.prev().upgrade() else {
tracing::info!("cannot (in)validate point's 'includes', no round in local DAG");
return DagPoint::Suspicious(ValidPoint::new(point.clone()));
let dag_point = DagPoint::Suspicious(ValidPoint::new(point.clone()));
return ValidateContext::validated(dag_point);
};
Self::gather_deps(&point, &r_1, &downloader, &effects, &dependencies);

Expand All @@ -120,7 +126,7 @@ impl Verifier {

let mut sig_checked = false;
let mut deps_checked = None;
loop {
let dag_point = loop {
tokio::select! {
is_sig_ok = &mut signatures_fut, if !sig_checked => if is_sig_ok {
match deps_checked {
Expand All @@ -140,7 +146,8 @@ impl Verifier {
}
}
}
}
};
ValidateContext::validated(dag_point)
}

fn is_self_links_ok(
Expand Down Expand Up @@ -462,17 +469,34 @@ impl Verifier {
true
}
}
impl EffectsContext for ValidateContext {}

impl Effects<ValidateContext> {
fn new(parent_span: &Span, point: &Point) -> Self {
Self::new_child(parent_span, || {
tracing::error_span!(
"validate",
author = display(point.body().location.author.alt()),
round = point.body().location.round.0,
digest = display(point.digest().alt()),
)
})

impl ValidateContext {
const VERIFY_DURATION: &'static str = "tycho_mempool_verifier_verify_duration";
const VALIDATE_DURATION: &'static str = "tycho_mempool_verifier_validate_duration";

fn verified(result: &Result<(), DagPoint>) {
if let Err(dag_point) = result {
Self::meter(dag_point, "tycho_mempool_verifier_verify");
};
}

fn validated(result: DagPoint) -> DagPoint {
Self::meter(&result, "tycho_mempool_verifier_validate");
result
}

fn meter(dag_point: &DagPoint, metric_name: &'static str) {
match dag_point {
DagPoint::Trusted(_) => {}
DagPoint::Suspicious(_) => {
metrics::counter!(metric_name, &[("kind", "suspicious")]).increment(1);
}
DagPoint::Invalid(_) => {
metrics::counter!(metric_name, &[("kind", "invalid")]).increment(1);
}
DagPoint::NotExists(_) => {
metrics::counter!(metric_name, &[("kind", "not_exists")]).increment(1);
}
};
}
}
Loading

0 comments on commit 8cae58a

Please sign in to comment.