diff --git a/Cargo.lock b/Cargo.lock
index cf9c8aaa7..57276288a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2638,6 +2638,7 @@ version = "0.0.1"
 dependencies = [
  "ahash",
  "anyhow",
+ "async-trait",
  "bincode",
  "bytes",
  "clap",
diff --git a/cli/Cargo.toml b/cli/Cargo.toml
index 377fcce7c..145fb2467 100644
--- a/cli/Cargo.toml
+++ b/cli/Cargo.toml
@@ -49,6 +49,7 @@ tycho-util = { workspace = true }
 
 [dev-dependencies]
 tycho-collator = { workspace = true, features = ["test"] }
+tycho-storage = { workspace = true, features = ["test"] }
 
 [build-dependencies]
 anyhow = { workspace = true }
diff --git a/collator/src/mempool/mempool_adapter.rs b/collator/src/mempool/mempool_adapter.rs
index d01e8eec3..78364931b 100644
--- a/collator/src/mempool/mempool_adapter.rs
+++ b/collator/src/mempool/mempool_adapter.rs
@@ -1,18 +1,19 @@
-use std::collections::{BTreeMap, HashMap};
+use std::collections::BTreeMap;
 use std::sync::Arc;
 
 use anyhow::Result;
 use async_trait::async_trait;
 use everscale_crypto::ed25519::KeyPair;
 use everscale_types::boc::Boc;
-use everscale_types::cell::HashBytes;
 use everscale_types::models::ExtInMsgInfo;
 use everscale_types::prelude::Load;
 use parking_lot::RwLock;
+use tokio::sync::mpsc;
 use tokio::sync::mpsc::UnboundedReceiver;
 use tycho_block_util::state::ShardStateStuff;
-use tycho_consensus::Point;
+use tycho_consensus::{InputBufferImpl, Point};
 use tycho_network::{DhtClient, OverlayService};
+use tycho_util::FastHashSet;
 
 use crate::mempool::types::ExternalMessage;
 use crate::mempool::{MempoolAnchor, MempoolAnchorId};
@@ -127,9 +128,16 @@ impl MempoolAdapterStdImpl {
         let (sender, receiver) =
             tokio::sync::mpsc::unbounded_channel::<(Arc<Point>, Vec<Arc<Point>>)>();
 
+        // TODO receive from outside
+        let (_externals_tx, externals_rx) = mpsc::unbounded_channel();
+        let mut engine = tycho_consensus::Engine::new(
+            key_pair,
+            &dht_client,
+            &overlay_service,
+            sender,
+            InputBufferImpl::new(externals_rx),
+        );
         tokio::spawn(async move {
-            let mut engine =
-                tycho_consensus::Engine::new(key_pair, &dht_client, &overlay_service, sender);
             // TODO replace with some sensible init before run
             engine.init_with_genesis(&[]).await;
             engine.run().await;
@@ -156,7 +164,8 @@ pub async fn parse_points(
     mut rx: UnboundedReceiver<(Arc<Point>, Vec<Arc<Point>>)>,
 ) {
     while let Some((anchor, points)) = rx.recv().await {
-        let mut external_messages = HashMap::<HashBytes, ExternalMessage>::new();
+        let mut repr_hashes = FastHashSet::default();
+        let mut messages = Vec::new();
 
         for point in points {
             'message: for message in &point.body.payload {
@@ -184,16 +193,12 @@ pub async fn parse_points(
                     }
                 };
 
-                let external_message = ExternalMessage::new(cell.clone(), ext_in_message);
-                external_messages.insert(*cell.repr_hash(), external_message);
+                if repr_hashes.insert(*cell.repr_hash()) {
+                    messages.push(Arc::new(ExternalMessage::new(cell.clone(), ext_in_message)));
+                }
             }
         }
 
-        let messages = external_messages
-            .into_iter()
-            .map(|m| Arc::new(m.1))
-            .collect::<Vec<_>>();
-
         let anchor = Arc::new(MempoolAnchor::new(
             anchor.body.location.round.0,
             anchor.body.time.as_u64(),
diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml
index 779116800..f5975048e 100644
--- a/consensus/Cargo.toml
+++ b/consensus/Cargo.toml
@@ -15,6 +15,7 @@ path = "examples/consensus_node.rs"
 [dependencies]
 ahash = { workspace = true }
 anyhow = { workspace = true }
+async-trait = { workspace = true }
 bincode = { workspace = true }
 bytes = { workspace = true, features = ["serde"] }
 dashmap = { workspace = true }
@@ -39,7 +40,7 @@ tracing-appender = { workspace = true }
 # local deps
 tycho-network = { workspace = true }
 tycho-storage = { workspace = true }
-tycho-util = { workspace = true, features = ["test"] }
+tycho-util = { workspace = true }
 
 [dev-dependencies]
 parking_lot = { workspace = true, features = ["deadlock_detection"] }
@@ -50,5 +51,7 @@ tikv-jemallocator = { workspace = true, features = [
     "background_threads",
 ]}
 
+tycho-util = { workspace = true, features = ["test"] }
+
 [lints]
 workspace = true
diff --git a/consensus/examples/consensus_node.rs b/consensus/examples/consensus_node.rs
index a8b70b64c..e9c56c21d 100644
--- a/consensus/examples/consensus_node.rs
+++ b/consensus/examples/consensus_node.rs
@@ -16,7 +16,7 @@ use tokio::sync::mpsc;
 use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::{fmt, EnvFilter, Layer};
 use tycho_consensus::test_utils::drain_anchors;
-use tycho_consensus::Engine;
+use tycho_consensus::{Engine, InputBufferStub};
 use tycho_network::{DhtConfig, NetworkConfig, PeerId, PeerInfo};
 use tycho_util::time::now_sec;
 
@@ -129,7 +129,13 @@ impl CmdRun {
         }
 
         let (committed_tx, committed_rx) = mpsc::unbounded_channel();
-        let mut engine = Engine::new(key_pair.clone(), &dht_client, &overlay, committed_tx);
+        let mut engine = Engine::new(
+            key_pair.clone(),
+            &dht_client,
+            &overlay,
+            committed_tx,
+            InputBufferStub::new(100, 5),
+        );
         engine.init_with_genesis(all_peers.as_slice()).await;
         tokio::spawn(drain_anchors(committed_rx));
diff --git a/consensus/src/dag/anchor_stage.rs b/consensus/src/dag/anchor_stage.rs
index a3b7efcd3..776922c12 100644
--- a/consensus/src/dag/anchor_stage.rs
+++ b/consensus/src/dag/anchor_stage.rs
@@ -31,6 +31,8 @@ impl AnchorStage {
         else {
             panic!("selecting a leader from an empty validator set")
         };
+        // the leader cannot produce three points in a row, so we have an undefined leader,
+        // rather than an intentional leaderless support round - all represented by `None`
         if !current_peers.contains_key(leader) {
             return None;
         };
@@ -39,11 +41,11 @@ impl AnchorStage {
             // 1 is an anchor candidate (surprisingly, nothing special about this point)
             0 | 1 => None,
             2 => Some(AnchorStage::Proof {
-                leader: leader.clone(),
+                leader: *leader,
                 is_used: AtomicBool::new(false),
             }),
             3 => Some(AnchorStage::Trigger {
-                leader: leader.clone(),
+                leader: *leader,
                 is_used: AtomicBool::new(false),
             }),
             _ => unreachable!(),
diff --git a/consensus/src/dag/dag.rs b/consensus/src/dag/dag.rs
index dc7846c82..45d46f729 100644
--- a/consensus/src/dag/dag.rs
+++ b/consensus/src/dag/dag.rs
@@ -12,7 +12,7 @@ use crate::dag::anchor_stage::AnchorStage;
 use crate::dag::DagRound;
 use crate::engine::MempoolConfig;
 use crate::intercom::PeerSchedule;
-use crate::models::{Point, Round, Ugly, ValidPoint};
+use crate::models::{LinkField, Point, Round, Ugly, ValidPoint};
 
 #[derive(Clone)]
 pub struct Dag {
@@ -30,10 +30,10 @@ impl Dag {
     pub fn init(&self, dag_round: DagRound) {
         let mut rounds = self.rounds.lock();
         assert!(rounds.is_empty(), "DAG already initialized");
-        rounds.insert(dag_round.round().clone(), dag_round);
+        rounds.insert(dag_round.round(), dag_round);
     }
 
-    pub fn top(&self, round: &Round, peer_schedule: &PeerSchedule) -> DagRound {
+    pub fn top(&self, round: Round, peer_schedule: &PeerSchedule) -> DagRound {
         let mut rounds = self.rounds.lock();
         let mut top = match rounds.last_key_value() {
             None => unreachable!("DAG cannot be empty if properly initialized?"),
@@ -85,8 +85,8 @@ impl Dag {
         }
     }
 
-    async fn latest_trigger(next_round: &DagRound) -> Option<ValidPoint> {
-        let mut next_dag_round = next_round.clone();
+    async fn latest_trigger(next_dag_round: &DagRound) -> Option<ValidPoint> {
+        let mut next_dag_round = next_dag_round.clone();
         let mut latest_trigger = None;
         while let Some(current_dag_round) = next_dag_round.prev().get() {
             if let Some(AnchorStage::Trigger {
@@ -125,7 +125,7 @@ impl Dag {
     ) -> Vec<(ValidPoint, DagRound)> {
         assert_eq!(
             last_trigger.point.prev_id(),
-            Some(last_trigger.point.anchor_proof_id()),
+            Some(last_trigger.point.anchor_id(LinkField::Proof)),
             "invalid anchor proof link, trigger point must have been invalidated"
         );
         let mut anchor_stack = Vec::new();
@@ -133,64 +133,63 @@ impl Dag {
            panic!("anchor proof round not in DAG")
        };
        loop {
-            let Some(proof_round) = future_round.scan(&proof.point.body.location.round) else {
+            let Some(proof_round) = future_round.scan(proof.point.body.location.round) else {
                 panic!("anchor proof round not in DAG while a point from it was received")
             };
-            if proof_round.round() == &MempoolConfig::GENESIS_ROUND {
+            if proof_round.round() == MempoolConfig::GENESIS_ROUND {
                 break;
             }
-            match proof_round.anchor_stage() {
-                Some(AnchorStage::Proof {
-                    ref leader,
-                    ref is_used,
-                }) => {
-                    assert_eq!(
-                        proof.point.body.location.round,
-                        *proof_round.round(),
-                        "anchor proof round does not match"
-                    );
-                    assert_eq!(
-                        proof.point.body.location.author, leader,
-                        "anchor proof author does not match prescribed by round"
-                    );
-                    let Some(anchor_round) = proof_round.prev().get() else {
-                        break;
-                    };
-                    if is_used.load(Ordering::Relaxed) {
-                        break;
-                    };
-                    let mut proofs = FuturesUnordered::new();
-                    proof_round.view(leader, |loc| {
-                        for (_, version) in loc.versions() {
-                            proofs.push(version.clone())
-                        }
-                    });
-                    let mut anchor = None;
-                    'v: while let Some((proof, _)) = proofs.next().await {
-                        if let Some(valid) = proof.into_valid() {
-                            let Some(valid) = proof_round.vertex_by_proof(&valid).await else {
-                                panic!("anchor proof is not linked to anchor, validation broken")
-                            };
-                            _ = anchor.insert(valid);
-                            is_used.store(true, Ordering::Relaxed);
-                            break 'v;
-                        }
-                    }
-                    let anchor = anchor
-                        .expect("any anchor proof points to anchor point, validation is broken");
-                    anchor_stack.push((anchor.clone(), anchor_round.clone()));
-
-                    let Some(next_proof) = proof_round
-                        .valid_point(&anchor.point.anchor_proof_id())
-                        .await
-                    else {
-                        break;
-                    };
-                    proof = next_proof;
-                    future_round = anchor_round;
-                }
-                _ => panic!("anchor proof round is not expected, validation is broken"),
-            }
+            let Some(AnchorStage::Proof {
+                ref leader,
+                ref is_used,
+            }) = proof_round.anchor_stage()
+            else {
+                panic!("anchor proof round is not expected, validation is broken")
+            };
+            assert_eq!(
+                proof.point.body.location.round,
+                proof_round.round(),
+                "anchor proof round does not match"
+            );
+            assert_eq!(
+                proof.point.body.location.author, leader,
+                "anchor proof author does not match prescribed by round"
+            );
+            let Some(anchor_round) = proof_round.prev().get() else {
+                break;
+            };
+            if is_used.load(Ordering::Relaxed) {
+                break;
+            };
+            let mut proofs = FuturesUnordered::new();
+            proof_round.view(leader, |loc| {
+                for (_, version) in loc.versions() {
+                    proofs.push(version.clone())
+                }
+            });
+            let mut anchor = None;
+            'v: while let Some((proof, _)) = proofs.next().await {
+                if let Some(valid) = proof.into_valid() {
+                    let Some(valid) = proof_round.vertex_by_proof(&valid).await else {
+                        panic!("anchor proof is not linked to anchor, validation broken")
+                    };
+                    _ = anchor.insert(valid);
+                    is_used.store(true, Ordering::Relaxed);
+                    break 'v;
+                }
+            }
+            let anchor =
+                anchor.expect("any anchor proof points to anchor point, validation is broken");
+            anchor_stack.push((anchor.clone(), anchor_round.clone()));
+
+            let Some(next_proof) = proof_round
+                .valid_point(&anchor.point.anchor_id(LinkField::Proof))
+                .await
+            else {
+                break;
+            };
+            proof = next_proof;
+            future_round = anchor_round;
         }
         anchor_stack
     }
@@ -210,7 +209,7 @@ impl Dag {
         anchor_round: &DagRound, // r+1
     ) -> Vec<Arc<Point>> {
         assert_eq!(
-            *anchor_round.round(),
+            anchor_round.round(),
             anchor.body.location.round,
             "passed anchor round does not match anchor point's round"
         );
diff --git a/consensus/src/dag/dag_location.rs b/consensus/src/dag/dag_location.rs
index 2ab029c71..20b5ed6a4 100644
--- a/consensus/src/dag/dag_location.rs
+++ b/consensus/src/dag/dag_location.rs
@@ -109,7 +109,7 @@ impl InclusionState {
             None => assert!(false, "Coding error: own point is not trusted"),
             Some(valid) => {
                 _ = signed.set(Ok(Signed {
-                    at: valid.point.body.location.round.clone(),
+                    at: valid.point.body.location.round,
                     with: valid.point.signature.clone(),
                 }))
             }
@@ -132,9 +132,9 @@ impl InclusionState {
     pub fn signed(&self) -> Option<&'_ Result<Signed, ()>> {
         self.0.get()?.signed.get()
     }
-    pub fn signed_point(&self, at: &Round) -> Option<&'_ ValidPoint> {
+    pub fn signed_point(&self, at: Round) -> Option<&'_ ValidPoint> {
         let signable = self.0.get()?;
-        if &signable.signed.get()?.as_ref().ok()?.at == at {
+        if signable.signed.get()?.as_ref().ok()?.at == at {
             signable.first_completed.valid()
         } else {
             None
@@ -168,7 +168,7 @@ pub struct Signed {
 impl Signable {
     pub fn sign(
         &self,
-        at: &Round,
+        at: Round,
         key_pair: Option<&KeyPair>, // same round for own point and next round for other's
         time_range: RangeInclusive<UnixTime>,
     ) -> bool {
@@ -178,7 +178,7 @@ impl Signable {
         _ = self.signed.get_or_init(|| {
             this_call_signed = true;
             Ok(Signed {
-                at: at.clone(),
+                at,
                 with: Signature::new(key_pair, &valid.point.digest),
             })
         });
diff --git a/consensus/src/dag/dag_round.rs b/consensus/src/dag/dag_round.rs
index 3d7fcbf46..9190e75ce 100644
--- a/consensus/src/dag/dag_round.rs
+++ b/consensus/src/dag/dag_round.rs
@@ -55,13 +55,13 @@ impl DagRound {
     }
 
     pub fn new(round: Round, peer_schedule: &PeerSchedule, prev: WeakDagRound) -> Self {
-        let peers = peer_schedule.peers_for(&round);
+        let peers = peer_schedule.peers_for(round);
         let locations = FastDashMap::with_capacity_and_hasher(peers.len(), Default::default());
         Self(Arc::new(DagRoundInner {
             round,
             node_count: NodeCount::try_from(peers.len())
                 .expect(&format!("peer schedule updated for {round:?}")),
-            key_pair: peer_schedule.local_keys(&round),
+            key_pair: peer_schedule.local_keys(round),
             anchor_stage: AnchorStage::of(round, peer_schedule),
             locations,
             prev,
@@ -70,16 +70,16 @@ impl DagRound {
 
     pub fn next(&self, peer_schedule: &PeerSchedule) -> Self {
         let next_round = self.round().next();
-        let peers = peer_schedule.peers_for(&next_round);
+        let peers = peer_schedule.peers_for(next_round);
         let locations = FastDashMap::with_capacity_and_hasher(peers.len(), Default::default());
         Self(Arc::new(DagRoundInner {
             round: next_round,
             node_count: NodeCount::try_from(peers.len())
                 .expect(&format!("peer schedule updated for {next_round:?}")),
-            key_pair: peer_schedule.local_keys(&next_round),
+            key_pair: peer_schedule.local_keys(next_round),
             anchor_stage: AnchorStage::of(next_round, peer_schedule),
             locations,
-            prev: self.as_weak(),
+            prev: self.to_weak(),
         }))
     }
 
@@ -96,12 +96,12 @@ impl DagRound {
         }))
     }
 
-    pub fn round(&self) -> &'_ Round {
-        &self.0.round
+    pub fn round(&self) -> Round {
+        self.0.round
     }
 
-    pub fn node_count(&self) -> &'_ NodeCount {
-        &self.0.node_count
+    pub fn node_count(&self) -> NodeCount {
+        self.0.node_count
     }
 
     pub fn key_pair(&self) -> Option<&'_ KeyPair> {
@@ -141,14 +141,14 @@ impl DagRound {
         &self.0.prev
     }
 
-    pub fn as_weak(&self) -> WeakDagRound {
+    pub fn to_weak(&self) -> WeakDagRound {
         WeakDagRound(Arc::downgrade(&self.0))
     }
 
     pub async fn vertex_by_proof(&self, proof: &ValidPoint) -> Option<ValidPoint> {
         match proof.point.body.proof {
             Some(ref proven) => {
-                let dag_round = self.scan(&proof.point.body.location.round.prev())?;
+                let dag_round = self.scan(proof.point.body.location.round.prev())?;
                 dag_round
                     .valid_point_exact(&proof.point.body.location.author, &proven.digest)
                     .await
@@ -158,7 +158,7 @@ impl DagRound {
     }
 
     pub async fn valid_point(&self, point_id: &PointId) -> Option<ValidPoint> {
-        match self.scan(&point_id.location.round) {
+        match self.scan(point_id.location.round) {
             Some(linked) => {
                 linked
                     .valid_point_exact(&point_id.location.author, &point_id.digest)
@@ -178,7 +178,7 @@ impl DagRound {
         point: &Arc<Point>,
         downloader: &Downloader,
     ) -> Option<BoxFuture<'static, InclusionState>> {
-        self.scan(&point.body.location.round)
+        self.scan(point.body.location.round)
             .and_then(|linked| linked.add_exact(&point, downloader))
     }
 
@@ -187,10 +187,10 @@ impl DagRound {
         point: &Arc<Point>,
         downloader: &Downloader,
     ) -> Option<BoxFuture<'static, InclusionState>> {
-        if &point.body.location.round != self.round() {
+        if point.body.location.round != self.round() {
             panic!("Coding error: dag round mismatches point round on add")
         }
-        let dag_round = self.as_weak();
+        let dag_round = self.to_weak();
         let digest = &point.digest;
         self.edit(&point.body.location.author, |loc| {
             let state = loc.state().clone();
@@ -211,7 +211,7 @@ impl DagRound {
         if !Verifier::verify(point, peer_schedule).is_ok() {
             panic!("Coding error: malformed point")
         }
-        let point = Verifier::validate(point.clone(), self.as_weak(), downloader.clone()).await;
+        let point = Verifier::validate(point.clone(), self.to_weak(), downloader.clone()).await;
         if point.trusted().is_none() {
             panic!("Coding error: not a trusted point")
         }
@@ -219,7 +219,7 @@ impl DagRound {
         if let Some(signable) = state.signable() {
             signable.sign(
                 self.round(),
-                peer_schedule.local_keys(&self.round().next()).as_deref(),
+                peer_schedule.local_keys(self.round().next()).as_deref(),
                 MempoolConfig::sign_time_range(),
             );
         }
@@ -236,13 +236,13 @@ impl DagRound {
         if dag_point.valid().is_some() {
             panic!("Coding error: failed to insert valid point as invalid")
         }
-        self.scan(&dag_point.location().round)
+        self.scan(dag_point.location().round)
             .map(|linked| linked.insert_exact(dag_point))
             .flatten()
     }
 
     fn insert_exact(&self, dag_point: &DagPoint) -> Option<BoxFuture<'static, InclusionState>> {
-        if &dag_point.location().round != self.round() {
+        if dag_point.location().round != self.round() {
             panic!("Coding error: dag round mismatches point round on insert")
         }
         self.edit(&dag_point.location().author, |loc| {
@@ -254,7 +254,7 @@ impl DagRound {
         })
     }
 
-    pub fn scan(&self, round: &Round) -> Option<Self> {
+    pub fn scan(&self, round: Round) -> Option<Self> {
         assert!(
             round <= self.round(),
             "Coding error: cannot scan DAG rounds chain for a future round"
diff --git a/consensus/src/dag/producer.rs b/consensus/src/dag/producer.rs
index 69b2c79a3..72d46b445 100644
--- a/consensus/src/dag/producer.rs
+++ b/consensus/src/dag/producer.rs
@@ -6,7 +6,9 @@ use tycho_network::PeerId;
 
 use crate::dag::anchor_stage::AnchorStage;
 use crate::dag::DagRound;
-use crate::models::{Link, Location, Point, PointBody, PrevPoint, Round, Through, UnixTime};
+use crate::models::{
+    Link, LinkField, Location, Point, PointBody, PrevPoint, Round, Through, UnixTime,
+};
 
 pub struct Producer;
 
@@ -30,12 +32,22 @@ impl Producer {
         };
         let includes = Self::includes(&finished_round);
         let mut anchor_trigger =
-            Self::link_from_includes(&local_id, &current_round, &includes, true);
+            Self::link_from_includes(&local_id, &current_round, &includes, LinkField::Trigger);
         let mut anchor_proof =
-            Self::link_from_includes(&local_id, &current_round, &includes, false);
+            Self::link_from_includes(&local_id, &current_round, &includes, LinkField::Proof);
         let witness = Self::witness(&finished_round);
-        Self::update_link_from_witness(&mut anchor_trigger, finished_round.round(), &witness, true);
-        Self::update_link_from_witness(&mut anchor_proof, finished_round.round(), &witness, false);
+        Self::update_link_from_witness(
+            &mut anchor_trigger,
+            current_round.round(),
+            &witness,
+            LinkField::Trigger,
+        );
+        Self::update_link_from_witness(
+            &mut anchor_proof,
+            current_round.round(),
+            &witness,
+            LinkField::Proof,
+        );
         let time = Self::get_time(
             &finished_round,
             &local_id,
@@ -67,8 +79,8 @@ impl Producer {
         Some(Point::new(key_pair, PointBody {
             location: Location {
-                round: current_round.round().clone(),
-                author: local_id.clone(),
+                round: current_round.round(),
+                author: local_id,
             },
             time,
             payload,
@@ -118,87 +130,67 @@ impl Producer {
     fn link_from_includes(
         local_id: &PeerId,
         current_round: &DagRound,
-        includes: &Vec<Arc<Point>>,
-        is_for_trigger: bool,
+        includes: &[Arc<Point>],
+        link_field: LinkField,
     ) -> Link {
-        match current_round.anchor_stage() {
-            Some(AnchorStage::Trigger { leader, .. }) if is_for_trigger && leader == local_id => {
-                Link::ToSelf
-            }
-            Some(AnchorStage::Proof { leader, .. }) if !is_for_trigger && leader == local_id => {
-                Link::ToSelf
+        use AnchorStage::*;
+
+        match (current_round.anchor_stage(), link_field) {
+            (Some(Trigger { leader, .. }), LinkField::Trigger)
+            | (Some(Proof { leader, .. }), LinkField::Proof)
+                if leader == local_id =>
+            {
+                return Link::ToSelf;
             }
-            _ => {
-                let point = includes
-                    .iter()
-                    .max_by_key(|point| {
-                        if is_for_trigger {
-                            point.anchor_trigger_round()
-                        } else {
-                            point.anchor_proof_round()
-                        }
-                    })
-                    .expect("non-empty list of includes for own point");
-                if point.body.location.round == current_round.round().prev()
-                    && ((is_for_trigger && point.body.anchor_trigger == Link::ToSelf)
-                        || (!is_for_trigger && point.body.anchor_proof == Link::ToSelf))
-                {
-                    Link::Direct(Through::Includes(point.body.location.author.clone()))
-                } else {
-                    let to = if is_for_trigger {
-                        point.anchor_trigger_id()
-                    } else {
-                        point.anchor_proof_id()
-                    };
-                    Link::Indirect {
-                        to,
-                        path: Through::Includes(point.body.location.author.clone()),
-                    }
-                }
-            }
-        }
+            _ => {}
+        }
+
+        let point = includes
+            .iter()
+            .max_by_key(|point| point.anchor_round(link_field))
+            .expect("non-empty list of includes for own point");
+
+        if point.body.location.round == current_round.round().prev()
+            && point.anchor_link(link_field) == &Link::ToSelf
+        {
+            Link::Direct(Through::Includes(point.body.location.author))
+        } else {
+            Link::Indirect {
+                to: point.anchor_id(link_field),
+                path: Through::Includes(point.body.location.author),
+            }
+        }
     }
 
     fn update_link_from_witness(
         link: &mut Link,
-        finished_round: &Round,
-        witness: &Vec<Arc<Point>>,
-        is_for_trigger: bool,
+        current_round: Round,
+        witness: &[Arc<Point>],
+        link_field: LinkField,
     ) {
         let link_round = match link {
             Link::ToSelf | Link::Direct(_) => return,
            Link::Indirect { to, .. } => to.location.round,
        };
 
-        fn last_round(point: &Point, is_for_trigger: bool) -> Round {
-            if is_for_trigger {
-                point.anchor_trigger_round()
-            } else {
-                point.anchor_proof_round()
-            }
-        }
+
         let Some(point) = witness
             .iter()
-            .filter(|point| last_round(&point, is_for_trigger) > link_round)
-            .max_by_key(|point| last_round(&point, is_for_trigger))
+            .filter(|point| point.anchor_round(link_field) > link_round)
+            .max_by_key(|point| point.anchor_round(link_field))
         else {
             return;
         };
-        if point.body.location.round == finished_round.prev()
-            && ((is_for_trigger && point.body.anchor_trigger == Link::ToSelf)
-                || (!is_for_trigger && point.body.anchor_proof == Link::ToSelf))
+
+        if point.body.location.round == current_round.prev().prev()
+            && point.anchor_link(link_field) == &Link::ToSelf
         {
-            *link = Link::Direct(Through::Witness(point.body.location.author))
+            *link = Link::Direct(Through::Witness(point.body.location.author));
         } else {
-            let to = if is_for_trigger {
-                point.anchor_trigger_id()
-            } else {
-                point.anchor_proof_id()
-            };
             *link = Link::Indirect {
-                to,
+                to: point.anchor_id(link_field),
                 path: Through::Witness(point.body.location.author),
-            }
-        };
+            };
+        }
     }
 
     async fn get_time(
         finished_round: &DagRound,
         local_id: &PeerId,
         anchor_proof: &Link,
         prev_point: Option<&PrevPoint>,
-        includes: &Vec<Arc<Point>>,
-        witness: &Vec<Arc<Point>>,
+        includes: &[Arc<Point>],
+        witness: &[Arc<Point>],
     ) -> UnixTime {
         let mut time = UnixTime::now();
         if let Some(prev_point) = prev_point {
             if let Some(valid) = finished_round
-                .valid_point_exact(&local_id, &prev_point.digest)
+                .valid_point_exact(local_id, &prev_point.digest)
                 .await
             {
-                time = valid.point.body.time.clone().max(time);
+                time = valid.point.body.time.max(time);
             }
         }
         match anchor_proof {
@@ -229,14 +221,14 @@ impl Producer {
                     .iter()
                     .find(|point| point.body.location.author == peer_id)
                 {
-                    time = point.body.time.clone().max(time);
+                    time = point.body.time.max(time);
                 }
             }
             Link::Indirect { to, .. } => {
                 // it's sufficient to check prev point - it can't have newer anchor proof
                 if prev_point.is_none() {
                     if let Some(valid) = finished_round.valid_point(&to).await {
-                        time = valid.point.body.time.clone().max(time);
+                        time = valid.point.body.time.max(time);
                     } else {
                         panic!("last anchor proof must stay in DAG until its payload is committed")
                     }
diff --git a/consensus/src/dag/verifier.rs b/consensus/src/dag/verifier.rs
index e2a10f229..32b4f0080 100644
--- a/consensus/src/dag/verifier.rs
+++ b/consensus/src/dag/verifier.rs
@@ -9,7 +9,7 @@ use crate::dag::{DagRound, WeakDagRound};
 use crate::engine::MempoolConfig;
 use crate::intercom::{Downloader, PeerSchedule};
 use crate::models::{
-    DagPoint, Digest, Link, Location, NodeCount, Point, PointId, Ugly, ValidPoint,
+    DagPoint, Digest, Link, LinkField, Location, NodeCount, Point, PointId, Ugly, ValidPoint,
 };
 
 // Note on equivocation.
@@ -59,7 +59,7 @@ impl Verifier {
             return DagPoint::NotExists(Arc::new(point.id()));
         };
         // TODO upgrade Weak whenever used to let Dag Round drop if some future hangs up for long
-        if &point.body.location.round != r_0.round() {
+        if point.body.location.round != r_0.round() {
            panic!("Coding error: dag round mismatches point round")
        }
 
@@ -115,25 +115,25 @@ impl Verifier {
         dependencies: &mut JoinSet<DagPoint>,
     ) -> bool {
         let mut links = vec![
-            (point.anchor_proof_id(), false),
-            (point.anchor_trigger_id(), true),
+            (point.anchor_id(LinkField::Proof), LinkField::Proof),
+            (point.anchor_id(LinkField::Trigger), LinkField::Trigger),
         ];
         let mut linked_with_round = Vec::with_capacity(2);
         let mut dag_round = dag_round.clone();
         while !links.is_empty() {
-            links.retain(|(linked, is_trigger)| {
-                let found = &linked.location.round == dag_round.round();
+            links.retain(|(linked, link_field)| {
+                let found = linked.location.round == dag_round.round();
                 if found {
-                    match (&dag_round.anchor_stage(), is_trigger) {
+                    match (&dag_round.anchor_stage(), link_field) {
                         // AnchorStage::Candidate(_) requires nothing special
-                        (Some(AnchorStage::Proof { leader, .. }), false)
+                        (Some(AnchorStage::Proof { leader, .. }), LinkField::Proof)
                             if leader == linked.location.author => {}
-                        (Some(AnchorStage::Trigger { leader, .. }), true)
+                        (Some(AnchorStage::Trigger { leader, .. }), LinkField::Trigger)
                             if leader == linked.location.author => {}
                         _ => return false, // link not to round's leader
                     }
                     linked_with_round.push((
-                        linked.location.author.clone(),
+                        linked.location.author,
                         linked.digest.clone(),
                         dag_round.clone(),
                     ));
@@ -155,7 +155,7 @@ impl Verifier {
         // while we need to get invalid ones to blame current point
         for (author, digest, dag_round) in linked_with_round {
             // skip self links
-            if dag_round.round() < &point.body.location.round {
+            if dag_round.round() < point.body.location.round {
                 // TODO will add the same point from direct dependencies twice,
                 //   we can do better but nothing terrible
                 Self::add_dependency(
@@ -184,12 +184,12 @@ impl Verifier {
             loc.get_or_init(digest, move || {
                 let point_id = PointId {
                     location: Location {
-                        author: author.clone(),
-                        round: round.round().clone(),
+                        author: *author,
+                        round: round.round(),
                     },
                     digest: digest.clone(),
                 };
-                downloader.run(point_id, round.as_weak(), dependant.clone())
+                downloader.run(point_id, round.to_weak(), dependant.clone())
             })
         });
         dependencies.spawn(shared.map(|(dag_point, _)| dag_point));
@@ -221,7 +221,7 @@ impl Verifier {
 
     async fn check_deps(point: &Arc<Point>, mut dependencies: JoinSet<DagPoint>) -> DagPoint {
         // point is well-formed if we got here, so point.proof matches point.includes
-        let proven_vertex = point.body.proof.as_ref().map(|p| &p.digest).clone();
+        let proven_vertex = point.body.proof.as_ref().map(|p| &p.digest);
         let prev_loc = Location {
             round: point.body.location.round.prev(),
             author: point.body.location.author,
@@ -233,10 +233,10 @@ impl Verifier {
         // Invalid dependency is the author's fault.
         let mut is_suspicious = false;
         // last is meant to be the last among all dependencies
-        let anchor_trigger_id = point.anchor_trigger_id();
-        let anchor_proof_id = point.anchor_proof_id();
-        let anchor_trigger_through = point.anchor_trigger_through();
-        let anchor_proof_through = point.anchor_proof_through();
+        let anchor_trigger_id = point.anchor_id(LinkField::Trigger);
+        let anchor_proof_id = point.anchor_id(LinkField::Proof);
+        let anchor_trigger_link_id = point.anchor_link_id(LinkField::Trigger);
+        let anchor_proof_link_id = point.anchor_link_id(LinkField::Proof);
         while let Some(res) = dependencies.join_next().await {
             match res {
                 Ok(DagPoint::Trusted(valid) | DagPoint::Suspicious(valid)) => {
@@ -252,19 +252,21 @@ impl Verifier {
                             None => return DagPoint::Invalid(point.clone()),
                         }
                     } // else: valid dependency
-                    if valid.point.anchor_trigger_round() > anchor_trigger_id.location.round
-                        || valid.point.anchor_proof_round() > anchor_proof_id.location.round
+                    if valid.point.anchor_round(LinkField::Trigger)
+                        > anchor_trigger_id.location.round
+                        || valid.point.anchor_round(LinkField::Proof)
+                            > anchor_proof_id.location.round
                     {
                         // did not actualize the chain
                         return DagPoint::Invalid(point.clone());
                     }
                     let valid_point_id = valid.point.id();
                     if ({
-                        valid_point_id == anchor_trigger_through
-                            && valid.point.anchor_trigger_id() != anchor_trigger_id
+                        valid_point_id == anchor_trigger_link_id
+                            && valid.point.anchor_id(LinkField::Trigger) != anchor_trigger_id
                     }) || ({
-                        valid_point_id == anchor_proof_through
-                            && valid.point.anchor_proof_id() != anchor_proof_id
+                        valid_point_id == anchor_proof_link_id
+                            && valid.point.anchor_id(LinkField::Proof) != anchor_proof_id
                     }) {
                         // path does not lead to destination
                         return DagPoint::Invalid(point.clone());
@@ -339,7 +341,7 @@ impl Verifier {
         ] = peer_schedule.peers_for_array([
             point.body.location.round.prev().prev(),
             point.body.location.round.prev(),
-            point.body.location.round.clone(),
+            point.body.location.round,
         ]);
         for (peer_id, _) in point.body.witness.iter() {
             if !witness_peers.contains_key(peer_id) {
diff --git a/consensus/src/engine/engine.rs b/consensus/src/engine/engine.rs
index d036c2786..f1a17d39b 100644
--- a/consensus/src/engine/engine.rs
+++ b/consensus/src/engine/engine.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use bytes::Bytes;
 use everscale_crypto::ed25519::KeyPair;
 use itertools::Itertools;
 use tokio::sync::mpsc::UnboundedSender;
@@ -8,6 +9,7 @@ use tokio::task::{JoinError, JoinSet};
 use tycho_network::{DhtClient, OverlayService, PeerId};
 
 use crate::dag::{Dag, DagRound, InclusionState, Producer};
+use crate::engine::input_buffer::InputBuffer;
 use crate::engine::MempoolConfig;
 use crate::intercom::{
     BroadcastFilter, Broadcaster, BroadcasterSignal, Collector, CollectorSignal, Dispatcher,
@@ -28,6 +30,7 @@ pub struct Engine {
     top_dag_round: Arc<RwLock<DagRound>>,
     tasks: JoinSet<()>, // should be JoinSet
     committed: UnboundedSender<(Arc<Point>, Vec<Arc<Point>>)>,
+    input_buffer: Box<dyn InputBuffer>,
 }
 
 impl Engine {
@@ -36,6 +39,7 @@ impl Engine {
         dht_client: &DhtClient,
         overlay_service: &OverlayService,
         committed: UnboundedSender<(Arc<Point>, Vec<Arc<Point>>)>,
+        input_buffer: impl InputBuffer,
     ) -> Self {
         let log_id = Arc::new(format!("{:?}", PeerId::from(key_pair.public_key).ugly()));
         let peer_schedule = Arc::new(PeerSchedule::new(key_pair));
@@ -101,13 +105,14 @@ impl Engine {
             top_dag_round,
             tasks,
             committed,
+            input_buffer: Box::new(input_buffer),
         }
     }
 
     pub async fn init_with_genesis(&mut self, next_peers: &[PeerId]) {
         let genesis = crate::test_utils::genesis();
         assert!(
-            genesis.body.location.round > *self.top_dag_round.read().await.round(),
+            genesis.body.location.round > self.top_dag_round.read().await.round(),
             "genesis point round is too low"
         );
         // check only genesis round as it is widely used in point validation.
@@ -149,9 +154,23 @@ impl Engine {
                 .top(self.collector.next_round(), &self.peer_schedule);
             let next_dag_round = self
                 .dag
-                .top(&current_dag_round.round().next(), &self.peer_schedule);
+                .top(current_dag_round.round().next(), &self.peer_schedule);
 
-            tracing::info!("{} @ {:?}", self.log_id, current_dag_round.round());
+            let produce_with_payload = if produce_own_point {
+                Some(self.input_buffer.as_mut().fetch(prev_point.is_some()).await)
+            } else {
+                None
+            };
+
+            tracing::info!(
+                "{} @ {:?} {}",
+                self.log_id,
+                current_dag_round.round(),
+                produce_with_payload
+                    .as_ref()
+                    .map(|payload| payload.iter().map(|bytes| bytes.len()).sum::<usize>() / 1024)
+                    .map_or("no point".to_string(), |kb| format!("payload {kb} KiB"))
+            );
 
             let (bcaster_ready_tx, bcaster_ready_rx) = mpsc::channel(1);
             // let this channel unbounded - there won't be many items, but every of them is essential
@@ -159,7 +178,7 @@ impl Engine {
             let (own_point_state_tx, own_point_state_rx) = oneshot::channel();
 
             let bcaster_run = tokio::spawn(Self::bcaster_run(
-                produce_own_point,
+                produce_with_payload,
                 self.broadcaster,
                 self.peer_schedule.clone(),
                 self.top_dag_round.clone(),
@@ -180,8 +199,8 @@ impl Engine {
 
             let bcast_filter_run = {
                 let bcast_filter = self.broadcast_filter.clone();
-                let round = current_dag_round.round().clone();
-                tokio::spawn(async move { bcast_filter.advance_round(&round) })
+                let round = current_dag_round.round();
+                tokio::spawn(async move { bcast_filter.advance_round(round) })
             };
 
             let collector_run = tokio::spawn(self.collector.run(
@@ -209,7 +228,7 @@ impl Engine {
     }
 
     async fn bcaster_run(
-        produce_own_point: bool,
+        produce_with_payload: Option<Vec<Bytes>>,
         mut broadcaster: Broadcaster,
         peer_schedule: Arc<PeerSchedule>,
         top_dag_round: Arc<RwLock<DagRound>>,
@@ -221,13 +240,14 @@ impl Engine {
         bcaster_ready_tx: mpsc::Sender<BroadcasterSignal>,
         mut collector_signal_rx: mpsc::UnboundedReceiver<CollectorSignal>,
     ) -> (Broadcaster, Option<Arc<Point>>) {
-        if produce_own_point {
+        if let Some(payload) = produce_with_payload {
             let new_point = tokio::spawn(Self::produce(
                 current_dag_round,
                 prev_point,
                 peer_schedule.clone(),
                 downloader,
                 own_point_state,
+                payload,
             ));
             // must signal to uploader before start of broadcast
             let top_dag_round_upd =
@@ -271,9 +291,10 @@ impl Engine {
         peer_schedule: Arc<PeerSchedule>,
         downloader: Downloader,
         own_point_state: oneshot::Sender<InclusionState>,
+        payload: Vec<Bytes>,
     ) -> Option<Arc<Point>> {
         if let Some(own_point) =
-            Producer::new_point(&current_dag_round, prev_point.as_deref(), vec![]).await
+            Producer::new_point(&current_dag_round, prev_point.as_deref(), payload).await
         {
             let state = current_dag_round
                 .insert_exact_sign(&own_point, &peer_schedule, &downloader)
diff --git a/consensus/src/engine/input_buffer.rs b/consensus/src/engine/input_buffer.rs
new file mode 100644
index 000000000..ef7717e30
--- /dev/null
+++ b/consensus/src/engine/input_buffer.rs
@@ -0,0 +1,201 @@
+use std::collections::VecDeque;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use rand::{thread_rng, RngCore};
+use tokio::sync::{mpsc, Notify};
+use tokio::task::JoinHandle;
+
+use crate::engine::MempoolConfig;
+
+#[async_trait]
+pub trait InputBuffer: Send + 'static {
+    /// `only_fresh = false` to repeat the same elements if they are still buffered,
+    /// use in case last round failed
+    async fn fetch(&mut self, only_fresh: bool) -> Vec<Bytes>;
+}
+
+pub struct InputBufferImpl {
+    abort: Arc<Notify>,
+    consumer: Option<JoinHandle<InputBufferImplInner>>,
+}
+
+impl InputBufferImpl {
+    pub fn new(externals: mpsc::UnboundedReceiver<Bytes>) -> Self {
+        let abort = Arc::new(Notify::new());
+        let inner = InputBufferImplInner {
+            externals,
+            data: Default::default(),
+        };
+
+        Self {
+            consumer: Some(tokio::spawn(inner.consume(abort.clone()))),
+            abort,
+        }
+    }
+}
+
+impl Drop for InputBufferImpl {
+    fn drop(&mut self) {
+        if let Some(handle) = self.consumer.take() {
+            handle.abort();
+        }
+    }
+}
+
+#[async_trait]
+impl InputBuffer for InputBufferImpl {
+    async fn fetch(&mut self, only_fresh: bool) -> Vec<Bytes> {
+        self.abort.notify_waiters();
+        let handle = self.consumer.take().expect("consumer must be set");
+        let mut inner = handle.await.expect("consumer failed");
+
+        if only_fresh {
+            inner.data.commit_offset();
+        }
+        let result = inner.data.fetch();
+
+        self.consumer = Some(tokio::spawn(inner.consume(self.abort.clone())));
+        result
+    }
+}
+
+struct InputBufferImplInner {
+    externals: mpsc::UnboundedReceiver<Bytes>,
+    data: InputBufferData,
+}
+
+impl InputBufferImplInner {
+    async fn consume(mut self, abort: Arc<Notify>) -> Self {
+        let mut notified = std::pin::pin!(abort.notified());
+        loop {
+            tokio::select! {
+                _ = &mut notified => break self,
+                payload = self.externals.recv() => {
+                    self.data.add(payload.expect("externals input channel to mempool is closed"));
+                },
+            }
+        }
+    }
+}
+
+#[derive(Default)]
+struct InputBufferData {
+    data: VecDeque<Bytes>,
+    data_bytes: usize,
+    offset_elements: usize,
+}
+
+impl InputBufferData {
+    fn fetch(&mut self) -> Vec<Bytes> {
+        let mut taken_bytes = 0;
+        let result = self
+            .data
+            .iter()
+            .take_while(|elem| {
+                taken_bytes += elem.len();
+                taken_bytes <= MempoolConfig::PAYLOAD_BATCH_BYTES
+            })
+            .cloned()
+            .collect::<Vec<_>>();
+        self.offset_elements = result.len(); // overwrite
+        result
+    }
+
+    fn add(&mut self, payload: Bytes) {
+        let payload_bytes = payload.len();
+        assert!(
+            payload_bytes <= MempoolConfig::PAYLOAD_BUFFER_BYTES,
+            "cannot buffer too large message of {payload_bytes} bytes: \
+             increase config value of PAYLOAD_BUFFER_BYTES={} \
+             or filter out insanely large messages prior to sending them to mempool",
+            MempoolConfig::PAYLOAD_BUFFER_BYTES
+        );
+
+        let max_data_bytes = MempoolConfig::PAYLOAD_BUFFER_BYTES - payload_bytes;
+        if self.data_bytes > max_data_bytes {
+            let to_drop = self
+                .data
+                .iter()
+                .take_while(|evicted| {
+                    self.data_bytes = self
+                        .data_bytes
+                        .checked_sub(evicted.len())
+                        .expect("decrease buffered data size on eviction");
+                    self.data_bytes > max_data_bytes
+                })
+                .count();
+
+            self.offset_elements = self.offset_elements.saturating_sub(to_drop);
+            _ = self.data.drain(..to_drop);
+        }
+
+        self.data_bytes += payload_bytes;
+        self.data.push_back(payload);
+    }
+
+    fn commit_offset(&mut self) {
+        let committed_bytes: usize = self
+            .data
+            .drain(..self.offset_elements)
+            .map(|committed_bytes| committed_bytes.len())
+            .sum();
+
+        self.update_capacity();
+
+        self.data_bytes = self
+            .data_bytes
+            .checked_sub(committed_bytes)
+            .expect("decrease buffered data size on commit offset");
+
+        self.offset_elements = 0;
+    }
+
+    /// Ensures that the capacity is not too large.
+    fn update_capacity(&mut self) {
+        let len = self.data.len();
+
+        // because reallocation on adding elements doubles the capacity
+        if self.data.capacity() >= len * 4 {
+            self.data.shrink_to(len / 2);
+        }
+    }
+}
+
+pub struct InputBufferStub {
+    fetch_count: usize,
+    steps_until_full: usize,
+    fetches_in_step: usize,
+}
+
+impl InputBufferStub {
+    /// External message is limited by 64 KiB
+    const EXTERNAL_MSG_MAX_BYTES: usize = 64 * 1024;
+
+    pub fn new(fetches_in_step: usize, steps_until_full: usize) -> Self {
+        Self {
+            fetch_count: 0,
+            steps_until_full,
+            fetches_in_step,
+        }
+    }
+}
+
+#[async_trait]
+impl InputBuffer for InputBufferStub {
+    async fn fetch(&mut self, _: bool) -> Vec<Bytes> {
+        self.fetch_count += 1;
+        let step = (self.fetch_count / self.fetches_in_step).min(self.steps_until_full);
+        let msg_count = (MempoolConfig::PAYLOAD_BATCH_BYTES * step)
+            / self.steps_until_full
+            / Self::EXTERNAL_MSG_MAX_BYTES;
+        let mut result = Vec::with_capacity(msg_count);
+        for _ in 0..msg_count {
+            let mut data = vec![0; Self::EXTERNAL_MSG_MAX_BYTES];
+            thread_rng().fill_bytes(data.as_mut_slice());
+            result.push(Bytes::from(data));
+        }
+        result
+    }
+}
diff --git a/consensus/src/engine/mempool_config.rs b/consensus/src/engine/mempool_config.rs
index 3a37e5d78..6111cf89b 100644
--- a/consensus/src/engine/mempool_config.rs
+++ b/consensus/src/engine/mempool_config.rs
@@ -36,6 +36,12 @@ impl MempoolConfig {
     /// or a verifiable point is found (ill-formed or incorrectly signed points are not eligible)
     pub const DOWNLOAD_PEERS: u8 = 3;
 
+    /// hard limit on cached external messages ring buffer
+    pub const PAYLOAD_BUFFER_BYTES: usize = 50 * 1024 * 1024;
+
+    /// hard limit on point payload (excessive will be postponed)
+    pub const PAYLOAD_BATCH_BYTES: usize = 768 * 1024;
+
     /// every failed response is accounted as point is not found;
     /// 1/3+1 failed responses leads to invalidation of the point and all its dependants
     pub const DOWNLOAD_SPAWN_INTERVAL: Duration = Duration::from_millis(50);
diff --git a/consensus/src/engine/mod.rs b/consensus/src/engine/mod.rs
index 93b8c357c..fbff95a8f 100644
--- a/consensus/src/engine/mod.rs
+++ b/consensus/src/engine/mod.rs
@@ -1,5 +1,7 @@
 pub use engine::*;
+pub use input_buffer::*;
 pub use mempool_config::*;
 
 mod engine;
+mod input_buffer;
 mod mempool_config;
diff --git a/consensus/src/intercom/broadcast/broadcast_filter.rs b/consensus/src/intercom/broadcast/broadcast_filter.rs
index 55b72f64b..669952140 100644
--- a/consensus/src/intercom/broadcast/broadcast_filter.rs
+++ b/consensus/src/intercom/broadcast/broadcast_filter.rs
@@ -37,7 +37,7 @@ impl BroadcastFilter {
         self.0.add(point);
     }
 
-    pub fn advance_round(&self, new_round: &Round) {
+    pub fn advance_round(&self, new_round: Round) {
         self.0.advance_round(new_round)
     }
 
@@ -133,7 +133,7 @@ impl BroadcastFilterInner {
         }
         match self.by_round.entry(round).or_try_insert_with(|| {
             // how many nodes should send broadcasts
-            NodeCount::try_from(self.peer_schedule.peers_for(&round).len())
+            NodeCount::try_from(self.peer_schedule.peers_for(round).len())
                 .map(|node_count| (node_count, Default::default()))
         }) {
             Err(_) => {
@@ -154,11 +154,11 @@ impl BroadcastFilterInner {
                 };
             }
         }
-        self.advance_round(&round);
+        self.advance_round(round);
     }
 
     // drop everything up to the new round (inclusive), channelling cached points
-    fn advance_round(&self, new_round: &Round) {
+    fn advance_round(&self, new_round: Round) {
        for round in (self.current_dag_round.load(Ordering::Acquire)..=new_round.0).map(Round) {
            self.output.send(ConsensusEvent::Forward(round)).ok();
            // allow filter to channel messages only after Forward was sent
@@ -183,7 +183,7 @@ impl BroadcastFilterInner {
         // TODO there must be some config value - when node needs to sync;
         //   values too far in the future are some garbage, must ban authors
         self.by_round.retain(|round, _| {
-            new_round < round && round.0 <= new_round.0 + MempoolConfig::COMMIT_DEPTH as u32
+            new_round < *round && round.0 <= new_round.0 + MempoolConfig::COMMIT_DEPTH as u32
         });
     }
 }
diff --git a/consensus/src/intercom/broadcast/broadcaster.rs b/consensus/src/intercom/broadcast/broadcaster.rs
index d06731f52..46d3ad56b 100644
--- a/consensus/src/intercom/broadcast/broadcaster.rs
+++ b/consensus/src/intercom/broadcast/broadcaster.rs
@@ -101,7 +101,7 @@ impl BroadcasterTask {
     ) -> Self {
         let peer_updates = peer_schedule.updates();
         let signers = peer_schedule
-            .peers_for(&point.body.location.round.next())
+            .peers_for(point.body.location.round.next())
             .iter()
             .map(|(peer_id, _)| *peer_id)
            .collect::<FastHashSet<_>>();
@@ -113,7 +113,7 @@ impl BroadcasterTask {
             collectors.len()
         );
         let bcast_request = Dispatcher::broadcast_request(&point);
-        let sig_request = Dispatcher::signature_request(&point.body.location.round);
+        let sig_request = Dispatcher::signature_request(point.body.location.round);
         Self {
             log_id,
             dispatcher: dispatcher.clone(),
@@ -224,14 +224,14 @@ impl BroadcasterTask {
                 //  self.bcast_peers.push(peer_id); // let it retry
                 self.sig_peers.insert(peer_id); // lighter weight retry loop
                 tracing::error!(
-                    "{} @ {:?} bcaster <= collector {peer_id:.4?} broadcast error : {error}",
+                    "{} @ {:?} bcaster => collector {peer_id:.4?} error : {error}",
                     self.log_id,
                     self.current_round
                 );
             }
             Ok(_) => {
                 tracing::debug!(
-                    "{} @ {:?} bcaster <= collector {peer_id:.4?} : broadcast accepted",
+                    "{} @ {:?} bcaster => collector {peer_id:.4?} : broadcast accepted",
                     self.log_id,
                     self.current_round
                 );
@@ -290,9 +290,9 @@ impl BroadcasterTask {
     }
 
     fn broadcast(&mut self, peer_id: &PeerId) {
-        if self.removed_peers.is_empty() || !self.removed_peers.remove(&peer_id) {
+        if self.removed_peers.is_empty() || !self.removed_peers.remove(peer_id) {
             self.bcast_futs
-                .push(self.dispatcher.send(&peer_id, &self.bcast_request));
+                .push(self.dispatcher.send(peer_id, &self.bcast_request));
             tracing::debug!(
                 "{} @ {:?} bcaster => collector {peer_id:.4?}: broadcast",
                 self.log_id,
@@ -308,9 +308,9 @@ impl BroadcasterTask {
     }
 
     fn request_signature(&mut self, peer_id: &PeerId) {
-        if self.removed_peers.is_empty() || !self.removed_peers.remove(&peer_id) {
+        if self.removed_peers.is_empty() || !self.removed_peers.remove(peer_id) {
             self.sig_futs
-                .push(self.dispatcher.query(&peer_id, &self.sig_request));
+                .push(self.dispatcher.query(peer_id, &self.sig_request));
             tracing::debug!(
                 "{} @ {:?} bcaster => collector {peer_id:.4?}: signature request",
                 self.log_id,
diff --git a/consensus/src/intercom/broadcast/collector.rs b/consensus/src/intercom/broadcast/collector.rs
index c8e46fbe4..0717b62b7 100644
--- a/consensus/src/intercom/broadcast/collector.rs
+++ b/consensus/src/intercom/broadcast/collector.rs
@@ -83,11 +83,11 @@ impl Collector {
 
         assert_eq!(
             current_dag_round.round(),
-            &self.next_round,
+            self.next_round,
             "collector expected to be run at {:?}",
-            &self.next_round
+            self.next_round
         );
-        self.next_round = next_dag_round.round().clone();
+        self.next_round = next_dag_round.round();
         let includes_ready = FastHashSet::with_capacity_and_hasher(
             current_dag_round.node_count().full(),
             Default::default(),
@@ -116,8 +116,8 @@ impl Collector {
         self
     }
 
-    pub fn next_round(&self) -> &'_ Round {
-        &self.next_round
+    pub fn next_round(&self) -> Round {
+        self.next_round
     }
 }
 
@@ -193,7 +193,7 @@ impl CollectorTask {
                 },
                 request = signature_requests.recv() => match request {
                     Some((round, author, callback)) => {
-                        _ = callback.send(self.signature_response(&round, &author));
+                        _ = callback.send(self.signature_response(round, &author));
                    }
                    None => panic!("channel with signature requests closed")
                },
@@ -251,7 +251,7 @@ impl CollectorTask {
             self.next_includes
                 .push(futures_util::future::ready(state).boxed());
             self.is_includes_ready = true;
-            match point_round.cmp(self.next_dag_round.round()) {
+            match point_round.cmp(&self.next_dag_round.round()) {
                 Ordering::Less => {
                     panic!("Coding error: next includes futures contain current or previous round")
                 }
@@ -279,16 +279,16 @@ impl CollectorTask {
         );
         match consensus_event {
             ConsensusEvent::Forward(consensus_round) => {
-                match consensus_round.cmp(self.next_dag_round.round()) {
+                match consensus_round.cmp(&self.next_dag_round.round()) {
                     // we're too late, consensus moved forward
-                    std::cmp::Ordering::Greater => return Err(consensus_round.clone()),
+                    std::cmp::Ordering::Greater => return Err(*consensus_round),
                     // we still have a chance to finish current round
                     std::cmp::Ordering::Equal => {}
                     // we are among the fastest nodes of consensus
                     std::cmp::Ordering::Less => {}
                 }
             }
-            ConsensusEvent::Verified(point) => match &point.body.location.round {
+            ConsensusEvent::Verified(point) => match point.body.location.round {
                 x if x > self.next_dag_round.round() => {
                     panic!(
                         "{} @ {:?} Coding error: broadcast filter advanced \
@@ -311,7 +311,7 @@ impl CollectorTask {
                 _ => _ = self.current_round.add(&point, &self.downloader), /* maybe other's dependency */
             },
             ConsensusEvent::Invalid(dag_point) => {
-                if &dag_point.location().round > self.next_dag_round.round() {
+                if dag_point.location().round > self.next_dag_round.round() {
                     panic!(
                         "{} @ {:?} Coding error: broadcast filter advanced \
                          while collector left behind; event: {:?}",
@@ -343,7 +343,7 @@ impl CollectorTask {
         if let Some(Ok(_)) = state.signed() {
             if let Some(dag_point) = state
                 .point()
-                .filter(|dp| dp.location().round == *self.current_round.round())
+                .filter(|dp| dp.location().round == self.current_round.round())
             {
                 self.includes_ready.insert(dag_point.location().author);
                 tracing::debug!(
@@ -367,7 +367,7 @@ impl CollectorTask {
         );
     }
 
-    fn signature_response(&mut self, round: &Round, author: &PeerId) -> SignatureResponse {
+    fn signature_response(&mut self, round: Round, author: &PeerId) -> SignatureResponse {
         if round > self.current_round.round() {
             return SignatureResponse::TryLater; // hold fast nodes from moving forward
         };
@@ -385,7 +385,7 @@ impl CollectorTask {
             // points @ current local dag round are includes for next round point
             Some(key_pair) if round == self.current_round.round() => Some(key_pair),
             // points @ previous local dag round are witness for next round point
-            Some(_) if round == &self.current_round.round().prev() => {
+            Some(_) if round == self.current_round.round().prev() => {
                 self.current_round.key_pair()
             }
             // point is too old, cannot include;
@@ -393,12 +393,12 @@ impl CollectorTask {
             _ => None,
         };
         if signable.sign(
-            &self.current_round.round(),
+            self.current_round.round(),
             key_pair,
             MempoolConfig::sign_time_range(),
         ) {
             if round == self.current_round.round() {
-                self.includes_ready.insert(author.clone());
+                self.includes_ready.insert(*author);
             }
         }
     }
diff --git a/consensus/src/intercom/core/dispatcher.rs b/consensus/src/intercom/core/dispatcher.rs
index a3a5a45bd..247dfb0e8 100644
--- a/consensus/src/intercom/core/dispatcher.rs
+++ b/consensus/src/intercom/core/dispatcher.rs
@@ -40,8 +40,8 @@ impl Dispatcher {
         (&MPQuery::PointById(id.clone())).into()
     }
 
-    pub fn signature_request(round: &Round) -> tycho_network::Request {
-        (&MPQuery::Signature(round.clone())).into()
+    pub fn signature_request(round: Round) -> tycho_network::Request {
+        (&MPQuery::Signature(round)).into()
     }
 
     pub fn query<T>(
@@ -52,7 +52,7 @@ impl Dispatcher {
     where
         T: TryFrom<MPResponse>,
     {
-        let peer_id = peer_id.clone();
+        let peer_id = *peer_id;
         let request = request.clone();
         let overlay = self.overlay.clone();
         let network = self.network.clone();
@@ -80,7 +80,7 @@ impl Dispatcher {
         peer_id: &PeerId,
         request: &tycho_network::Request,
     ) -> BoxFuture<'static, (PeerId, Result<()>)> {
-        let peer_id = peer_id.clone();
+        let peer_id = *peer_id;
         let request = request.clone();
         let overlay = self.overlay.clone();
         let network = self.network.clone();
diff --git a/consensus/src/intercom/core/responder.rs b/consensus/src/intercom/core/responder.rs
index 072af9ecc..6425f66d4 100644
--- a/consensus/src/intercom/core/responder.rs
+++ b/consensus/src/intercom/core/responder.rs
@@ -88,7 +88,7 @@ impl ResponderInner {
             MPQuery::Signature(round) => {
                 let (tx, rx) = oneshot::channel();
                 self.signature_requests
-                    .send((round, req.metadata.peer_id.clone(), tx))
+                    .send((round, req.metadata.peer_id, tx))
                     .ok();
                 match rx.await {
                     Ok(response) => MPResponse::Signature(response),
diff --git a/consensus/src/intercom/dependency/downloader.rs b/consensus/src/intercom/dependency/downloader.rs
index b5046ae73..a2dc14098 100644
--- a/consensus/src/intercom/dependency/downloader.rs
+++ b/consensus/src/intercom/dependency/downloader.rs
@@ -49,13 +49,13 @@ impl Downloader {
         };
         assert_eq!(
             point_id.location.round,
-            *point_round_temp.round(),
+            point_round_temp.round(),
             "point and DAG round mismatch"
         );
         // request point from its signers (any dependant is among them as point is already verified)
         let all_peers = self
             .peer_schedule
-            .peers_for(&point_round_temp.round().next())
+            .peers_for(point_round_temp.round().next())
             .iter()
             .map(|(peer_id, state)| (*peer_id, *state))
            .collect::<FastHashMap<PeerId, PeerState>>();
@@ -174,11 +174,10 @@ impl DownloadTask {
     }
 
     fn download_one(&mut self, peer_id: &PeerId) {
-        let peer_id = peer_id.clone();
         self.in_flight.push(
             self.parent
                 .dispatcher
-                .query::<PointByIdResponse>(&peer_id, &self.request)
+                .query::<PointByIdResponse>(peer_id, &self.request)
                 .boxed(),
         );
    }
diff --git a/consensus/src/intercom/dependency/uploader.rs b/consensus/src/intercom/dependency/uploader.rs
index 2db9a888b..cfbac10cf 100644
--- a/consensus/src/intercom/dependency/uploader.rs
+++ b/consensus/src/intercom/dependency/uploader.rs
@@ -50,11 +50,11 @@ impl Uploader {
             let read = self.top_dag_round.read().await;
             read.clone()
         };
-        if &point_id.location.round > top_dag_round.round() {
+        if point_id.location.round > top_dag_round.round() {
             return None;
         }
         top_dag_round
-            .scan(&point_id.location.round)?
+            .scan(point_id.location.round)?
            .view(&point_id.location.author, |loc| {
                loc.versions().get(&point_id.digest).cloned()
            })?
diff --git a/consensus/src/intercom/peer_schedule/peer_schedule.rs b/consensus/src/intercom/peer_schedule/peer_schedule.rs
index e3e3b1315..8eb15c933 100644
--- a/consensus/src/intercom/peer_schedule/peer_schedule.rs
+++ b/consensus/src/intercom/peer_schedule/peer_schedule.rs
@@ -58,7 +58,7 @@ impl PeerSchedule {
     // To sign a point or to query for points, we need to know the intersection of:
     // * which nodes are in the validator set during the round of interest
     // * which nodes are able to connect at the moment
-    pub async fn wait_for_peers(&self, round: &Round, node_count: NodeCount) {
+    pub async fn wait_for_peers(&self, round: Round, node_count: NodeCount) {
         let mut rx = self.updates();
         let peers = (*self.peers_for(round)).clone();
         let local_id = self.local_id();
@@ -102,7 +102,7 @@ impl PeerSchedule {
     ///
     /// Consensus progress is not guaranteed without witness (because of evidence requirement),
     /// but we don't care if the consensus of an ending epoch stalls at its last round.
-    pub fn local_keys(&self, round: &Round) -> Option<Arc<KeyPair>> {
+    pub fn local_keys(&self, round: Round) -> Option<Arc<KeyPair>> {
         if self.peers_for(round).contains_key(&self.local_id()) {
             Some(self.local_keys.clone())
         } else {
@@ -112,7 +112,7 @@ impl PeerSchedule {
 
     pub fn all_resolved(&self) -> FastHashSet<PeerId> {
         let inner = self.inner.lock();
-        inner.all_resolved(self.local_id())
+        inner.all_resolved(&self.local_id())
     }
 
     pub fn is_resolved(&self, peer_id: &PeerId) -> bool {
@@ -120,7 +120,7 @@ impl PeerSchedule {
         inner.is_resolved(peer_id)
     }
 
-    pub fn peers_for(&self, round: &Round) -> Arc<FastHashMap<PeerId, PeerState>> {
+    pub fn peers_for(&self, round: Round) -> Arc<FastHashMap<PeerId, PeerState>> {
         let inner = self.inner.lock();
         inner.peers_for_index_plus_one(inner.index_plus_one(round))
     }
@@ -130,7 +130,7 @@ impl PeerSchedule {
         rounds: [Round; N],
     ) -> [Arc<FastHashMap<PeerId, PeerState>>; N] {
         let inner = self.inner.lock();
-        array::from_fn(|i| inner.peers_for_index_plus_one(inner.index_plus_one(&rounds[i])))
+        array::from_fn(|i| inner.peers_for_index_plus_one(inner.index_plus_one(rounds[i])))
     }
 
     /// does not return empty maps
@@ -139,8 +139,8 @@ impl PeerSchedule {
             return vec![];
         }
         let inner = self.inner.lock();
-        let mut first = inner.index_plus_one(&rounds.start);
-        let last = inner.index_plus_one(&rounds.end.prev());
+        let mut first = inner.index_plus_one(rounds.start);
+        let last = inner.index_plus_one(rounds.end.prev());
         if 0 == first && first < last {
             first += 1; // exclude inner.empty
         }
@@ -193,13 +193,13 @@ impl PeerSchedule {
             .read_entries()
             .iter()
             .filter(|a| a.resolver_handle.is_resolved())
-            .map(|a| a.peer_id.clone())
+            .map(|a| a.peer_id)
            .collect::<FastHashSet<_>>();
         let peers = peers
             .iter()
             .map(|peer_id| {
                 (
-                    peer_id.clone(),
+                    *peer_id,
                     if resolved.contains(&peer_id) && peer_id != local_id {
                         PeerState::Resolved
                     } else {
@@ -229,13 +229,13 @@ impl PeerSchedule {
                 continue;
             };
             if *b != new_state {
-                *b = new_state.clone();
+                *b = new_state;
                 is_applied = true;
            }
        }
        }
        if is_applied {
-            _ = self.updates.send((peer_id.clone(), new_state));
+            _ = self.updates.send((*peer_id, new_state));
        }
        is_applied
    }
@@ -261,12 +261,12 @@ impl PeerScheduleInner {
         }
     }
 
-    fn index_plus_one(&self, round: &Round) -> u8 {
-        if self.next_epoch_start.as_ref().map_or(false, |r| r <= round) {
+    fn index_plus_one(&self, round: Round) -> u8 {
+        if self.next_epoch_start.map_or(false, |r| r <= round) {
             3
-        } else if &self.cur_epoch_start <= round {
+        } else if self.cur_epoch_start <= round {
             2
-        } else if &self.prev_epoch_start <= round {
+        } else if self.prev_epoch_start <= round {
             1
         } else {
             0
@@ -281,12 +281,12 @@ impl PeerScheduleInner {
         }
     }
 
-    fn all_resolved(&self, local_id: PeerId) -> FastHashSet<PeerId> {
+    fn all_resolved(&self, local_id: &PeerId) -> FastHashSet<PeerId> {
         self.peers_resolved[0]
             .iter()
             .chain(self.peers_resolved[1].iter())
             .chain(self.peers_resolved[2].iter())
-            .filter(|(peer_id, state)| *state == &PeerState::Resolved && peer_id != &local_id)
+            .filter(|(peer_id, state)| *state == &PeerState::Resolved && peer_id != local_id)
             .map(|(peer_id, _)| *peer_id)
             .collect()
     }
diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs
index 894349114..1014d8cdb 100644
--- a/consensus/src/lib.rs
+++ b/consensus/src/lib.rs
@@ -5,5 +5,5 @@ pub(crate) mod intercom;
 pub(crate) mod models;
 pub mod test_utils;
 
-pub use engine::Engine;
+pub use engine::*;
 pub use models::Point;
diff --git a/consensus/src/models/node_count.rs b/consensus/src/models/node_count.rs
index 7bedca054..f54e6e8de 100644
--- a/consensus/src/models/node_count.rs
+++ b/consensus/src/models/node_count.rs
@@ -1,4 +1,4 @@
-#[derive(Clone)]
+#[derive(Clone, Copy)]
 pub struct NodeCount(u8);
 
 impl std::fmt::Debug for NodeCount {
diff --git a/consensus/src/models/point.rs b/consensus/src/models/point.rs
index ce850896f..f4828587a 100644
--- a/consensus/src/models/point.rs
+++ b/consensus/src/models/point.rs
@@ -204,6 +204,13 @@ pub struct PointBody {
     pub anchor_proof: Link,
 }
 
+/// Just a field accessor
+#[derive(Clone, Copy)]
+pub enum LinkField {
+    Trigger,
+    Proof,
+}
+
 // Todo: Arc<Point> => Point(Arc<...{...}>)
 #[derive(Clone, Serialize, Deserialize, Debug)]
 pub struct Point {
@@ -289,9 +296,9 @@ impl Point {
             && self.body.proof.as_ref().map(|p| &p.digest) == self.body.includes.get(&author)
             // in contrast, evidence must contain only signatures of others
             && self.body.proof.as_ref().map_or(true, |p| !p.evidence.contains_key(author))
-            && self.is_link_well_formed(&self.body.anchor_proof)
-            && self.is_link_well_formed(&self.body.anchor_trigger)
-            && match (self.anchor_proof_round(), self.anchor_trigger_round()) {
+            && self.is_link_well_formed(LinkField::Proof)
+            && self.is_link_well_formed(LinkField::Trigger)
+            && match (self.anchor_round(LinkField::Proof), self.anchor_round(LinkField::Trigger)) {
                 (x, MempoolConfig::GENESIS_ROUND) => x >= MempoolConfig::GENESIS_ROUND,
                 (MempoolConfig::GENESIS_ROUND, y) => y >= MempoolConfig::GENESIS_ROUND,
                 // equality is impossible due to commit waves do not start every round;
@@ -301,8 +308,8 @@ impl Point {
         }
     }
 
-    fn is_link_well_formed(&self, link: &Link) -> bool {
-        match link {
+    fn is_link_well_formed(&self, link_field: LinkField) -> bool {
+        match self.anchor_link(link_field) {
             Link::ToSelf => true,
             Link::Direct(Through::Includes(peer)) => self.body.includes.contains_key(peer),
             Link::Direct(Through::Witness(peer)) => self.body.witness.contains_key(peer),
@@ -323,66 +330,47 @@ impl Point {
         }
     }
 
-    // TODO maybe implement field accessors parameterized by combination of enums
-
-    pub fn anchor_trigger_round(&self) -> Round {
-        self.get_linked_to_round(&self.body.anchor_trigger)
-    }
-
-    pub fn anchor_proof_round(&self) -> Round {
-        self.get_linked_to_round(&self.body.anchor_proof)
-    }
-
-    pub fn anchor_trigger_id(&self) -> PointId {
-        self.get_linked_to(&self.body.anchor_trigger)
-    }
-
-    pub fn anchor_proof_id(&self) -> PointId {
-        self.get_linked_to(&self.body.anchor_proof)
-    }
-
-    pub fn anchor_trigger_through(&self) -> PointId {
-        self.get_linked_through(&self.body.anchor_trigger)
-    }
-
-    pub fn anchor_proof_through(&self) -> PointId {
-        self.get_linked_through(&self.body.anchor_proof)
+    pub fn anchor_link(&self, link_field: LinkField) -> &'_ Link {
+        match link_field {
+            LinkField::Trigger => &self.body.anchor_trigger,
+            LinkField::Proof => &self.body.anchor_proof,
+        }
     }
 
-    fn get_linked_to_round(&self, link: &Link) -> Round {
-        match link {
-            Link::ToSelf => self.body.location.round.clone(),
+    pub fn anchor_round(&self, link_field: LinkField) -> Round {
+        match self.anchor_link(link_field) {
+            Link::ToSelf => self.body.location.round,
             Link::Direct(Through::Includes(_)) => self.body.location.round.prev(),
             Link::Direct(Through::Witness(_)) => self.body.location.round.prev().prev(),
-            Link::Indirect { to, .. } => to.location.round.clone(),
+            Link::Indirect { to, .. } => to.location.round,
         }
     }
 
-    fn get_linked_to(&self, link: &Link) -> PointId {
-        match link {
-            Link::ToSelf => self.id(),
-            Link::Direct(Through::Includes(peer)) => self.get_linked(peer, true),
-            Link::Direct(Through::Witness(peer)) => self.get_linked(peer, false),
+    /// the final destination of an anchor link
+    pub fn anchor_id(&self, link_field: LinkField) -> PointId {
+        match self.anchor_link(link_field) {
             Link::Indirect { to, .. } => to.clone(),
+            _direct => self.anchor_link_id(link_field),
         }
     }
 
-    fn get_linked_through(&self, link: &Link) -> PointId {
-        match link {
+    /// next point in path from `&self` to the anchor
+    pub fn anchor_link_id(&self, link_field: LinkField) -> PointId {
+        let (peer, is_in_includes) = match self.anchor_link(link_field) {
+            Link::ToSelf => return self.id(),
+            Link::Direct(Through::Includes(peer)) => (peer, true),
+            Link::Direct(Through::Witness(peer)) => (peer, false),
             Link::Indirect {
                 path: Through::Includes(peer),
                 ..
-            } => self.get_linked(peer, true),
+            } => (peer, true),
             Link::Indirect {
                 path: Through::Witness(peer),
                 ..
-            } => self.get_linked(peer, false),
-            _ => self.get_linked_to(link),
-        }
-    }
+            } => (peer, false),
+        };
 
-    fn get_linked(&self, peer: &PeerId, through_includes: bool) -> PointId {
-        let (through, round) = if through_includes {
+        let (map, round) = if is_in_includes {
             (&self.body.includes, self.body.location.round.prev())
         } else {
             (&self.body.witness, self.body.location.round.prev().prev())
@@ -390,9 +378,9 @@ impl Point {
         PointId {
             location: Location {
                 round,
-                author: peer.clone(),
+                author: *peer,
             },
-            digest: through
+            digest: map
                 .get(peer)
                 .expect("Coding error: usage of ill-formed point")
                 .clone(),
diff --git a/consensus/src/test_utils.rs b/consensus/src/test_utils.rs
index 04ed55e5f..485574359 100644
--- a/consensus/src/test_utils.rs
+++ b/consensus/src/test_utils.rs
@@ -103,7 +103,7 @@ mod tests {
     use tokio::sync::mpsc;
 
     use super::*;
-    use crate::engine::Engine;
+    use crate::engine::{Engine, InputBufferStub};
 
     #[global_allocator]
     static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
@@ -182,6 +182,7 @@ mod tests {
                 &dht_client,
                 &overlay_service,
                 committed_tx.clone(),
+                InputBufferStub::new(100, 3),
             );
             engine.init_with_genesis(all_peers.as_slice()).await;
             tracing::info!("created engine {}", dht_client.network().peer_id());
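---

Note on wiring (not part of the patch): a minimal sketch of how a caller is expected to connect the new `InputBuffer` input, based only on the signatures above — `InputBufferImpl::new` takes the receiving half of an unbounded `Bytes` channel, and `Engine::new` takes the buffer as its fifth argument. The helper name `make_input` is illustrative, not part of the API:

    use bytes::Bytes;
    use tokio::sync::mpsc;
    use tycho_consensus::InputBufferImpl;

    // Whoever accepts external messages keeps the sender and hands the
    // receiver to InputBufferImpl, the way mempool_adapter.rs does with its
    // `_externals_tx` placeholder (marked "TODO receive from outside").
    fn make_input() -> (mpsc::UnboundedSender<Bytes>, InputBufferImpl) {
        let (externals_tx, externals_rx) = mpsc::unbounded_channel::<Bytes>();
        (externals_tx, InputBufferImpl::new(externals_rx))
    }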
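Note on the `fetch` contract: `Engine::run` calls `self.input_buffer.as_mut().fetch(prev_point.is_some()).await`, so a round that managed to produce a point passes `only_fresh = true` and commits (drops) the previously fetched batch via `commit_offset`, while a failed round passes `false` and receives the same still-buffered elements again. `InputBufferData` tracks this with `offset_elements`, which `fetch` overwrites on every call and which eviction in `add` shrinks with `saturating_sub`.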
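Note on the accessor refactor in models/point.rs: `LinkField` collapses six per-field methods into three parameterized ones. The equivalences, as implied by the call-site changes in dag.rs, producer.rs and verifier.rs:

    // old accessor                       new parameterized form
    point.anchor_trigger_round()      ->  point.anchor_round(LinkField::Trigger)
    point.anchor_proof_round()        ->  point.anchor_round(LinkField::Proof)
    point.anchor_trigger_id()         ->  point.anchor_id(LinkField::Trigger)
    point.anchor_proof_id()           ->  point.anchor_id(LinkField::Proof)
    point.anchor_trigger_through()    ->  point.anchor_link_id(LinkField::Trigger)
    point.anchor_proof_through()      ->  point.anchor_link_id(LinkField::Proof)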
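Note on the new config limits: with PAYLOAD_BATCH_BYTES = 768 KiB and the stub's EXTERNAL_MSG_MAX_BYTES = 64 KiB, a fully ramped-up `InputBufferStub` yields at most 768 / 64 = 12 messages per fetch (the ramp-up divides that by `steps_until_full` until `fetch_count` reaches it). PAYLOAD_BUFFER_BYTES = 50 MiB bounds the real ring buffer: `add` evicts from the front exactly until a new payload fits, and rejects any single message larger than the whole buffer.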