Refactor/mempool downloader #114

Closed · wants to merge 2 commits

1 change: 1 addition & 0 deletions Cargo.lock

10 changes: 6 additions & 4 deletions collator/src/mempool/mempool_adapter.rs
@@ -154,6 +154,7 @@ pub async fn handle_anchors(
 ) {
     let mut cache = ExternalMessageCache::new(1000);
     while let Some((anchor, points)) = rx.recv().await {
+        let anchor_id: MempoolAnchorId = anchor.body.location.round.0;
         let mut messages = Vec::new();
         let mut total_messages = 0;
         let mut total_bytes = 0;
@@ -191,13 +192,12 @@ pub async fn handle_anchors(
                     }
                 };
 
-                if cache.check_unique(anchor.body.location.round.0, cell.repr_hash()) {
+                if cache.check_unique(anchor_id, cell.repr_hash()) {
                     messages.push(Arc::new(ExternalMessage::new(cell.clone(), ext_in_message)));
                     messages_bytes += message.len();
                 }
             }
         }
-        cache.clean(anchor.body.location.round.0);
 
         metrics::gauge!("tycho_mempool_last_anchor_round").set(anchor.body.location.round.0);
         metrics::counter!("tycho_mempool_externals_count_total").increment(messages.len() as _);
@@ -209,20 +209,22 @@ pub async fn handle_anchors(
 
         tracing::info!(
             target: tracing_targets::MEMPOOL_ADAPTER,
-            round = anchor.body.location.round.0,
+            round = anchor_id,
             time = anchor.body.time.as_u64(),
             externals_unique = messages.len(),
             externals_skipped = total_messages - messages.len(),
             "new anchor"
         );
 
         let anchor = Arc::new(MempoolAnchor::new(
-            anchor.body.location.round.0,
+            anchor_id,
             anchor.body.time.as_u64(),
             messages,
         ));
 
         adapter.add_anchor(anchor);
+
+        cache.clean(anchor_id);
     }
 }
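
Two details in this change are easy to miss: the anchor round is read once into `anchor_id` (typed as `MempoolAnchorId`, evidently an alias for the `u32` round number), and `cache.clean(anchor_id)` moves from the middle of the loop to after `adapter.add_anchor(anchor)`, so the dedup window is only trimmed once the anchor has been handed off. The cache itself is not shown in this diff; the sketch below is a hypothetical reconstruction of the contract the loop relies on (field names and the 32-byte hash type are assumptions, not the PR's code):

```rust
use std::collections::hash_map::Entry;
use std::collections::HashMap;

// Hypothetical sketch of the dedup cache behind `check_unique`/`clean`;
// `ExternalMessageCache::new(1000)` in the loop suggests the retention
// window is measured in rounds.
pub struct ExternalMessageCache {
    seen: HashMap<[u8; 32], u32>, // message hash -> round it was first seen in
    horizon: u32,                 // rounds to keep before eviction
}

impl ExternalMessageCache {
    pub fn new(horizon: u32) -> Self {
        Self { seen: HashMap::new(), horizon }
    }

    /// True only for the first occurrence of `hash`; repeats are filtered out.
    pub fn check_unique(&mut self, round: u32, hash: [u8; 32]) -> bool {
        match self.seen.entry(hash) {
            Entry::Vacant(slot) => {
                slot.insert(round);
                true
            }
            Entry::Occupied(_) => false,
        }
    }

    /// Evict entries that fell out of the retention window ending at `round`.
    /// Calling this only after `add_anchor` keeps the whole window intact
    /// while the current anchor's messages are still being deduplicated.
    pub fn clean(&mut self, round: u32) {
        let horizon = self.horizon;
        self.seen
            .retain(|_, seen_at| round.saturating_sub(*seen_at) < horizon);
    }
}
```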
3 changes: 2 additions & 1 deletion consensus/Cargo.toml
@@ -29,7 +29,7 @@ futures-util = { workspace = true }
 itertools = { workspace = true }
 metrics = { workspace = true }
 parking_lot = { workspace = true }
-rand = { workspace = true, features = ["small_rng"] }
+rand = { workspace = true }
 rand_pcg = { workspace = true }
 serde = { workspace = true, features = ["derive"] }
 sha2 = { workspace = true }
@@ -43,6 +43,7 @@ tycho-storage = { workspace = true }
 tycho-util = { workspace = true }
 
 [dev-dependencies]
+humantime = { workspace = true }
 parking_lot = { workspace = true, features = ["deadlock_detection"] }
 tikv-jemallocator = { workspace = true, features = [
     "unprefixed_malloc_on_supported_platforms",
7 changes: 3 additions & 4 deletions consensus/examples/engine/main.rs
@@ -17,9 +17,8 @@ mod logger;
 #[global_allocator]
 static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
 
-#[tokio::main]
-async fn main() -> anyhow::Result<()> {
-    Cli::parse().run().await
+fn main() -> anyhow::Result<()> {
+    Cli::parse().run()
 }
 
 /// Tycho network node.
@@ -47,7 +46,7 @@ struct Cli {
 }
 
 impl Cli {
-    async fn run(self) -> anyhow::Result<()> {
+    fn run(self) -> anyhow::Result<()> {
         let fun = if self.flame {
             logger::flame
         } else {
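
Dropping `#[tokio::main]` makes the entry point fully synchronous: logger setup (including the flamegraph variant selected by `--flame`) can now run before any async machinery exists, and `run` decides how execution is driven. The diff does not show where the runtime is created now; a minimal sketch of the usual shape of such an entry point, with `run_engine` as a hypothetical stand-in for the real engine code:

```rust
// Sketch only: a synchronous main that owns runtime construction, assuming
// the runtime is built explicitly somewhere inside `run`.
fn main() -> anyhow::Result<()> {
    // logging / profiling hooks can be installed here, pre-runtime
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()?;
    rt.block_on(run_engine())
}

// Hypothetical stand-in for the engine entry point.
async fn run_engine() -> anyhow::Result<()> {
    Ok(())
}
```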
6 changes: 2 additions & 4 deletions consensus/src/dag/anchor_stage.rs
@@ -24,13 +24,11 @@ impl AnchorStage {
         // reproducible global coin
         let leader_index = rand_pcg::Pcg32::seed_from_u64(anchor_candidate_round as u64)
             .gen_range(0..leader_peers.len());
-        let Some(leader) = leader_peers
+        let leader = leader_peers
             .iter()
             .nth(leader_index)
             .map(|(peer_id, _)| peer_id)
-        else {
-            panic!("selecting a leader from an empty validator set")
-        };
+            .expect("selecting a leader from an empty validator set");
         // the leader cannot produce three points in a row, so we have an undefined leader,
         // rather than an intentional leaderless support round - all represented by `None`
         if !current_peers.contains_key(leader) {
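
The `let Some(...) else { panic!(...) }` block collapses into a single `.expect(...)`, which is equivalent here because the `map` closure is infallible and only an empty `leader_peers` can make the chain yield `None`. The surrounding logic is the "reproducible global coin" named in the comment: every node seeds the same PRNG with the round number, so all nodes derive the same leader index without exchanging messages. A self-contained sketch of that property (the function name is illustrative):

```rust
use rand::{Rng, SeedableRng};

// Pcg32 is deterministic for a given seed, so every node that runs this
// with the same round and validator count picks the same leader.
fn leader_index(anchor_candidate_round: u32, leader_count: usize) -> usize {
    assert!(leader_count > 0, "selecting a leader from an empty validator set");
    rand_pcg::Pcg32::seed_from_u64(anchor_candidate_round as u64)
        .gen_range(0..leader_count)
}

fn main() {
    // The same round always maps to the same index, run after run.
    assert_eq!(leader_index(42, 7), leader_index(42, 7));
}
```

Relying on the seeded `rand_pcg::Pcg32` alone is presumably also what made the `small_rng` feature on `rand` removable in the Cargo.toml change above.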
6 changes: 3 additions & 3 deletions consensus/src/dag/dag.rs
@@ -104,7 +104,7 @@ impl Dag {
                 // better try later than wait now if some point is still downloading
                 .filter_map(|version| version.clone().now_or_never())
                 // take any suitable
-                .find_map(move |(dag_point, _)| dag_point.into_valid())
+                .find_map(move |dag_point| dag_point.into_valid())
             })
             .flatten()
         {
@@ -169,7 +169,7 @@ impl Dag {
         };
         let anchor = anchor_round
             .view(leader, |loc| {
-                let (dag_point, _) = loc
+                let dag_point = loc
                     .versions()
                     .get(anchor_digest)
                     .expect("anchor proof is not linked to anchor, validation broken")
@@ -200,7 +200,7 @@ impl Dag {
     fn drop_tail(&self, anchor_at: Round) {
         if let Some(tail) = anchor_at.0.checked_sub(MempoolConfig::COMMIT_DEPTH as u32) {
             let mut rounds = self.rounds.lock();
-            *rounds = rounds.split_off(&Round(tail));
+            rounds.retain(|k, _| k.0 >= tail);
         };
     }
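
In `drop_tail`, both forms keep exactly the entries at or above `tail`: `split_off` builds a second tree and swaps it into place, while `retain` deletes the stale rounds in place behind the already-held lock, avoiding the temporary map. A small demo of the equivalence on plain `u32` keys:

```rust
use std::collections::BTreeMap;

fn main() {
    let tail = 3u32;
    let mut a: BTreeMap<u32, &str> = (0..6).map(|k| (k, "round")).collect();
    let mut b = a.clone();

    // old form: move everything >= tail into a fresh map, drop the rest
    a = a.split_off(&tail);
    // new form: remove everything < tail in place
    b.retain(|k, _| *k >= tail);

    assert_eq!(a, b); // both keep keys 3, 4, 5
}
```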
88 changes: 52 additions & 36 deletions consensus/src/dag/dag_location.rs
@@ -1,10 +1,14 @@
 use std::collections::{btree_map, BTreeMap};
 use std::future::Future;
 use std::ops::RangeInclusive;
+use std::pin::Pin;
 use std::sync::{Arc, OnceLock};
+use std::task::{Context, Poll};
 
 use everscale_crypto::ed25519::KeyPair;
 use futures_util::FutureExt;
+use tokio::sync::mpsc;
+use tycho_network::PeerId;
 use tycho_util::futures::{JoinTask, Shared};
 
 use crate::models::{DagPoint, Digest, Round, Signature, UnixTime, ValidPoint};
@@ -30,54 +34,66 @@ pub struct DagLocation {
     /// only one of the point versions at current location
     /// may become proven by the next round point(s) of a node;
     /// even if we marked a proven point as invalid, consensus may ignore our decision
-    versions: BTreeMap<Digest, Shared<JoinTask<DagPoint>>>,
+    versions: BTreeMap<Digest, DagPointFuture>,
 }
 
+#[derive(Clone)]
+pub enum DagPointFuture {
+    Broadcast(Shared<JoinTask<DagPoint>>),
+    Download {
+        task: Shared<JoinTask<DagPoint>>,
+        dependents: mpsc::UnboundedSender<PeerId>,
+    },
+    Local(futures_util::future::Ready<DagPoint>),
+}
+
+impl DagPointFuture {
+    pub fn add_depender(&self, dependent: &PeerId) {
+        if let Self::Download { dependents, .. } = self {
+            // receiver is dropped upon completion
+            _ = dependents.send(*dependent);
+        }
+    }
+}
+
+impl Future for DagPointFuture {
+    type Output = DagPoint;
+
+    #[inline]
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        match &mut *self {
+            Self::Broadcast(task) | Self::Download { task, .. } => match task.poll_unpin(cx) {
+                Poll::Ready((dag_point, _)) => Poll::Ready(dag_point),
+                Poll::Pending => Poll::Pending,
+            },
+            Self::Local(ready) => ready.poll_unpin(cx),
+        }
+    }
+}
+
 impl DagLocation {
-    pub fn get_or_init<I, F>(&mut self, digest: &Digest, init: I) -> Shared<JoinTask<DagPoint>>
+    // point that is validated depends on other equivocated points futures (if any)
+    // in the same location, so need to keep order of futures' completion;
+    // to make signature we are interested in the single validated point only
+    // (others are at least suspicious and cannot be signed)
+    pub fn get_or_init<F>(&mut self, digest: &Digest, init: F) -> &DagPointFuture
     where
-        I: FnOnce() -> F,
-        F: Future<Output = DagPoint> + Send + 'static,
+        F: FnOnce(&InclusionState) -> DagPointFuture,
     {
-        match self.versions.entry(digest.clone()) {
-            btree_map::Entry::Occupied(entry) => entry.get().clone(),
-            btree_map::Entry::Vacant(entry) => {
-                let state = self.state.clone();
-                entry
-                    .insert(Shared::new(JoinTask::new(
-                        init().inspect(move |dag_point| state.init(dag_point)),
-                    )))
-                    .clone()
-            }
-        }
+        self.versions
+            .entry(digest.clone())
+            .or_insert_with(|| init(&self.state))
     }
-    pub fn init<I, F>(&mut self, digest: &Digest, init: I) -> Option<&'_ Shared<JoinTask<DagPoint>>>
+    pub fn init<F>(&mut self, digest: &Digest, init: F) -> Option<&DagPointFuture>
     where
-        I: FnOnce() -> F,
-        F: Future<Output = DagPoint> + Send + 'static,
+        F: FnOnce(&InclusionState) -> DagPointFuture,
     {
-        // point that is validated depends on other equivocated points futures (if any)
-        // in the same location, so order of insertion matches order of futures' completion;
-        // to make signature we are interested in the first validated point only
-        // (others are at least suspicious and cannot be signed)
         match self.versions.entry(digest.clone()) {
             btree_map::Entry::Occupied(_) => None,
-            btree_map::Entry::Vacant(entry) => {
-                let state = self.state.clone();
-                let shared = entry.insert(Shared::new(JoinTask::new({
-                    // Note: cannot sign during validation,
-                    // because current DAG round may advance concurrently
-                    // TODO either leave output as is and reduce locking in 'inclusion state'
-                    // (as single thread consumes them and makes signature),
-                    // or better add global Watch CurrentDagRound (unify with broadcast filter!)
-                    // and sign inside this future (remove futures unordered in collector)
-                    init().inspect(move |dag_point| state.init(dag_point))
-                })));
-                Some(shared)
-            }
+            btree_map::Entry::Vacant(entry) => Some(entry.insert(init(&self.state))),
         }
     }
-    pub fn versions(&self) -> &'_ BTreeMap<Digest, Shared<JoinTask<DagPoint>>> {
+    pub fn versions(&self) -> &'_ BTreeMap<Digest, DagPointFuture> {
         &self.versions
     }
     pub fn state(&self) -> &'_ InclusionState {
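
`DagPointFuture` is the core of the refactor: a single cloneable future that can be backed by a broadcast task, a download task (which additionally accepts dependent peers over an `mpsc` channel), or an already-resolved local value. Its `poll` also unwraps the `(DagPoint, _)` tuple that `Shared<JoinTask<_>>` yields, which is exactly why the tuple destructuring disappeared from dag.rs above. A self-contained sketch of the same enum-of-futures pattern with a simplified payload (names here are illustrative, not the PR's API):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures_util::FutureExt;

// Simplified stand-in for DagPoint.
#[derive(Clone, Debug, PartialEq)]
struct Payload(u32);

// One enum so callers can `.await` a value regardless of how it is produced,
// mirroring DagPointFuture's Broadcast/Download/Local split.
enum PayloadFuture {
    // resolved immediately, like DagPointFuture::Local
    Local(futures_util::future::Ready<Payload>),
    // produced by some async task, like Broadcast or Download
    Task(Pin<Box<dyn Future<Output = Payload> + Send>>),
}

impl Future for PayloadFuture {
    type Output = Payload;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Both variants are Unpin, so the enum itself is Unpin and
        // `&mut *self` is allowed, as in the PR's impl.
        match &mut *self {
            Self::Local(ready) => ready.poll_unpin(cx),
            Self::Task(task) => task.as_mut().poll(cx),
        }
    }
}

#[tokio::main]
async fn main() {
    let local = PayloadFuture::Local(futures_util::future::ready(Payload(1)));
    let task = PayloadFuture::Task(Box::pin(async { Payload(2) }));
    assert_eq!(local.await, Payload(1));
    assert_eq!(task.await, Payload(2));
}
```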