Skip to content

Commit

Permalink
Consensus/abort download (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo authored Jun 20, 2024
2 parents 3999673 + 9207e0a commit e135ceb
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 179 deletions.
55 changes: 12 additions & 43 deletions consensus/src/dag/dag_location.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
use std::collections::{btree_map, BTreeMap};
use std::future::Future;
use std::ops::RangeInclusive;
use std::pin::Pin;
use std::sync::{Arc, OnceLock};
use std::task::{Context, Poll};

use everscale_crypto::ed25519::KeyPair;
use futures_util::FutureExt;
use tokio::sync::mpsc;
use tycho_network::PeerId;
use tycho_util::futures::{JoinTask, Shared};

use crate::dag::dag_point_future::DagPointFuture;
use crate::models::{DagPoint, Digest, Round, Signature, UnixTime, ValidPoint};

/// If DAG location exists, it must have non-empty `versions` map;
Expand All @@ -37,40 +31,6 @@ pub struct DagLocation {
versions: BTreeMap<Digest, DagPointFuture>,
}

#[derive(Clone)]
pub enum DagPointFuture {
Broadcast(Shared<JoinTask<DagPoint>>),
Download {
task: Shared<JoinTask<DagPoint>>,
dependents: mpsc::UnboundedSender<PeerId>,
},
Local(futures_util::future::Ready<DagPoint>),
}

impl DagPointFuture {
pub fn add_depender(&self, dependent: &PeerId) {
if let Self::Download { dependents, .. } = self {
// receiver is dropped upon completion
_ = dependents.send(*dependent);
}
}
}

impl Future for DagPointFuture {
type Output = DagPoint;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match &mut *self {
Self::Broadcast(task) | Self::Download { task, .. } => match task.poll_unpin(cx) {
Poll::Ready((dag_point, _)) => Poll::Ready(dag_point),
Poll::Pending => Poll::Pending,
},
Self::Local(ready) => ready.poll_unpin(cx),
}
}
}

impl DagLocation {
// point that is validated depends on other equivocated points futures (if any)
// in the same location, so need to keep order of futures' completion;
Expand All @@ -84,12 +44,21 @@ impl DagLocation {
.entry(digest.clone())
.or_insert_with(|| init(&self.state))
}
pub fn init<F>(&mut self, digest: &Digest, init: F) -> Option<&DagPointFuture>
pub fn init_or_modify<F, U>(
&mut self,
digest: &Digest,
init: F,
modify: U,
) -> Option<&DagPointFuture>
where
F: FnOnce(&InclusionState) -> DagPointFuture,
U: FnOnce(&DagPointFuture),
{
match self.versions.entry(digest.clone()) {
btree_map::Entry::Occupied(_) => None,
btree_map::Entry::Occupied(entry) => {
modify(entry.get());
None
}
btree_map::Entry::Vacant(entry) => Some(entry.insert(init(&self.state))),
}
}
Expand Down
128 changes: 128 additions & 0 deletions consensus/src/dag/dag_point_future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use futures_util::FutureExt;
use tokio::sync::{mpsc, oneshot};
use tycho_network::PeerId;
use tycho_util::futures::{JoinTask, Shared};
use tycho_util::sync::OnceTake;

use crate::dag::{DagRound, InclusionState, Verifier};
use crate::effects::{CurrentRoundContext, Effects, ValidateContext};
use crate::intercom::Downloader;
use crate::models::{DagPoint, Digest, Location, PointId};
use crate::Point;

#[derive(Clone)]
pub struct DagPointFuture(DagPointFutureType);

impl Future for DagPointFuture {
type Output = DagPoint;

#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match &mut self.0 {
DagPointFutureType::Broadcast(task) | DagPointFutureType::Download { task, .. } => {
match task.poll_unpin(cx) {
Poll::Ready((dag_point, _)) => Poll::Ready(dag_point),
Poll::Pending => Poll::Pending,
}
}
DagPointFutureType::Local(ready) => ready.poll_unpin(cx),
}
}
}

#[derive(Clone)]
enum DagPointFutureType {
Broadcast(Shared<JoinTask<DagPoint>>),
Download {
task: Shared<JoinTask<DagPoint>>,
dependents: mpsc::UnboundedSender<PeerId>,
verified: Arc<OnceTake<oneshot::Sender<Point>>>,
},
Local(futures_util::future::Ready<DagPoint>),
}

impl DagPointFuture {
pub fn new_local(dag_point: &DagPoint, state: &InclusionState) -> Self {
state.init(dag_point);
Self(DagPointFutureType::Local(futures_util::future::ready(
dag_point.clone(),
)))
}

pub fn new_broadcast(
point_dag_round: &DagRound,
point: &Point,
state: &InclusionState,
downloader: &Downloader,
effects: &Effects<CurrentRoundContext>,
) -> Self {
let downloader = downloader.clone();
let span = effects.span().clone();
let point_dag_round = point_dag_round.downgrade();
let point = point.clone();
let state = state.clone();
DagPointFuture(DagPointFutureType::Broadcast(Shared::new(JoinTask::new(
Verifier::validate(point, point_dag_round, downloader, span)
.inspect(move |dag_point| state.init(dag_point)),
))))
}

pub fn new_download(
point_dag_round: &DagRound,
author: &PeerId,
digest: &Digest,
state: &InclusionState,
downloader: &Downloader,
effects: &Effects<ValidateContext>,
) -> Self {
let downloader = downloader.clone();
let effects = effects.clone();
let state = state.clone();
let point_dag_round = point_dag_round.clone();
let (dependents_tx, dependents_rx) = mpsc::unbounded_channel();
let (broadcast_tx, broadcast_rx) = oneshot::channel();
let point_id = PointId {
location: Location {
author: *author,
round: point_dag_round.round(),
},
digest: digest.clone(),
};
DagPointFuture(DagPointFutureType::Download {
task: Shared::new(JoinTask::new(
downloader
.run(
point_id,
point_dag_round,
dependents_rx,
broadcast_rx,
effects,
)
.inspect(move |dag_point| state.init(dag_point)),
)),
dependents: dependents_tx,
verified: Arc::new(OnceTake::new(broadcast_tx)),
})
}

pub fn add_depender(&self, dependent: &PeerId) {
if let DagPointFutureType::Download { dependents, .. } = &self.0 {
// receiver is dropped upon completion
_ = dependents.send(*dependent);
}
}

pub fn resolve_download(&self, broadcast: &Point) {
if let DagPointFutureType::Download { verified, .. } = &self.0 {
if let Some(oneshot) = verified.take() {
// receiver is dropped upon completion
_ = oneshot.send(broadcast.clone());
}
}
}
}
58 changes: 17 additions & 41 deletions consensus/src/dag/dag_round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ use std::sync::{Arc, Weak};
use everscale_crypto::ed25519::KeyPair;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use tokio::sync::mpsc;
use tracing::Span;
use tycho_network::PeerId;
use tycho_util::futures::{JoinTask, Shared};
use tycho_util::FastDashMap;

use crate::dag::anchor_stage::AnchorStage;
use crate::dag::{DagLocation, DagPointFuture, InclusionState, Verifier};
use crate::dag::dag_point_future::DagPointFuture;
use crate::dag::{DagLocation, InclusionState};
use crate::effects::{CurrentRoundContext, Effects, ValidateContext};
use crate::engine::MempoolConfig;
use crate::intercom::{Downloader, PeerSchedule};
use crate::models::{DagPoint, Digest, Location, NodeCount, Point, PointId, Round, ValidPoint};
use crate::models::{DagPoint, Digest, NodeCount, Point, Round, ValidPoint};

#[derive(Clone)]
/// Allows memory allocated by DAG to be freed
Expand Down Expand Up @@ -149,6 +148,7 @@ impl DagRound {
WeakDagRound(Arc::downgrade(&self.0))
}

/// Point already verified
pub fn add_broadcast_exact(
&self,
point: &Point,
Expand All @@ -163,23 +163,17 @@ impl DagRound {
);
let digest = point.digest();
self.edit(&point.body().location.author, |loc| {
let downloader = downloader.clone();
let span = effects.span().clone();
let state = loc.state().clone();
let point_dag_round = self.downgrade();
let point = point.clone();
loc.init(digest, |state| {
let result_state = loc.state().clone();
loc.init_or_modify(
digest,
// FIXME: prior Responder refactor: could not sign during validation,
// because current DAG round could advance concurrently;
// now current dag round changes consistently,
// maybe its possible to reduce locking in 'inclusion state'
let state = state.clone();
DagPointFuture::Broadcast(Shared::new(JoinTask::new(
Verifier::validate(point, point_dag_round, downloader, span)
.inspect(move |dag_point| state.init(dag_point)),
)))
})
.map(|first| first.clone().map(|_| state).boxed())
|state| DagPointFuture::new_broadcast(self, point, state, downloader, effects),
|existing| existing.resolve_download(point),
)
.map(|first| first.clone().map(|_| result_state).boxed())
})
}

Expand All @@ -195,26 +189,7 @@ impl DagRound {
) -> DagPointFuture {
let future = self.edit(author, |loc| {
loc.get_or_init(digest, |state| {
let downloader = downloader.clone();
let effects = effects.clone();
let state = state.clone();
let point_dag_round = self.clone();
let (tx, rx) = mpsc::unbounded_channel();
let point_id = PointId {
location: Location {
author: *author,
round: self.round(),
},
digest: digest.clone(),
};
DagPointFuture::Download {
task: Shared::new(JoinTask::new(
downloader
.run(point_id, point_dag_round, rx, effects)
.inspect(move |dag_point| state.init(dag_point)),
)),
dependents: tx,
}
DagPointFuture::new_download(self, author, digest, state, downloader, effects)
})
.clone()
});
Expand Down Expand Up @@ -263,10 +238,11 @@ impl DagRound {
"Coding error: dag round mismatches point round on insert"
);
self.edit(sender, |loc| {
let _existing = loc.init(dag_point.digest(), |state| {
state.init(dag_point);
DagPointFuture::Local(futures_util::future::ready(dag_point.clone()))
});
let _ready = loc.init_or_modify(
dag_point.digest(),
|state| DagPointFuture::new_local(dag_point, state),
|_fut| {},
);
loc.state().clone()
})
}
Expand Down
1 change: 1 addition & 0 deletions consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub use verifier::*;
mod anchor_stage;
mod dag;
mod dag_location;
mod dag_point_future;
mod dag_round;
mod producer;
mod verifier;
3 changes: 2 additions & 1 deletion consensus/src/dag/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use tracing::{Instrument, Span};
use tycho_util::sync::rayon_run;

use crate::dag::anchor_stage::AnchorStage;
use crate::dag::{DagPointFuture, DagRound, WeakDagRound};
use crate::dag::dag_point_future::DagPointFuture;
use crate::dag::{DagRound, WeakDagRound};
use crate::effects::{AltFormat, Effects, EffectsContext, ValidateContext};
use crate::engine::MempoolConfig;
use crate::intercom::{Downloader, PeerSchedule};
Expand Down
Loading

0 comments on commit e135ceb

Please sign in to comment.