diff --git a/consensus/src/dag/dag_round.rs b/consensus/src/dag/dag_round.rs
index a7e9a9aa1..3d7fcbf46 100644
--- a/consensus/src/dag/dag_round.rs
+++ b/consensus/src/dag/dag_round.rs
@@ -13,9 +13,14 @@ use crate::intercom::{Downloader, PeerSchedule};
 use crate::models::{DagPoint, Digest, NodeCount, Point, PointId, Round, ValidPoint};

 #[derive(Clone)]
+/// Allows memory allocated by the DAG to be freed
 pub struct WeakDagRound(Weak<DagRoundInner>);

 #[derive(Clone)]
+/// Do not pass into backwards-recursive async tasks
+/// (where DAG_DEPTH is only a logical limit and is not explicitly enforced),
+/// otherwise a congested tokio runtime that reorders futures
+/// may leak the memory of a whole DAG round; use [WeakDagRound] there instead
 pub struct DagRound(Arc<DagRoundInner>);

 struct DagRoundInner {
@@ -30,7 +35,7 @@
 }

 impl WeakDagRound {
-    pub const BOTTOM: Self = WeakDagRound(Weak::new());
+    const BOTTOM: Self = WeakDagRound(Weak::new());
     pub fn get(&self) -> Option<DagRound> {
         self.0.upgrade().map(DagRound)
     }
@@ -185,7 +190,7 @@
         if &point.body.location.round != self.round() {
             panic!("Coding error: dag round mismatches point round on add")
         }
-        let dag_round = self.clone();
+        let dag_round = self.as_weak();
         let digest = &point.digest;
         self.edit(&point.body.location.author, |loc| {
             let state = loc.state().clone();
@@ -206,7 +211,7 @@
         if !Verifier::verify(point, peer_schedule).is_ok() {
             panic!("Coding error: malformed point")
         }
-        let point = Verifier::validate(point.clone(), self.clone(), downloader.clone()).await;
+        let point = Verifier::validate(point.clone(), self.as_weak(), downloader.clone()).await;
         if point.trusted().is_none() {
             panic!("Coding error: not a trusted point")
         }
diff --git a/consensus/src/dag/verifier.rs b/consensus/src/dag/verifier.rs
index 1dc2b2c69..e2a10f229 100644
--- a/consensus/src/dag/verifier.rs
+++ b/consensus/src/dag/verifier.rs
@@ -5,10 +5,12 @@ use tokio::task::JoinSet;
 use tycho_network::PeerId;

 use crate::dag::anchor_stage::AnchorStage;
-use crate::dag::DagRound;
+use crate::dag::{DagRound, WeakDagRound};
 use crate::engine::MempoolConfig;
 use crate::intercom::{Downloader, PeerSchedule};
-use crate::models::{DagPoint, Digest, Link, Location, NodeCount, Point, PointId, ValidPoint};
+use crate::models::{
+    DagPoint, Digest, Link, Location, NodeCount, Point, PointId, Ugly, ValidPoint,
+};

 // Note on equivocation.
 // Detected point equivocation does not invalidate the point, it just
@@ -46,9 +48,16 @@
     /// must be called iff [Self::verify] succeeded
     pub async fn validate(
         point: Arc<Point>,  // @ r+0
-        r_0: DagRound,      // r+0
+        r_0: WeakDagRound,  // r+0
        downloader: Downloader,
     ) -> DagPoint {
+        let Some(r_0) = r_0.get() else {
+            tracing::warn!(
+                "cannot validate {:?}, local DAG moved far forward",
+                point.id().ugly()
+            );
+            return DagPoint::NotExists(Arc::new(point.id()));
+        };
         // TODO upgrade Weak whenever used to let Dag Round drop if some future hangs up for long
         if &point.body.location.round != r_0.round() {
             panic!("Coding error: dag round mismatches point round")
@@ -62,14 +71,17 @@
         }) {
             return DagPoint::Invalid(point.clone());
         }
-        if let Some(r_1) = r_0.prev().get() {
-            Self::gather_deps(&point, &r_1, &downloader, &mut dependencies);
-            return Self::check_deps(&point, dependencies).await;
-        }
-        // If r-1 exceeds dag depth, the arg point @ r+0 is considered valid by itself.
-        // Any point @ r+0 will be committed, only if it has valid proof @ r+1
-        // included into valid anchor chain, i.e. validated by consensus.
-        DagPoint::Trusted(ValidPoint::new(point.clone()))
+        let Some(r_1) = r_0.prev().get() else {
+            // If r-1 exceeds dag depth, the arg point @ r+0 is considered valid by itself.
+            // Any point @ r+0 will be committed, only if it has valid proof @ r+1
+            // included into valid anchor chain, i.e. validated by consensus.
+            return DagPoint::Trusted(ValidPoint::new(point.clone()));
+        };
+        Self::gather_deps(&point, &r_1, &downloader, &mut dependencies);
+        // drop strong links before await
+        _ = r_0;
+        _ = r_1;
+        Self::check_deps(&point, dependencies).await
     }

     fn is_self_links_ok(
@@ -128,12 +140,15 @@
                 }
                 !found
             });
-            if dag_round.prev().get().map(|r| dag_round = r).is_none() {
-                // if links in point exceed DAG depth, consider them valid by now;
-                // either dependencies have more recent link and point will be invalidated later,
-                // or author was less successful to get fresh data and did not commit for long
-                // (thus keeps more history in its local Dag)
-                break;
+            match dag_round.prev().get() {
+                Some(r) => dag_round = r,
+                None => {
+                    // if links in point exceed DAG depth, consider them valid by now;
+                    // either dependencies have more recent link and point will be invalidated later,
+                    // or author was less successful to get fresh data and did not commit for long
+                    // (thus keeps more history in its local Dag)
+                    break;
+                }
             }
         }
         // valid linked points will be in dag without this addition by recursion,
@@ -174,7 +189,7 @@
                     },
                     digest: digest.clone(),
                 };
-                downloader.run(point_id, round.clone(), dependant.clone())
+                downloader.run(point_id, round.as_weak(), dependant.clone())
             })
         });
         dependencies.spawn(shared.map(|(dag_point, _)| dag_point));
diff --git a/consensus/src/intercom/dependency/downloader.rs b/consensus/src/intercom/dependency/downloader.rs
index f41982161..b5046ae73 100644
--- a/consensus/src/intercom/dependency/downloader.rs
+++ b/consensus/src/intercom/dependency/downloader.rs
@@ -11,7 +11,7 @@ use tokio::sync::{broadcast, watch};
 use tycho_network::PeerId;
 use tycho_util::{FastHashMap, FastHashSet};

-use crate::dag::{DagRound, Verifier, WeakDagRound};
+use crate::dag::{Verifier, WeakDagRound};
 use crate::engine::MempoolConfig;
 use crate::intercom::dto::{PeerState, PointByIdResponse};
 use crate::intercom::{Dispatcher, PeerSchedule};
@@ -38,21 +38,24 @@
     pub async fn run(
         self,
         point_id: PointId,
-        point_round: DagRound,
+        point_round: WeakDagRound,
         // TODO it would be great to increase the number of dependants in-flight,
         //   but then the DAG needs to store some sort of updatable state machine
         //   instead of opaque Shared<JoinTask<DagPoint>>
         dependant: PeerId,
     ) -> DagPoint {
+        let Some(point_round_temp) = point_round.get() else {
+            return DagPoint::NotExists(Arc::new(point_id));
+        };
         assert_eq!(
             point_id.location.round,
-            *point_round.round(),
+            *point_round_temp.round(),
             "point and DAG round mismatch"
         );
         // request point from its signers (any dependant is among them as point is already verified)
         let all_peers = self
             .peer_schedule
-            .peers_for(&point_round.round().next())
+            .peers_for(&point_round_temp.round().next())
             .iter()
             .map(|(peer_id, state)| (*peer_id, *state))
             .collect::<FastHashMap<PeerId, PeerState>>();
@@ -73,8 +76,9 @@
             .chain(iter::once(point_id.location.author))
             .collect();
         let (has_resolved_tx, has_resolved_rx) = watch::channel(false);
+        _ = point_round_temp; // do not leak strong ref across unlimited await
         DownloadTask {
-            weak_dag_round: point_round.as_weak(),
+            weak_dag_round: point_round,
             node_count,
             request: self.dispatcher.point_by_id_request(&point_id),
             point_id,
@@ -193,7 +197,8 @@
             }
             Ok(PointByIdResponse(None)) => {
                 if self.mandatory.remove(&peer_id) {
-                    // it's a ban
+                    // it's a ban only if permanent storage is used;
+                    // otherwise the peer could have advanced by a full DAG_DEPTH already
                     tracing::error!(
                         "{} : {peer_id:.4?} must have returned {:?}",
                         self.parent.log_id,
@@ -215,18 +220,14 @@
                         self.parent.log_id
                     );
                 }
-                let Some(dag_round) = self.weak_dag_round.get() else {
-                    tracing::warn!(
-                        "{} : {peer_id:.4?} no more retries, local DAG moved far forward",
-                        self.parent.log_id
-                    );
-                    // DAG could not have moved if this point was needed for commit
-                    return Some(DagPoint::NotExists(Arc::new(self.point_id.clone())));
-                };
                 match Verifier::verify(&point, &self.parent.peer_schedule) {
                     Ok(()) => {
-                        let validated =
-                            Verifier::validate(point, dag_round, self.parent.clone()).await;
+                        let validated = Verifier::validate(
+                            point,
+                            self.weak_dag_round.clone(),
+                            self.parent.clone(),
+                        )
+                        .await;
                         if validated.trusted().is_some() {
                             tracing::debug!(
                                 "{} : downloaded dependency {:?}",
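
Note for reviewers: the invariant this patch converges on is that any future which may outlive a round holds only a WeakDagRound and upgrades it per use. Below is a minimal, self-contained sketch of that shape; the names mirror the patch (WeakDagRound, DagRound, get, as_weak), but DagRoundInner's field and the validate body are simplified stand-ins, not the crate's actual code.

use std::sync::{Arc, Weak};

// Simplified stand-in for the crate's DagRoundInner.
struct DagRoundInner {
    round: u32,
}

// Weak handle: safe to hold across long-lived awaits, lets the DAG free memory.
#[derive(Clone)]
struct WeakDagRound(Weak<DagRoundInner>);

// Strong handle: pins the round's memory for as long as it is held.
#[derive(Clone)]
struct DagRound(Arc<DagRoundInner>);

impl WeakDagRound {
    // Upgrade to a strong handle, or None if the DAG already dropped the round.
    fn get(&self) -> Option<DagRound> {
        self.0.upgrade().map(DagRound)
    }
}

impl DagRound {
    fn as_weak(&self) -> WeakDagRound {
        WeakDagRound(Arc::downgrade(&self.0))
    }
}

// Shape of the new Verifier::validate prologue: upgrade first, bail out if
// the round is gone, and release the strong handle before any await.
async fn validate(weak: WeakDagRound) -> bool {
    let Some(strong) = weak.get() else {
        return false; // local DAG moved far forward: report instead of leaking
    };
    let _round = strong.0.round; // brief synchronous use only
    drop(strong); // drop strong links before await
    tokio::task::yield_now().await;
    true
}

#[tokio::main]
async fn main() {
    let strong = DagRound(Arc::new(DagRoundInner { round: 1 }));
    let weak = strong.as_weak();
    assert!(validate(weak.clone()).await); // round still alive: validates
    drop(strong);
    assert!(!validate(weak).await); // round freed: early return, no leak
}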
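The doc comment added to DagRound describes the failure mode: a future parked by a congested runtime keeps every strong ref it captured alive, so one stuck validation can pin a whole round. The mechanism is plain Arc retention; the toy program below (not crate code) demonstrates it with an Arc<Vec<u8>> standing in for a round's allocation.

use std::sync::Arc;

#[tokio::main]
async fn main() {
    let data = Arc::new(vec![0u8; 1024]);

    // A future that captured a strong clone keeps the allocation alive for
    // as long as the task itself is alive, even if it is never polled again.
    let strong = data.clone();
    let task = tokio::spawn(async move {
        let _held = strong;                 // strong ref owned by the future
        std::future::pending::<()>().await; // simulates a hung download/validation
    });

    tokio::task::yield_now().await;
    assert_eq!(Arc::strong_count(&data), 2); // retained while the task hangs

    task.abort();
    let _ = task.await;                      // future dropped here, clone released
    assert_eq!(Arc::strong_count(&data), 1); // the memory can be freed again
}

This is also why validate now drops r_0 and r_1 before awaiting check_deps, and why Downloader::run releases its temporary strong handle before the unlimited await inside DownloadTask.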
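A last detail, on the BOTTOM constant whose visibility this patch narrows: Weak::new() never points at an allocation, so upgrading it always fails, which is what makes it a cheap "below the deepest stored round" sentinel. A short illustration (Inner is a hypothetical payload type):

use std::sync::Weak;

struct Inner; // hypothetical payload type

// Const-evaluable and allocation-free, like the patch's WeakDagRound::BOTTOM.
const BOTTOM: Weak<Inner> = Weak::new();

fn main() {
    // A Weak created by Weak::new() has nothing to upgrade into,
    // so get()-style upgrades on it always return None.
    assert!(BOTTOM.upgrade().is_none());
}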