Skip to content

Commit

Permalink
Merge branch 'refactor/mempool'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed May 17, 2024
2 parents af28006 + c069a45 commit d1f2c5a
Show file tree
Hide file tree
Showing 27 changed files with 546 additions and 317 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ tycho-util = { workspace = true }

[dev-dependencies]
tycho-collator = { workspace = true, features = ["test"] }
tycho-storage = { workspace = true, features = ["test"] }

[build-dependencies]
anyhow = { workspace = true }
Expand Down
31 changes: 18 additions & 13 deletions collator/src/mempool/mempool_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use everscale_crypto::ed25519::KeyPair;
use everscale_types::boc::Boc;
use everscale_types::cell::HashBytes;
use everscale_types::models::ExtInMsgInfo;
use everscale_types::prelude::Load;
use parking_lot::RwLock;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedReceiver;
use tycho_block_util::state::ShardStateStuff;
use tycho_consensus::Point;
use tycho_consensus::{InputBufferImpl, Point};
use tycho_network::{DhtClient, OverlayService};
use tycho_util::FastHashSet;

use crate::mempool::types::ExternalMessage;
use crate::mempool::{MempoolAnchor, MempoolAnchorId};
Expand Down Expand Up @@ -127,9 +128,16 @@ impl MempoolAdapterStdImpl {
let (sender, receiver) =
tokio::sync::mpsc::unbounded_channel::<(Arc<Point>, Vec<Arc<Point>>)>();

// TODO receive from outside
let (_externals_tx, externals_rx) = mpsc::unbounded_channel();
let mut engine = tycho_consensus::Engine::new(
key_pair,
&dht_client,
&overlay_service,
sender,
InputBufferImpl::new(externals_rx),
);
tokio::spawn(async move {
let mut engine =
tycho_consensus::Engine::new(key_pair, &dht_client, &overlay_service, sender);
// TODO replace with some sensible init before run
engine.init_with_genesis(&[]).await;
engine.run().await;
Expand All @@ -156,7 +164,8 @@ pub async fn parse_points(
mut rx: UnboundedReceiver<(Arc<Point>, Vec<Arc<Point>>)>,
) {
while let Some((anchor, points)) = rx.recv().await {
let mut external_messages = HashMap::<HashBytes, ExternalMessage>::new();
let mut repr_hashes = FastHashSet::default();
let mut messages = Vec::new();

for point in points {
'message: for message in &point.body.payload {
Expand Down Expand Up @@ -184,16 +193,12 @@ pub async fn parse_points(
}
};

let external_message = ExternalMessage::new(cell.clone(), ext_in_message);
external_messages.insert(*cell.repr_hash(), external_message);
if repr_hashes.insert(*cell.repr_hash()) {
messages.push(Arc::new(ExternalMessage::new(cell.clone(), ext_in_message)));
}
}
}

let messages = external_messages
.into_iter()
.map(|m| Arc::new(m.1))
.collect::<Vec<_>>();

let anchor = Arc::new(MempoolAnchor::new(
anchor.body.location.round.0,
anchor.body.time.as_u64(),
Expand Down
5 changes: 4 additions & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ path = "examples/consensus_node.rs"
[dependencies]
ahash = { workspace = true }
anyhow = { workspace = true }
async-trait = { workspace = true }
bincode = { workspace = true }
bytes = { workspace = true, features = ["serde"] }
dashmap = { workspace = true }
Expand All @@ -39,7 +40,7 @@ tracing-appender = { workspace = true }
# local deps
tycho-network = { workspace = true }
tycho-storage = { workspace = true }
tycho-util = { workspace = true, features = ["test"] }
tycho-util = { workspace = true }

[dev-dependencies]
parking_lot = { workspace = true, features = ["deadlock_detection"] }
Expand All @@ -50,5 +51,7 @@ tikv-jemallocator = { workspace = true, features = [
"background_threads",
]}

tycho-util = { workspace = true, features = ["test"] }

[lints]
workspace = true
10 changes: 8 additions & 2 deletions consensus/examples/consensus_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tokio::sync::mpsc;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{fmt, EnvFilter, Layer};
use tycho_consensus::test_utils::drain_anchors;
use tycho_consensus::Engine;
use tycho_consensus::{Engine, InputBufferStub};
use tycho_network::{DhtConfig, NetworkConfig, PeerId, PeerInfo};
use tycho_util::time::now_sec;

Expand Down Expand Up @@ -129,7 +129,13 @@ impl CmdRun {
}

let (committed_tx, committed_rx) = mpsc::unbounded_channel();
let mut engine = Engine::new(key_pair.clone(), &dht_client, &overlay, committed_tx);
let mut engine = Engine::new(
key_pair.clone(),
&dht_client,
&overlay,
committed_tx,
InputBufferStub::new(100, 5),
);
engine.init_with_genesis(all_peers.as_slice()).await;
tokio::spawn(drain_anchors(committed_rx));

Expand Down
6 changes: 4 additions & 2 deletions consensus/src/dag/anchor_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ impl AnchorStage {
else {
panic!("selecting a leader from an empty validator set")
};
// the leader cannot produce three points in a row, so we have an undefined leader,
// rather than an intentional leaderless support round - all represented by `None`
if !current_peers.contains_key(leader) {
return None;
};
Expand All @@ -39,11 +41,11 @@ impl AnchorStage {
// 1 is an anchor candidate (surprisingly, nothing special about this point)
0 | 1 => None,
2 => Some(AnchorStage::Proof {
leader: leader.clone(),
leader: *leader,
is_used: AtomicBool::new(false),
}),
3 => Some(AnchorStage::Trigger {
leader: leader.clone(),
leader: *leader,
is_used: AtomicBool::new(false),
}),
_ => unreachable!(),
Expand Down
115 changes: 57 additions & 58 deletions consensus/src/dag/dag.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::dag::anchor_stage::AnchorStage;
use crate::dag::DagRound;
use crate::engine::MempoolConfig;
use crate::intercom::PeerSchedule;
use crate::models::{Point, Round, Ugly, ValidPoint};
use crate::models::{LinkField, Point, Round, Ugly, ValidPoint};

#[derive(Clone)]
pub struct Dag {
Expand All @@ -30,10 +30,10 @@ impl Dag {
pub fn init(&self, dag_round: DagRound) {
let mut rounds = self.rounds.lock();
assert!(rounds.is_empty(), "DAG already initialized");
rounds.insert(dag_round.round().clone(), dag_round);
rounds.insert(dag_round.round(), dag_round);
}

pub fn top(&self, round: &Round, peer_schedule: &PeerSchedule) -> DagRound {
pub fn top(&self, round: Round, peer_schedule: &PeerSchedule) -> DagRound {
let mut rounds = self.rounds.lock();
let mut top = match rounds.last_key_value() {
None => unreachable!("DAG cannot be empty if properly initialized?"),
Expand Down Expand Up @@ -85,8 +85,8 @@ impl Dag {
}
}

async fn latest_trigger(next_round: &DagRound) -> Option<ValidPoint> {
let mut next_dag_round = next_round.clone();
async fn latest_trigger(next_dag_round: &DagRound) -> Option<ValidPoint> {
let mut next_dag_round = next_dag_round.clone();
let mut latest_trigger = None;
while let Some(current_dag_round) = next_dag_round.prev().get() {
if let Some(AnchorStage::Trigger {
Expand Down Expand Up @@ -125,72 +125,71 @@ impl Dag {
) -> Vec<(ValidPoint, DagRound)> {
assert_eq!(
last_trigger.point.prev_id(),
Some(last_trigger.point.anchor_proof_id()),
Some(last_trigger.point.anchor_id(LinkField::Proof)),
"invalid anchor proof link, trigger point must have been invalidated"
);
let mut anchor_stack = Vec::new();
let Some(mut proof) = future_round.vertex_by_proof(last_trigger).await else {
panic!("anchor proof round not in DAG")
};
loop {
let Some(proof_round) = future_round.scan(&proof.point.body.location.round) else {
let Some(proof_round) = future_round.scan(proof.point.body.location.round) else {
panic!("anchor proof round not in DAG while a point from it was received")
};
if proof_round.round() == &MempoolConfig::GENESIS_ROUND {
if proof_round.round() == MempoolConfig::GENESIS_ROUND {
break;
}
match proof_round.anchor_stage() {
Some(AnchorStage::Proof {
ref leader,
ref is_used,
}) => {
assert_eq!(
proof.point.body.location.round,
*proof_round.round(),
"anchor proof round does not match"
);
assert_eq!(
proof.point.body.location.author, leader,
"anchor proof author does not match prescribed by round"
);
let Some(anchor_round) = proof_round.prev().get() else {
break;
};
if is_used.load(Ordering::Relaxed) {
break;
};
let mut proofs = FuturesUnordered::new();
proof_round.view(leader, |loc| {
for (_, version) in loc.versions() {
proofs.push(version.clone())
}
});
let mut anchor = None;
'v: while let Some((proof, _)) = proofs.next().await {
if let Some(valid) = proof.into_valid() {
let Some(valid) = proof_round.vertex_by_proof(&valid).await else {
panic!("anchor proof is not linked to anchor, validation broken")
};
_ = anchor.insert(valid);
is_used.store(true, Ordering::Relaxed);
break 'v;
}
}
let anchor = anchor
.expect("any anchor proof points to anchor point, validation is broken");
anchor_stack.push((anchor.clone(), anchor_round.clone()));

let Some(next_proof) = proof_round
.valid_point(&anchor.point.anchor_proof_id())
.await
else {
break;
let Some(AnchorStage::Proof {
ref leader,
ref is_used,
}) = proof_round.anchor_stage()
else {
panic!("anchor proof round is not expected, validation is broken")
};
assert_eq!(
proof.point.body.location.round,
proof_round.round(),
"anchor proof round does not match"
);
assert_eq!(
proof.point.body.location.author, leader,
"anchor proof author does not match prescribed by round"
);
let Some(anchor_round) = proof_round.prev().get() else {
break;
};
if is_used.load(Ordering::Relaxed) {
break;
};
let mut proofs = FuturesUnordered::new();
proof_round.view(leader, |loc| {
for (_, version) in loc.versions() {
proofs.push(version.clone())
}
});
let mut anchor = None;
'v: while let Some((proof, _)) = proofs.next().await {
if let Some(valid) = proof.into_valid() {
let Some(valid) = proof_round.vertex_by_proof(&valid).await else {
panic!("anchor proof is not linked to anchor, validation broken")
};
proof = next_proof;
future_round = anchor_round;
_ = anchor.insert(valid);
is_used.store(true, Ordering::Relaxed);
break 'v;
}
_ => panic!("anchor proof round is not expected, validation is broken"),
}
let anchor =
anchor.expect("any anchor proof points to anchor point, validation is broken");
anchor_stack.push((anchor.clone(), anchor_round.clone()));

let Some(next_proof) = proof_round
.valid_point(&anchor.point.anchor_id(LinkField::Proof))
.await
else {
break;
};
proof = next_proof;
future_round = anchor_round;
}
anchor_stack
}
Expand All @@ -210,7 +209,7 @@ impl Dag {
anchor_round: &DagRound, // r+1
) -> Vec<Arc<Point>> {
assert_eq!(
*anchor_round.round(),
anchor_round.round(),
anchor.body.location.round,
"passed anchor round does not match anchor point's round"
);
Expand Down
10 changes: 5 additions & 5 deletions consensus/src/dag/dag_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl InclusionState {
None => assert!(false, "Coding error: own point is not trusted"),
Some(valid) => {
_ = signed.set(Ok(Signed {
at: valid.point.body.location.round.clone(),
at: valid.point.body.location.round,
with: valid.point.signature.clone(),
}))
}
Expand All @@ -132,9 +132,9 @@ impl InclusionState {
pub fn signed(&self) -> Option<&'_ Result<Signed, ()>> {
self.0.get()?.signed.get()
}
pub fn signed_point(&self, at: &Round) -> Option<&'_ ValidPoint> {
pub fn signed_point(&self, at: Round) -> Option<&'_ ValidPoint> {
let signable = self.0.get()?;
if &signable.signed.get()?.as_ref().ok()?.at == at {
if signable.signed.get()?.as_ref().ok()?.at == at {
signable.first_completed.valid()
} else {
None
Expand Down Expand Up @@ -168,7 +168,7 @@ pub struct Signed {
impl Signable {
pub fn sign(
&self,
at: &Round,
at: Round,
key_pair: Option<&KeyPair>, // same round for own point and next round for other's
time_range: RangeInclusive<UnixTime>,
) -> bool {
Expand All @@ -178,7 +178,7 @@ impl Signable {
_ = self.signed.get_or_init(|| {
this_call_signed = true;
Ok(Signed {
at: at.clone(),
at,
with: Signature::new(key_pair, &valid.point.digest),
})
});
Expand Down
Loading

0 comments on commit d1f2c5a

Please sign in to comment.