Skip to content

Commit

Permalink
fix(consensus): hold less strong links to dag round
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo committed May 10, 2024
1 parent e32c0cf commit 0758bf4
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 37 deletions.
11 changes: 8 additions & 3 deletions consensus/src/dag/dag_round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ use crate::intercom::{Downloader, PeerSchedule};
use crate::models::{DagPoint, Digest, NodeCount, Point, PointId, Round, ValidPoint};

#[derive(Clone)]
/// Weak handle to a DAG round: wraps a [`Weak`] pointer to the round's inner state,
/// so holding it allows memory allocated by the DAG to be freed.
/// Call `get()` to obtain a strong [`DagRound`]; it yields `None` once the round is gone.
pub struct WeakDagRound(Weak<DagRoundInner>);

#[derive(Clone)]
/// Strong handle to a DAG round: wraps an [`Arc`], so every clone keeps the round alive.
///
/// Do not pass it to backwards-recursive async tasks, where `DAG_DEPTH` is just a logical
/// limit that is not explicitly applicable: if a congested tokio runtime reorders futures,
/// such tasks can cause a severe memory leak of a whole DAG round.
/// Use [`WeakDagRound`] for those tasks instead.
pub struct DagRound(Arc<DagRoundInner>);

struct DagRoundInner {
Expand All @@ -30,7 +35,7 @@ struct DagRoundInner {
}

impl WeakDagRound {
pub const BOTTOM: Self = WeakDagRound(Weak::new());
const BOTTOM: Self = WeakDagRound(Weak::new());
pub fn get(&self) -> Option<DagRound> {
self.0.upgrade().map(DagRound)
}
Expand Down Expand Up @@ -185,7 +190,7 @@ impl DagRound {
if &point.body.location.round != self.round() {
panic!("Coding error: dag round mismatches point round on add")
}
let dag_round = self.clone();
let dag_round = self.as_weak();
let digest = &point.digest;
self.edit(&point.body.location.author, |loc| {
let state = loc.state().clone();
Expand All @@ -206,7 +211,7 @@ impl DagRound {
if !Verifier::verify(point, peer_schedule).is_ok() {
panic!("Coding error: malformed point")
}
let point = Verifier::validate(point.clone(), self.clone(), downloader.clone()).await;
let point = Verifier::validate(point.clone(), self.as_weak(), downloader.clone()).await;
if point.trusted().is_none() {
panic!("Coding error: not a trusted point")
}
Expand Down
51 changes: 33 additions & 18 deletions consensus/src/dag/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ use tokio::task::JoinSet;
use tycho_network::PeerId;

use crate::dag::anchor_stage::AnchorStage;
use crate::dag::DagRound;
use crate::dag::{DagRound, WeakDagRound};
use crate::engine::MempoolConfig;
use crate::intercom::{Downloader, PeerSchedule};
use crate::models::{DagPoint, Digest, Link, Location, NodeCount, Point, PointId, ValidPoint};
use crate::models::{
DagPoint, Digest, Link, Location, NodeCount, Point, PointId, Ugly, ValidPoint,
};

// Note on equivocation.
// Detected point equivocation does not invalidate the point, it just
Expand Down Expand Up @@ -46,9 +48,16 @@ impl Verifier {
/// must be called iff [Self::verify] succeeded
pub async fn validate(
point: Arc<Point>, // @ r+0
r_0: DagRound, // r+0
r_0: WeakDagRound, // r+0
downloader: Downloader,
) -> DagPoint {
let Some(r_0) = r_0.get() else {
tracing::warn!(
"cannot validate {:?}, local DAG moved far forward",
point.id().ugly()
);
return DagPoint::NotExists(Arc::new(point.id()));
};
// TODO upgrade Weak whenever used to let Dag Round drop if some future hangs up for long
if &point.body.location.round != r_0.round() {
panic!("Coding error: dag round mismatches point round")
Expand All @@ -62,14 +71,17 @@ impl Verifier {
}) {
return DagPoint::Invalid(point.clone());
}
if let Some(r_1) = r_0.prev().get() {
Self::gather_deps(&point, &r_1, &downloader, &mut dependencies);
return Self::check_deps(&point, dependencies).await;
}
// If r-1 exceeds dag depth, the arg point @ r+0 is considered valid by itself.
// Any point @ r+0 will be committed, only if it has valid proof @ r+1
// included into valid anchor chain, i.e. validated by consensus.
DagPoint::Trusted(ValidPoint::new(point.clone()))
let Some(r_1) = r_0.prev().get() else {
// If r-1 exceeds dag depth, the arg point @ r+0 is considered valid by itself.
// Any point @ r+0 will be committed, only if it has valid proof @ r+1
// included into valid anchor chain, i.e. validated by consensus.
return DagPoint::Trusted(ValidPoint::new(point.clone()));
};
Self::gather_deps(&point, &r_1, &downloader, &mut dependencies);
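// intent: drop strong links before await
// NOTE(review): `_ = x;` neither moves nor drops `x`, so both strong links
// stay alive until the end of scope (i.e. across the await); `drop(x)` would release them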
_ = r_0;
_ = r_1;
Self::check_deps(&point, dependencies).await
}

fn is_self_links_ok(
Expand Down Expand Up @@ -128,12 +140,15 @@ impl Verifier {
}
!found
});
if dag_round.prev().get().map(|r| dag_round = r).is_none() {
// if links in point exceed DAG depth, consider them valid by now;
// either dependencies have more recent link and point will be invalidated later,
// or author was less successful to get fresh data and did not commit for long
// (thus keeps more history in its local Dag)
break;
match dag_round.prev().get() {
Some(r) => dag_round = r,
None => {
// if links in point exceed DAG depth, consider them valid by now;
// either dependencies have more recent link and point will be invalidated later,
// or author was less successful to get fresh data and did not commit for long
// (thus keeps more history in its local Dag)
break;
}
}
}
// valid linked points will be in dag without this addition by recursion,
Expand Down Expand Up @@ -174,7 +189,7 @@ impl Verifier {
},
digest: digest.clone(),
};
downloader.run(point_id, round.clone(), dependant.clone())
downloader.run(point_id, round.as_weak(), dependant.clone())
})
});
dependencies.spawn(shared.map(|(dag_point, _)| dag_point));
Expand Down
33 changes: 17 additions & 16 deletions consensus/src/intercom/dependency/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::{broadcast, watch};
use tycho_network::PeerId;
use tycho_util::{FastHashMap, FastHashSet};

use crate::dag::{DagRound, Verifier, WeakDagRound};
use crate::dag::{Verifier, WeakDagRound};
use crate::engine::MempoolConfig;
use crate::intercom::dto::{PeerState, PointByIdResponse};
use crate::intercom::{Dispatcher, PeerSchedule};
Expand All @@ -38,21 +38,24 @@ impl Downloader {
pub async fn run(
self,
point_id: PointId,
point_round: DagRound,
point_round: WeakDagRound,
// TODO it would be great to increase the number of dependants in-flight,
// but then the DAG needs to store some sort of updatable state machine
// instead of opaque Shared<JoinTask<DagPoint>>
dependant: PeerId,
) -> DagPoint {
let Some(point_round_temp) = point_round.get() else {
return DagPoint::NotExists(Arc::new(point_id));
};
assert_eq!(
point_id.location.round,
*point_round.round(),
*point_round_temp.round(),
"point and DAG round mismatch"
);
// request point from its signers (any dependant is among them as point is already verified)
let all_peers = self
.peer_schedule
.peers_for(&point_round.round().next())
.peers_for(&point_round_temp.round().next())
.iter()
.map(|(peer_id, state)| (*peer_id, *state))
.collect::<FastHashMap<PeerId, PeerState>>();
Expand All @@ -73,8 +76,9 @@ impl Downloader {
.chain(iter::once(point_id.location.author))
.collect();
let (has_resolved_tx, has_resolved_rx) = watch::channel(false);
_ = point_round_temp; // do not leak strong ref across unlimited await
DownloadTask {
weak_dag_round: point_round.as_weak(),
weak_dag_round: point_round,
node_count,
request: self.dispatcher.point_by_id_request(&point_id),
point_id,
Expand Down Expand Up @@ -193,7 +197,8 @@ impl DownloadTask {
}
Ok(PointByIdResponse(None)) => {
if self.mandatory.remove(&peer_id) {
// it's a ban
// it's a ban in case permanent storage is used;
// otherwise the peer could have already advanced by the full DAG_DEPTH
tracing::error!(
"{} : {peer_id:.4?} must have returned {:?}",
self.parent.log_id,
Expand All @@ -215,18 +220,14 @@ impl DownloadTask {
self.parent.log_id
);
}
let Some(dag_round) = self.weak_dag_round.get() else {
tracing::warn!(
"{} : {peer_id:.4?} no more retries, local DAG moved far forward",
self.parent.log_id
);
// DAG could not have moved if this point was needed for commit
return Some(DagPoint::NotExists(Arc::new(self.point_id.clone())));
};
match Verifier::verify(&point, &self.parent.peer_schedule) {
Ok(()) => {
let validated =
Verifier::validate(point, dag_round, self.parent.clone()).await;
let validated = Verifier::validate(
point,
self.weak_dag_round.clone(),
self.parent.clone(),
)
.await;
if validated.trusted().is_some() {
tracing::debug!(
"{} : downloaded dependency {:?}",
Expand Down

0 comments on commit 0758bf4

Please sign in to comment.