From 68ec610ccea6682f09c1818d24de65a12d1f7061 Mon Sep 17 00:00:00 2001
From: Kirill Mikheev
Date: Thu, 20 Jun 2024 09:42:54 +0300
Subject: [PATCH 1/2] fix(consensus): stop download if got broadcast

---
 consensus/src/dag/dag_location.rs             |  26 ++-
 consensus/src/dag/dag_round.rs                |  69 ++++---
 .../src/intercom/dependency/downloader.rs     | 176 ++++++++----------
 util/src/lib.rs                               |   3 +
 util/src/sync/once_take.rs                    |  89 +++++++++
 5 files changed, 241 insertions(+), 122 deletions(-)
 create mode 100644 util/src/sync/once_take.rs

diff --git a/consensus/src/dag/dag_location.rs b/consensus/src/dag/dag_location.rs
index f3bfece05..d92bc8917 100644
--- a/consensus/src/dag/dag_location.rs
+++ b/consensus/src/dag/dag_location.rs
@@ -7,11 +7,13 @@ use std::task::{Context, Poll};
 
 use everscale_crypto::ed25519::KeyPair;
 use futures_util::FutureExt;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, oneshot};
 use tycho_network::PeerId;
 use tycho_util::futures::{JoinTask, Shared};
+use tycho_util::sync::OnceTake;
 
 use crate::models::{DagPoint, Digest, Round, Signature, UnixTime, ValidPoint};
+use crate::Point;
 
 /// If DAG location exists, it must have non-empty `versions` map;
 ///
@@ -43,6 +45,7 @@ pub enum DagPointFuture {
     Download {
         task: Shared<JoinTask<DagPoint>>,
         dependents: mpsc::UnboundedSender<PeerId>,
+        verified: Arc<OnceTake<oneshot::Sender<Point>>>,
     },
     Local(futures_util::future::Ready<DagPoint>),
 }
@@ -54,6 +57,14 @@ impl DagPointFuture {
             _ = dependents.send(*dependent);
         }
     }
+    pub fn resolve_download(&self, broadcast: &Point) {
+        if let Self::Download { verified, .. } = self {
+            if let Some(oneshot) = verified.take() {
+                // receiver is dropped upon completion
+                _ = oneshot.send(broadcast.clone());
+            }
+        }
+    }
 }
 
 impl Future for DagPointFuture {
@@ -84,12 +95,21 @@ impl DagLocation {
             .entry(digest.clone())
             .or_insert_with(|| init(&self.state))
     }
-    pub fn init<F>(&mut self, digest: &Digest, init: F) -> Option<&DagPointFuture>
+    pub fn init_or_modify<F, U>(
+        &mut self,
+        digest: &Digest,
+        init: F,
+        modify: U,
+    ) -> Option<&DagPointFuture>
     where
         F: FnOnce(&InclusionState) -> DagPointFuture,
+        U: FnOnce(&DagPointFuture),
     {
         match self.versions.entry(digest.clone()) {
-            btree_map::Entry::Occupied(_) => None,
+            btree_map::Entry::Occupied(entry) => {
+                modify(entry.get());
+                None
+            }
             btree_map::Entry::Vacant(entry) => Some(entry.insert(init(&self.state))),
         }
     }
diff --git a/consensus/src/dag/dag_round.rs b/consensus/src/dag/dag_round.rs
index 896f1071e..ffb9af2ce 100644
--- a/consensus/src/dag/dag_round.rs
+++ b/consensus/src/dag/dag_round.rs
@@ -3,10 +3,11 @@ use std::sync::{Arc, Weak};
 use everscale_crypto::ed25519::KeyPair;
 use futures_util::future::BoxFuture;
 use futures_util::FutureExt;
-use tokio::sync::mpsc;
+use tokio::sync::{mpsc, oneshot};
 use tracing::Span;
 use tycho_network::PeerId;
 use tycho_util::futures::{JoinTask, Shared};
+use tycho_util::sync::OnceTake;
 use tycho_util::FastDashMap;
 
 use crate::dag::anchor_stage::AnchorStage;
@@ -149,6 +150,7 @@ impl DagRound {
         WeakDagRound(Arc::downgrade(&self.0))
     }
 
+    /// Point already verified
     pub fn add_broadcast_exact(
         &self,
         point: &Point,
@@ -163,23 +165,28 @@ impl DagRound {
         );
         let digest = point.digest();
         self.edit(&point.body().location.author, |loc| {
-            let downloader = downloader.clone();
-            let span = effects.span().clone();
-            let state = loc.state().clone();
-            let point_dag_round = self.downgrade();
-            let point = point.clone();
-            loc.init(digest, |state| {
-                // FIXME: prior Responder refactor: could not sign during validation,
-                // because current DAG round could advance concurrently;
-                // now current dag round changes consistently,
-                // maybe its possible to reduce locking in 'inclusion state'
-                let state = state.clone();
-                DagPointFuture::Broadcast(Shared::new(JoinTask::new(
-                    Verifier::validate(point, point_dag_round, downloader, span)
-                        .inspect(move |dag_point| state.init(dag_point)),
-                )))
-            })
-            .map(|first| first.clone().map(|_| state).boxed())
+            let result_state = loc.state().clone();
+            loc.init_or_modify(
+                digest,
+                |state| {
+                    let downloader = downloader.clone();
+                    let span = effects.span().clone();
+                    let point_dag_round = self.downgrade();
+                    let point = point.clone();
+
+                    // FIXME: prior Responder refactor: could not sign during validation,
+                    // because current DAG round could advance concurrently;
+                    // now current dag round changes consistently,
+                    // maybe its possible to reduce locking in 'inclusion state'
+                    let state = state.clone();
+                    DagPointFuture::Broadcast(Shared::new(JoinTask::new(
+                        Verifier::validate(point, point_dag_round, downloader, span)
+                            .inspect(move |dag_point| state.init(dag_point)),
+                    )))
+                },
+                |existing| existing.resolve_download(point),
+            )
+            .map(|first| first.clone().map(|_| result_state).boxed())
         })
     }
 
@@ -199,7 +206,8 @@ impl DagRound {
             let effects = effects.clone();
             let state = state.clone();
             let point_dag_round = self.clone();
-            let (tx, rx) = mpsc::unbounded_channel();
+            let (dependents_tx, dependents_rx) = mpsc::unbounded_channel();
+            let (broadcast_tx, broadcast_rx) = oneshot::channel();
             let point_id = PointId {
                 location: Location {
                     author: *author,
@@ -210,10 +218,17 @@ impl DagRound {
             DagPointFuture::Download {
                 task: Shared::new(JoinTask::new(
                     downloader
-                        .run(point_id, point_dag_round, rx, effects)
+                        .run(
+                            point_id,
+                            point_dag_round,
+                            dependents_rx,
+                            broadcast_rx,
+                            effects,
+                        )
                         .inspect(move |dag_point| state.init(dag_point)),
                 )),
-                dependents: tx,
+                dependents: dependents_tx,
+                verified: Arc::new(OnceTake::new(broadcast_tx)),
             }
         })
         .clone()
@@ -263,10 +278,14 @@ impl DagRound {
             "Coding error: dag round mismatches point round on insert"
         );
         self.edit(sender, |loc| {
-            let _existing = loc.init(dag_point.digest(), |state| {
-                state.init(dag_point);
-                DagPointFuture::Local(futures_util::future::ready(dag_point.clone()))
-            });
+            let _ready = loc.init_or_modify(
+                dag_point.digest(),
+                |state| {
+                    state.init(dag_point);
+                    DagPointFuture::Local(futures_util::future::ready(dag_point.clone()))
+                },
+                |_fut| {},
+            );
             loc.state().clone()
         })
     }
diff --git a/consensus/src/intercom/dependency/downloader.rs b/consensus/src/intercom/dependency/downloader.rs
index 7461366e4..ec11865e9 100644
--- a/consensus/src/intercom/dependency/downloader.rs
+++ b/consensus/src/intercom/dependency/downloader.rs
@@ -6,19 +6,18 @@ use futures_util::stream::FuturesUnordered;
 use futures_util::{FutureExt, StreamExt};
 use rand::{thread_rng, RngCore};
 use tokio::sync::broadcast::error::RecvError;
-use tokio::sync::{broadcast, mpsc};
+use tokio::sync::{broadcast, mpsc, oneshot};
+use tracing::Instrument;
 use tycho_network::PeerId;
 use tycho_util::FastHashMap;
 
-use crate::dag::{DagRound, Verifier, WeakDagRound};
-use crate::dyn_event;
+use crate::dag::{DagRound, Verifier};
 use crate::effects::{AltFormat, Effects, EffectsContext, ValidateContext};
 use crate::engine::MempoolConfig;
 use crate::intercom::dto::{PeerState, PointByIdResponse};
 use crate::intercom::{Dispatcher, PeerSchedule};
 use crate::models::{DagPoint, NodeCount, PointId};
-
-type DownloadResult = anyhow::Result<Point>;
+use crate::{dyn_event, Point};
 
 #[derive(Clone)]
 pub struct Downloader {
@@ -64,6 +63,7 @@ impl Downloader {
         // if we need to download at a very deep round - let the start of this task hold strong ref.
         point_dag_round_strong: DagRound,
         dependers: mpsc::UnboundedReceiver<PeerId>,
+        verified_broadcast: oneshot::Receiver<Point>,
         parent_effects: Effects<ValidateContext>,
     ) -> DagPoint {
         let effects = Effects::<DownloadContext>::new(&parent_effects, &point_id);
@@ -104,36 +104,66 @@ impl Downloader {
                 -1
             }
         };
-        let updates = peer_schedule.updates();
         let point_dag_round = point_dag_round_strong.downgrade();
         // do not leak span and strong round ref across await
         drop(point_dag_round_strong);
         drop(span_guard);
-        DownloadTask {
-            parent: self,
-            effects,
-            point_dag_round,
+
+        let downloaded = DownloadTask {
+            parent: self.clone(),
             node_count,
             request: Dispatcher::point_by_id_request(&point_id),
-            point_id,
+            point_id: point_id.clone(),
             undone_peers,
             done_peers,
             downloading: FuturesUnordered::new(),
             dependers,
-            updates,
+            updates: peer_schedule.updates(),
             attempt: 0,
             skip_next_attempt: false,
         }
-        .run()
-        .await
+        .run(verified_broadcast)
+        .instrument(effects.span().clone())
+        .await;
+
+        match downloaded {
+            None => DagPoint::NotExists(Arc::new(point_id)),
+            Some(point) => {
+                tracing::trace!(
+                    parent: effects.span(),
+                    peer = display(point.body().location.author.alt()),
+                    "downloaded, now validating",
+                );
+                let dag_point = Verifier::validate(
+                    point.clone(),
+                    point_dag_round,
+                    self.clone(),
+                    effects.span().clone(),
+                )
+                // this is the only `await` in the task, that resolves the download
+                .await;
+                let level = if dag_point.trusted().is_some() {
+                    tracing::Level::DEBUG
+                } else if dag_point.valid().is_some() {
+                    tracing::Level::WARN
+                } else {
+                    tracing::Level::ERROR
+                };
+                dyn_event!(
+                    parent: effects.span(),
+                    level,
+                    result = display(dag_point.alt()),
+                    "validated",
+                );
+                dag_point
+            }
+        }
     }
 }
 
 struct DownloadTask {
     parent: Downloader,
 
-    effects: Effects<DownloadContext>,
-    point_dag_round: WeakDagRound,
     node_count: NodeCount,
     request: tycho_network::Request,
 
@@ -143,7 +173,7 @@ struct DownloadTask {
     done_peers: i16,
     downloading: FuturesUnordered<BoxFuture<'static, (PeerId, anyhow::Result<PointByIdResponse>)>>,
 
-    /// populated by waiting validation tasks, source of [`mandatory`] set
+    /// populated by waiting validation tasks
     dependers: mpsc::UnboundedReceiver<PeerId>,
     updates: broadcast::Receiver<(PeerId, PeerState)>,
 
@@ -155,28 +185,32 @@ impl DownloadTask {
     // point's author is a top priority; fallback priority is (any) dependent point's author
     // recursively: every dependency is expected to be signed by 2/3+1
-    pub async fn run(&mut self) -> DagPoint {
+    pub async fn run(&mut self, verified_broadcast: oneshot::Receiver<Point>) -> Option<Point> {
         // always ask the author
         let author = self.point_id.location.author;
         self.add_depender(&author);
         self.download_random(true);
 
         let mut interval = tokio::time::interval(MempoolConfig::DOWNLOAD_INTERVAL);
-        let dag_point = loop {
+        let mut verified_broadcast = std::pin::pin!(verified_broadcast);
+        loop {
             tokio::select! {
+                Ok(point) = &mut verified_broadcast => break Some(point),
                 Some((peer_id, downloaded)) = self.downloading.next() =>
-                    // de-schedule current task if point is verified and wait for validation
-                    match self.match_downloaded(peer_id, downloaded).await {
-                        Some(dag_point) => break dag_point,
-                        None => continue
+                    match self.verify(&peer_id, downloaded) {
+                        Some(point) => break Some(point),
+                        None => if self.shall_continue() {
+                            continue
+                        } else {
+                            break None;
+                        }
                     },
                 Some(depender) = self.dependers.recv() => self.add_depender(&depender),
                 _ = interval.tick() => self.download_random(false),
                 update = self.updates.recv() => self.match_peer_updates(update),
             }
-        };
-        // clean the channel, it will stay in `DagPointFuture` that owns current task
-        self.dependers.close();
-        dag_point
+        }
+        // on exit futures are dropped and receivers are cleaned,
+        // senders will stay in `DagPointFuture` that owns current task
     }
 
     fn add_depender(&mut self, peer_id: &PeerId) {
@@ -256,130 +290,89 @@ impl DownloadTask {
         );
     }
 
-    async fn match_downloaded(
+    fn verify(
         &mut self,
-        peer_id: PeerId,
+        peer_id: &PeerId,
         resolved: anyhow::Result<PointByIdResponse>,
-    ) -> Option<DagPoint> {
+    ) -> Option<Point> {
         match resolved {
             Err(network_err) => {
                 let status = self
                     .undone_peers
-                    .get_mut(&peer_id)
+                    .get_mut(peer_id)
                     .unwrap_or_else(|| panic!("Coding error: peer not in map {}", peer_id.alt()));
                 status.is_in_flight = false;
                 status.failed_attempts += 1;
                 tracing::warn!(
-                    parent: self.effects.span(),
                     peer = display(peer_id.alt()),
                     error = display(network_err),
                     "network error",
                 );
+                None
             }
             Ok(PointByIdResponse(None)) => {
                 self.done_peers += 1;
-                match self.undone_peers.remove(&peer_id) {
+                match self.undone_peers.remove(peer_id) {
                     Some(state) if state.is_depender => {
                         // if points are persisted in storage - it's a ban;
                         // else - peer evicted this point from its cache, as the point
                         // is at least DAG_DEPTH rounds older than current consensus round
-                        tracing::warn!(
-                            parent: self.effects.span(),
-                            peer = display(peer_id.alt()),
-                            "must have returned",
-                        );
+                        tracing::warn!(peer = display(peer_id.alt()), "must have returned");
                     }
                     Some(_) => {
-                        tracing::debug!(
-                            parent: self.effects.span(),
-                            peer = display(peer_id.alt()),
-                            "didn't return",
-                        );
+                        tracing::debug!(peer = display(peer_id.alt()), "didn't return");
                     }
                     None => {
-                        let _guard = self.effects.span().enter();
                         panic!("already removed peer {}", peer_id.alt())
                     }
-                }
+                };
+                None
             }
             Ok(PointByIdResponse(Some(point))) if point.id() != self.point_id => {
                 self.done_peers += 1;
-                self.undone_peers.remove(&peer_id);
+                self.undone_peers.remove(peer_id);
                 // it's a ban
                 tracing::error!(
-                    parent: self.effects.span(),
                     peer_id = display(peer_id.alt()),
                     author = display(point.body().location.author.alt()),
                     round = point.body().location.round.0,
                    digest = display(point.digest().alt()),
                     "returned wrong point",
                 );
+                None
             }
             Ok(PointByIdResponse(Some(point))) => {
-                self.undone_peers.remove(&peer_id);
                 match Verifier::verify(&point, &self.parent.inner.peer_schedule) {
-                    Ok(()) => {
-                        tracing::trace!(
-                            parent: self.effects.span(),
-                            peer = display(peer_id.alt()),
-                            "downloaded, now validating",
-                        );
-                        let dag_point = Verifier::validate(
-                            point,
-                            self.point_dag_round.clone(),
-                            self.parent.clone(),
-                            self.effects.span().clone(),
-                        )
-                        // this is the only `await` in the task, that resolves the download
-                        .await;
-                        let level = if dag_point.trusted().is_some() {
-                            tracing::Level::DEBUG
-                        } else if dag_point.valid().is_some() {
-                            tracing::Level::WARN
-                        } else {
-                            tracing::Level::ERROR
-                        };
-                        dyn_event!(
-                            parent: self.effects.span(),
-                            level,
-                            result = display(dag_point.alt()),
-                            "validated",
-                        );
-                        return Some(dag_point);
-                    }
                     Err(dag_point) => {
                         // reliable peer won't return unverifiable point
                         self.done_peers += 1;
                         assert!(
                             dag_point.valid().is_none(),
                             "Coding error: verify() cannot result into a valid point"
                         );
                         tracing::error!(
-                            parent: self.effects.span(),
                             result = display(dag_point.alt()),
                             peer = display(peer_id.alt()),
                             "downloaded",
                        );
+                        None
                     }
-                };
+                    Ok(()) => Some(point),
+                }
             }
-        };
-        self.maybe_not_downloaded()
+        }
     }
 
-    fn maybe_not_downloaded(&mut self) -> Option<DagPoint> {
+    fn shall_continue(&mut self) -> bool {
         if self.done_peers >= self.node_count.majority() as i16 {
             // the only normal case to resolve into `NotExists`
-            tracing::warn!(
-                parent: self.effects.span(),
-                "not downloaded from majority",
-            );
-            Some(DagPoint::NotExists(Arc::new(self.point_id.clone())))
+            tracing::warn!("not downloaded from majority");
+            false
         } else {
             if self.downloading.is_empty() {
                 self.download_random(true);
                 self.skip_next_attempt = true;
             }
-            None
+            true
         }
     }
@@ -401,14 +394,9 @@ impl DownloadTask {
                 }
             }
             Err(err @ RecvError::Lagged(_)) => {
-                tracing::error!(
-                    parent: self.effects.span(),
-                    error = display(err),
-                    "peer updates"
-                );
+                tracing::error!(error = display(err), "peer updates");
             }
             Err(err @ RecvError::Closed) => {
-                let _span = self.effects.span().enter();
                 panic!("peer updates {err}")
             }
         }
diff --git a/util/src/lib.rs b/util/src/lib.rs
index 8ffda0e0a..e23bc79b3 100644
--- a/util/src/lib.rs
+++ b/util/src/lib.rs
@@ -18,9 +18,12 @@ pub mod futures {
 }
 
 pub mod sync {
+    pub use once_take::*;
+
     pub use self::priority_semaphore::{AcquireError, PrioritySemaphore, TryAcquireError};
     pub use self::rayon::{rayon_run, rayon_run_fifo};
 
+    mod once_take;
     mod priority_semaphore;
     mod rayon;
 }
diff --git a/util/src/sync/once_take.rs b/util/src/sync/once_take.rs
new file mode 100644
index 000000000..17f33ad8b
--- /dev/null
+++ b/util/src/sync/once_take.rs
@@ -0,0 +1,89 @@
+use std::mem::MaybeUninit;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+pub struct OnceTake<T> {
+    value: MaybeUninit<T>,
+    has_value: AtomicBool,
+}
+
+impl<T> OnceTake<T> {
+    pub fn new(value: T) -> Self {
+        Self {
+            value: MaybeUninit::new(value),
+            has_value: AtomicBool::new(true),
+        }
+    }
+    pub fn take(&self) -> Option<T> {
+        if self.has_value.swap(false, Ordering::Relaxed) {
+            // SAFETY: `self.value` is initialized and contains a valid `T`.
+            // `self.has_value` is disarmed and prevents the value from being read twice;
+            // the value will be dropped at the calling site.
+            let value = unsafe { self.value.assume_init_read() };
+            Some(value)
+        } else {
+            None
+        }
+    }
+}
+
+impl<T> Drop for OnceTake<T> {
+    fn drop(&mut self) {
+        if *self.has_value.get_mut() {
+            // SAFETY: we are the only thread executing Drop,
+            // and the value is not dropped outside as per `self.has_value`
+            unsafe { self.value.assume_init_drop() }
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use std::sync::{Arc, Mutex};
+
+    use super::OnceTake;
+
+    #[tokio::test]
+    async fn once_take() -> anyhow::Result<()> {
+        let counter = DropCounter::default();
+        let once = Arc::new(OnceTake::new(counter.clone()));
+
+        let once_1 = once.clone();
+        let fut_1 = async move { once_1.take().map(|copy| copy.get()) };
+
+        let once_2 = once.clone();
+        let fut_2 = async move { once_2.take().map(|copy| copy.get()) };
+
+        let mut result = [tokio::spawn(fut_1).await?, tokio::spawn(fut_2).await?];
+        result.sort();
+        assert_eq!([None, Some(0)], result);
+
+        assert_eq!(1, counter.get());
+
+        assert_eq!(None, once.clone().take());
+        drop(once);
+        assert_eq!(1, counter.get());
+        Ok(())
+    }
+
+    #[derive(Default, Clone, Debug)]
+    struct DropCounter {
+        counter: Arc<Mutex<u8>>,
+    }
+    impl Drop for DropCounter {
+        fn drop(&mut self) {
+            let mut guard = self.counter.lock().unwrap();
+            *guard += 1;
+        }
+    }
+    impl DropCounter {
+        pub fn get(&self) -> u8 {
+            let guard = self.counter.lock().unwrap();
+            *guard
+        }
+    }
+    impl PartialEq for DropCounter {
+        fn eq(&self, other: &Self) -> bool {
+            self.get() == other.get()
+        }
+    }
+}

From 9207e0a0d4d7c2b7ee9e984b657885b713b5e503 Mon Sep 17 00:00:00 2001
From: Kirill Mikheev
Date: Thu, 20 Jun 2024 10:53:33 +0300
Subject: [PATCH 2/2] refactor(consensus): move dag point future to own module

---
 consensus/src/dag/dag_location.rs     |  53 +---------
 consensus/src/dag/dag_point_future.rs | 128 ++++++++++++++++++++++++++
 consensus/src/dag/dag_round.rs        |  63 ++-----------
 consensus/src/dag/mod.rs              |   1 +
 consensus/src/dag/verifier.rs         |   3 +-
 5 files changed, 142 insertions(+), 106 deletions(-)
 create mode 100644 consensus/src/dag/dag_point_future.rs

diff --git a/consensus/src/dag/dag_location.rs b/consensus/src/dag/dag_location.rs
index d92bc8917..14709394f 100644
--- a/consensus/src/dag/dag_location.rs
+++ b/consensus/src/dag/dag_location.rs
@@ -1,19 +1,11 @@
 use std::collections::{btree_map, BTreeMap};
-use std::future::Future;
 use std::ops::RangeInclusive;
-use std::pin::Pin;
 use std::sync::{Arc, OnceLock};
-use std::task::{Context, Poll};
 
 use everscale_crypto::ed25519::KeyPair;
-use futures_util::FutureExt;
-use tokio::sync::{mpsc, oneshot};
-use tycho_network::PeerId;
-use tycho_util::futures::{JoinTask, Shared};
-use tycho_util::sync::OnceTake;
 
+use crate::dag::dag_point_future::DagPointFuture;
 use crate::models::{DagPoint, Digest, Round, Signature, UnixTime, ValidPoint};
-use crate::Point;
 
 /// If DAG location exists, it must have non-empty `versions` map;
 ///
@@ -39,49 +31,6 @@ pub struct DagLocation {
     versions: BTreeMap<Digest, DagPointFuture>,
 }
 
-#[derive(Clone)]
-pub enum DagPointFuture {
-    Broadcast(Shared<JoinTask<DagPoint>>),
-    Download {
-        task: Shared<JoinTask<DagPoint>>,
-        dependents: mpsc::UnboundedSender<PeerId>,
-        verified: Arc<OnceTake<oneshot::Sender<Point>>>,
-    },
-    Local(futures_util::future::Ready<DagPoint>),
-}
-
-impl DagPointFuture {
-    pub fn add_depender(&self, dependent: &PeerId) {
-        if let Self::Download { dependents, .. } = self {
-            // receiver is dropped upon completion
-            _ = dependents.send(*dependent);
-        }
-    }
-    pub fn resolve_download(&self, broadcast: &Point) {
-        if let Self::Download { verified, .. } = self {
-            if let Some(oneshot) = verified.take() {
-                // receiver is dropped upon completion
-                _ = oneshot.send(broadcast.clone());
-            }
-        }
-    }
-}
-
-impl Future for DagPointFuture {
-    type Output = DagPoint;
-
-    #[inline]
-    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
-        match &mut *self {
-            Self::Broadcast(task) | Self::Download { task, .. } => match task.poll_unpin(cx) {
-                Poll::Ready((dag_point, _)) => Poll::Ready(dag_point),
-                Poll::Pending => Poll::Pending,
-            },
-            Self::Local(ready) => ready.poll_unpin(cx),
-        }
-    }
-}
-
 impl DagLocation {
     // point that is validated depends on other equivocated points futures (if any)
     // in the same location, so need to keep order of futures' completion;
diff --git a/consensus/src/dag/dag_point_future.rs b/consensus/src/dag/dag_point_future.rs
new file mode 100644
index 000000000..ccbc65e4a
--- /dev/null
+++ b/consensus/src/dag/dag_point_future.rs
@@ -0,0 +1,128 @@
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+use futures_util::FutureExt;
+use tokio::sync::{mpsc, oneshot};
+use tycho_network::PeerId;
+use tycho_util::futures::{JoinTask, Shared};
+use tycho_util::sync::OnceTake;
+
+use crate::dag::{DagRound, InclusionState, Verifier};
+use crate::effects::{CurrentRoundContext, Effects, ValidateContext};
+use crate::intercom::Downloader;
+use crate::models::{DagPoint, Digest, Location, PointId};
+use crate::Point;
+
+#[derive(Clone)]
+pub struct DagPointFuture(DagPointFutureType);
+
+impl Future for DagPointFuture {
+    type Output = DagPoint;
+
+    #[inline]
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        match &mut self.0 {
+            DagPointFutureType::Broadcast(task) | DagPointFutureType::Download { task, .. } => {
+                match task.poll_unpin(cx) {
+                    Poll::Ready((dag_point, _)) => Poll::Ready(dag_point),
+                    Poll::Pending => Poll::Pending,
+                }
+            }
+            DagPointFutureType::Local(ready) => ready.poll_unpin(cx),
+        }
+    }
+}
+
+#[derive(Clone)]
+enum DagPointFutureType {
+    Broadcast(Shared<JoinTask<DagPoint>>),
+    Download {
+        task: Shared<JoinTask<DagPoint>>,
+        dependents: mpsc::UnboundedSender<PeerId>,
+        verified: Arc<OnceTake<oneshot::Sender<Point>>>,
+    },
+    Local(futures_util::future::Ready<DagPoint>),
+}
+
+impl DagPointFuture {
+    pub fn new_local(dag_point: &DagPoint, state: &InclusionState) -> Self {
+        state.init(dag_point);
+        Self(DagPointFutureType::Local(futures_util::future::ready(
+            dag_point.clone(),
+        )))
+    }
+
+    pub fn new_broadcast(
+        point_dag_round: &DagRound,
+        point: &Point,
+        state: &InclusionState,
+        downloader: &Downloader,
+        effects: &Effects<CurrentRoundContext>,
+    ) -> Self {
+        let downloader = downloader.clone();
+        let span = effects.span().clone();
+        let point_dag_round = point_dag_round.downgrade();
+        let point = point.clone();
+        let state = state.clone();
+        DagPointFuture(DagPointFutureType::Broadcast(Shared::new(JoinTask::new(
+            Verifier::validate(point, point_dag_round, downloader, span)
+                .inspect(move |dag_point| state.init(dag_point)),
+        ))))
+    }
+
+    pub fn new_download(
+        point_dag_round: &DagRound,
+        author: &PeerId,
+        digest: &Digest,
+        state: &InclusionState,
+        downloader: &Downloader,
+        effects: &Effects<ValidateContext>,
+    ) -> Self {
+        let downloader = downloader.clone();
+        let effects = effects.clone();
+        let state = state.clone();
+        let point_dag_round = point_dag_round.clone();
+        let (dependents_tx, dependents_rx) = mpsc::unbounded_channel();
+        let (broadcast_tx, broadcast_rx) = oneshot::channel();
+        let point_id = PointId {
+            location: Location {
+                author: *author,
+                round: point_dag_round.round(),
+            },
+            digest: digest.clone(),
+        };
+        DagPointFuture(DagPointFutureType::Download {
+            task: Shared::new(JoinTask::new(
+                downloader
+                    .run(
+                        point_id,
+                        point_dag_round,
+                        dependents_rx,
+                        broadcast_rx,
+                        effects,
+                    )
+                    .inspect(move |dag_point| state.init(dag_point)),
+            )),
+            dependents: dependents_tx,
+            verified: Arc::new(OnceTake::new(broadcast_tx)),
+        })
+    }
+
+    pub fn add_depender(&self, dependent: &PeerId) {
+        if let DagPointFutureType::Download { dependents, .. } = &self.0 {
+            // receiver is dropped upon completion
+            _ = dependents.send(*dependent);
+        }
+    }
+
+    pub fn resolve_download(&self, broadcast: &Point) {
+        if let DagPointFutureType::Download { verified, .. } = &self.0 {
+            if let Some(oneshot) = verified.take() {
+                // receiver is dropped upon completion
+                _ = oneshot.send(broadcast.clone());
+            }
+        }
+    }
+}
diff --git a/consensus/src/dag/dag_round.rs b/consensus/src/dag/dag_round.rs
index ffb9af2ce..18f3f9759 100644
--- a/consensus/src/dag/dag_round.rs
+++ b/consensus/src/dag/dag_round.rs
@@ -3,19 +3,17 @@ use std::sync::{Arc, Weak};
 use everscale_crypto::ed25519::KeyPair;
 use futures_util::future::BoxFuture;
 use futures_util::FutureExt;
-use tokio::sync::{mpsc, oneshot};
 use tracing::Span;
 use tycho_network::PeerId;
-use tycho_util::futures::{JoinTask, Shared};
-use tycho_util::sync::OnceTake;
 use tycho_util::FastDashMap;
 
 use crate::dag::anchor_stage::AnchorStage;
-use crate::dag::{DagLocation, DagPointFuture, InclusionState, Verifier};
+use crate::dag::dag_point_future::DagPointFuture;
+use crate::dag::{DagLocation, InclusionState};
 use crate::effects::{CurrentRoundContext, Effects, ValidateContext};
 use crate::engine::MempoolConfig;
 use crate::intercom::{Downloader, PeerSchedule};
-use crate::models::{DagPoint, Digest, Location, NodeCount, Point, PointId, Round, ValidPoint};
+use crate::models::{DagPoint, Digest, NodeCount, Point, Round, ValidPoint};
 
 #[derive(Clone)]
 /// Allows memory allocated by DAG to be freed
@@ -168,22 +166,11 @@ impl DagRound {
             let result_state = loc.state().clone();
             loc.init_or_modify(
                 digest,
-                |state| {
-                    let downloader = downloader.clone();
-                    let span = effects.span().clone();
-                    let point_dag_round = self.downgrade();
-                    let point = point.clone();
-
-                    // FIXME: prior Responder refactor: could not sign during validation,
-                    // because current DAG round could advance concurrently;
-                    // now current dag round changes consistently,
-                    // maybe its possible to reduce locking in 'inclusion state'
-                    let state = state.clone();
-                    DagPointFuture::Broadcast(Shared::new(JoinTask::new(
-                        Verifier::validate(point, point_dag_round, downloader, span)
-                            .inspect(move |dag_point| state.init(dag_point)),
-                    )))
-                },
+                // FIXME: prior Responder refactor: could not sign during validation,
+                // because current DAG round could advance concurrently;
+                // now current dag round changes consistently,
+                // maybe its possible to reduce locking in 'inclusion state'
+                |state| DagPointFuture::new_broadcast(self, point, state, downloader, effects),
                 |existing| existing.resolve_download(point),
             )
             .map(|first| first.clone().map(|_| result_state).boxed())
@@ -202,34 +189,7 @@ impl DagRound {
     ) -> DagPointFuture {
         let future = self.edit(author, |loc| {
             loc.get_or_init(digest, |state| {
-                let downloader = downloader.clone();
-                let effects = effects.clone();
-                let state = state.clone();
-                let point_dag_round = self.clone();
-                let (dependents_tx, dependents_rx) = mpsc::unbounded_channel();
-                let (broadcast_tx, broadcast_rx) = oneshot::channel();
-                let point_id = PointId {
-                    location: Location {
-                        author: *author,
-                        round: self.round(),
-                    },
-                    digest: digest.clone(),
-                };
-                DagPointFuture::Download {
-                    task: Shared::new(JoinTask::new(
-                        downloader
-                            .run(
-                                point_id,
-                                point_dag_round,
-                                dependents_rx,
-                                broadcast_rx,
-                                effects,
-                            )
-                            .inspect(move |dag_point| state.init(dag_point)),
-                    )),
-                    dependents: dependents_tx,
-                    verified: Arc::new(OnceTake::new(broadcast_tx)),
-                }
+                DagPointFuture::new_download(self, author, digest, state, downloader, effects)
             })
             .clone()
         });
@@ -280,10 +240,7 @@ impl DagRound {
         self.edit(sender, |loc| {
             let _ready = loc.init_or_modify(
                 dag_point.digest(),
-                |state| {
-                    state.init(dag_point);
-                    DagPointFuture::Local(futures_util::future::ready(dag_point.clone()))
-                },
+                |state| DagPointFuture::new_local(dag_point, state),
                 |_fut| {},
             );
             loc.state().clone()
         })
diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs
index 9ed08bb7b..4fc2f3b0f 100644
--- a/consensus/src/dag/mod.rs
+++ b/consensus/src/dag/mod.rs
@@ -7,6 +7,7 @@ pub use verifier::*;
 mod anchor_stage;
 mod dag;
 mod dag_location;
+mod dag_point_future;
 mod dag_round;
 mod producer;
 mod verifier;
diff --git a/consensus/src/dag/verifier.rs b/consensus/src/dag/verifier.rs
index 856ab8744..9c40a7534 100644
--- a/consensus/src/dag/verifier.rs
+++ b/consensus/src/dag/verifier.rs
@@ -6,7 +6,8 @@ use tracing::{Instrument, Span};
 use tycho_util::sync::rayon_run;
 
 use crate::dag::anchor_stage::AnchorStage;
-use crate::dag::{DagPointFuture, DagRound, WeakDagRound};
+use crate::dag::dag_point_future::DagPointFuture;
+use crate::dag::{DagRound, WeakDagRound};
 use crate::effects::{AltFormat, Effects, EffectsContext, ValidateContext};
 use crate::engine::MempoolConfig;
 use crate::intercom::{Downloader, PeerSchedule};
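
The hand-off in this series hinges on the `OnceTake` cell added in the first commit: `resolve_download` may be called once per duplicate broadcast, but the `oneshot::Sender` it guards must be consumed at most once, through a shared `&self`. Below is a minimal usage sketch, not the crate's real call site: it assumes `tycho_util::sync::OnceTake` (as defined above) and tokio are available, and aliases `Point` to `String` as a hypothetical stand-in.

```rust
use std::sync::Arc;

use tokio::sync::oneshot;
use tycho_util::sync::OnceTake;

// Hypothetical stand-in for the crate's `Point`.
type Point = String;

// Mirrors `DagPointFuture::resolve_download`: forward a verified broadcast
// to the still-running download task, at most once across all callers.
fn resolve_download(verified: &Arc<OnceTake<oneshot::Sender<Point>>>, broadcast: &Point) {
    if let Some(sender) = verified.take() {
        // the receiver may have completed and dropped already; ignore the error
        _ = sender.send(broadcast.clone());
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let verified = Arc::new(OnceTake::new(tx));

    // Two duplicate broadcasts race; only the first take() wins the sender.
    resolve_download(&verified, &"point".to_string());
    resolve_download(&verified, &"duplicate".to_string());

    assert_eq!(rx.await.unwrap(), "point");
}
```

Because `take()` works through `&self` with a single atomic swap, the broadcast side needs no lock and no mutable access through the shared `Arc`, which is exactly what lets `resolve_download` live on a cloneable future handle.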
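On the consumer side, the reworked `DownloadTask::run` races its retry loop against the `oneshot::Receiver` that `resolve_download` feeds. The following self-contained sketch shows that `tokio::select!` shape under stated assumptions: names such as `download_task` and the attempt counter are hypothetical stand-ins for `download_random`/`shall_continue`, and `biased;` is added only to make the toy deterministic — the patch itself uses the default polling.

```rust
use std::time::Duration;

use tokio::sync::oneshot;

// Hypothetical stand-in for the crate's `Point`.
type Point = String;

// Simplified sketch of the reworked `DownloadTask::run`: poll peers on a
// timer, but resolve immediately once a verified broadcast arrives.
async fn download_task(verified_broadcast: oneshot::Receiver<Point>) -> Option<Point> {
    let mut verified_broadcast = std::pin::pin!(verified_broadcast);
    let mut interval = tokio::time::interval(Duration::from_millis(50));
    let mut attempts = 0u32;
    loop {
        tokio::select! {
            biased; // only to make this toy deterministic
            // The point arrived as a broadcast and passed verification:
            // stop querying peers and resolve the download with it.
            Ok(point) = &mut verified_broadcast => break Some(point),
            _ = interval.tick() => {
                // placeholder for `download_random` plus response handling;
                // give up after a few rounds, as `shall_continue` would
                attempts += 1;
                if attempts > 3 {
                    break None;
                }
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let task = tokio::spawn(download_task(rx));

    // The broadcast beats the simulated download; the task resolves with it.
    tx.send("verified point".to_string()).expect("receiver alive");
    assert_eq!(task.await.unwrap(), Some("verified point".to_string()));
}
```

If the sender half is dropped (the future was resolved some other way), the `Ok(point)` pattern stops matching and `select!` disables that branch, so the retry loop keeps running unchanged — matching the patch's behavior.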
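Finally, `DagLocation::init_or_modify` is what routes a duplicate broadcast to the pending download instead of silently discarding it: the first caller for a digest installs the future, and any later caller has `modify` invoked on the existing entry. Here is the shape of that entry API reduced to std types — a toy generic, not the crate's signatures:

```rust
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;

// Sketch of the entry pattern behind `DagLocation::init_or_modify`:
// the first caller for a key installs a value; every later caller gets
// `modify` invoked on the existing one instead. Returning `None` tells
// the caller it hit a duplicate.
fn init_or_modify<'a, K: Ord, V>(
    map: &'a mut BTreeMap<K, V>,
    key: K,
    init: impl FnOnce() -> V,
    modify: impl FnOnce(&V),
) -> Option<&'a V> {
    match map.entry(key) {
        Entry::Occupied(entry) => {
            modify(entry.get());
            None
        }
        Entry::Vacant(entry) => Some(entry.insert(init())),
    }
}

fn main() {
    let mut versions: BTreeMap<&str, &str> = BTreeMap::new();

    // first insert for this digest: `init` runs
    assert!(init_or_modify(&mut versions, "digest", || "download-task", |_| {}).is_some());

    // duplicate: `modify` fires on the existing entry; in the patch this is
    // where `resolve_download` hands the broadcast to the pending download
    let mut modified = false;
    let inserted = init_or_modify(&mut versions, "digest", || unreachable!(), |_| modified = true);
    assert!(inserted.is_none() && modified);
}
```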