feat(consensus): init engine outside of constructor
Mododo committed May 10, 2024
1 parent e1283b1 commit e32c0cf
Showing 13 changed files with 157 additions and 98 deletions.
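
In short, `Engine::new` is now synchronous and does no genesis work; callers must follow it with the new async `init_with_genesis` before `run`. A minimal sketch of the new startup sequence, pieced together from the call sites changed below (`key_pair`, `dht_client`, and `overlay_service` are assumed to be set up as before):

    let (sender, _committed_rx) =
        tokio::sync::mpsc::unbounded_channel::<(Arc<Point>, Vec<Arc<Point>>)>();
    tokio::spawn(async move {
        // `new` no longer awaits: all genesis wiring moved out of the constructor
        let mut engine =
            tycho_consensus::Engine::new(key_pair, &dht_client, &overlay_service, sender);
        // the empty peer list mirrors the adapter's TODO below
        engine.init_with_genesis(&[]).await;
        engine.run().await;
    });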
7 changes: 4 additions & 3 deletions collator/src/mempool/mempool_adapter.rs
@@ -128,9 +128,10 @@ impl MempoolAdapterStdImpl {
             tokio::sync::mpsc::unbounded_channel::<(Arc<Point>, Vec<Arc<Point>>)>();
 
         tokio::spawn(async move {
-            let engine =
-                tycho_consensus::Engine::new(key_pair, &dht_client, &overlay_service, sender).await;
-
+            let mut engine =
+                tycho_consensus::Engine::new(key_pair, &dht_client, &overlay_service, sender);
+            // TODO replace with some sensible init before run
+            engine.init_with_genesis(&[]).await;
             engine.run().await;
         });

7 changes: 4 additions & 3 deletions consensus/examples/consensus_node.rs
@@ -129,7 +129,8 @@ impl CmdRun {
         }
 
         let (committed_tx, committed_rx) = mpsc::unbounded_channel();
-        let engine = Engine::new(key_pair.clone(), &dht_client, &overlay, committed_tx).await;
+        let mut engine = Engine::new(key_pair.clone(), &dht_client, &overlay, committed_tx);
+        engine.init_with_genesis(all_peers.as_slice()).await;
         tokio::spawn(drain_anchors(committed_rx));
 
         tracing::info!(
@@ -187,9 +188,9 @@ struct CmdGenDht {
 impl CmdGenDht {
     fn run(self) -> Result<()> {
         let secret_key = parse_key(&self.key)?;
-        let key_pair = Arc::new(KeyPair::from(&secret_key));
+        let key_pair = KeyPair::from(&secret_key);
         let entry =
-            tycho_consensus::test_utils::make_peer_info(key_pair, self.addr.into(), self.ttl);
+            tycho_consensus::test_utils::make_peer_info(&key_pair, self.addr.into(), self.ttl);
         let output = if std::io::stdin().is_terminal() {
             serde_json::to_string_pretty(&entry)
         } else {
14 changes: 9 additions & 5 deletions consensus/src/dag/dag.rs
@@ -21,18 +21,22 @@ pub struct Dag {
 }
 
 impl Dag {
-    pub fn new(dag_round: DagRound) -> Self {
-        let mut rounds = BTreeMap::new();
-        rounds.insert(dag_round.round().clone(), dag_round);
+    pub fn new() -> Self {
         Self {
-            rounds: Arc::new(Mutex::new(rounds)),
+            rounds: Arc::new(Mutex::new(BTreeMap::new())),
         }
     }
 
+    pub fn init(&self, dag_round: DagRound) {
+        let mut rounds = self.rounds.lock();
+        assert!(rounds.is_empty(), "DAG already initialized");
+        rounds.insert(dag_round.round().clone(), dag_round);
+    }
+
     pub fn top(&self, round: &Round, peer_schedule: &PeerSchedule) -> DagRound {
         let mut rounds = self.rounds.lock();
         let mut top = match rounds.last_key_value() {
-            None => unreachable!("DAG cannot be empty"),
+            None => unreachable!("DAG cannot be empty if properly initialized?"),
             Some((_, top)) => top.clone(),
         };
         if (top.round().0 + MempoolConfig::COMMIT_DEPTH as u32) < round.0 {
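The split gives `Dag` the same two-phase shape as the engine: `new` is cheap enough to call in the constructor, and `init` may run exactly once. A hedged usage sketch (`genesis_round`, `round`, and `peer_schedule` are assumed to be in scope):

    let dag = Dag::new(); // empty; safe to hold in Engine before startup
    dag.init(genesis_round); // first call populates the rounds map
    // a second init would trip the "DAG already initialized" assert
    let top = dag.top(&round, &peer_schedule); // panics if init never ran
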
16 changes: 14 additions & 2 deletions consensus/src/dag/dag_round.rs
@@ -37,6 +37,18 @@ impl WeakDagRound {
 }
 
 impl DagRound {
+    /// stub that must remain unlinked from the DAG chain and is only to be replaced
+    pub fn unusable() -> Self {
+        Self(Arc::new(DagRoundInner {
+            round: Round::BOTTOM,
+            node_count: NodeCount::GENESIS,
+            key_pair: None,
+            anchor_stage: None,
+            locations: FastDashMap::default(),
+            prev: WeakDagRound::BOTTOM,
+        }))
+    }
+
     pub fn new(round: Round, peer_schedule: &PeerSchedule, prev: WeakDagRound) -> Self {
         let peers = peer_schedule.peers_for(&round);
         let locations = FastDashMap::with_capacity_and_hasher(peers.len(), Default::default());
@@ -195,8 +207,8 @@ impl DagRound {
             panic!("Coding error: malformed point")
         }
         let point = Verifier::validate(point.clone(), self.clone(), downloader.clone()).await;
-        if point.valid().is_none() {
-            panic!("Coding error: not a valid point")
+        if point.trusted().is_none() {
+            panic!("Coding error: not a trusted point")
         }
         let state = self.insert_exact(&point)?.await;
         if let Some(signable) = state.signable() {
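`unusable()` lets `Engine::new` fill its `top_dag_round` lock with a non-optional value before genesis exists; since the stub sits at `Round::BOTTOM`, any real genesis round is strictly above it, which is what the new assert in `init_with_genesis` checks. A sketch of the pattern, using names from the diffs (`genesis` assumed in scope):

    // placeholder installed by the constructor (see engine.rs below)
    let top_dag_round = Arc::new(RwLock::new(DagRound::unusable()));
    // init_with_genesis later asserts the real genesis round is strictly above it:
    assert!(genesis.body.location.round > *top_dag_round.read().await.round());
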
2 changes: 1 addition & 1 deletion consensus/src/dag/verifier.rs
@@ -368,7 +368,7 @@ impl Verifier {
     /// blame author and every dependent point's author
     fn is_proof_ok(
         point: &Point,  // @ r+0
-        proven: &Point, // @ r-1
+        proven: &Point, // @ r-1
     ) -> bool {
         if point.body.location.author != proven.body.location.author {
             panic!("Coding error: mismatched authors of proof and its vertex")
86 changes: 48 additions & 38 deletions consensus/src/engine/engine.rs
@@ -1,6 +1,6 @@
 use std::sync::Arc;
 
-use everscale_crypto::ed25519::{KeyPair, SecretKey};
+use everscale_crypto::ed25519::KeyPair;
 use itertools::Itertools;
 use tokio::sync::mpsc::UnboundedSender;
 use tokio::sync::{mpsc, oneshot, RwLock};
@@ -19,6 +19,7 @@ pub struct Engine {
     log_id: Arc<String>,
     dag: Dag,
     peer_schedule: Arc<PeerSchedule>,
+    peer_schedule_updater: PeerScheduleUpdater,
     dispatcher: Dispatcher,
     downloader: Downloader,
     broadcaster: Broadcaster,
@@ -30,7 +31,7 @@ pub struct Engine {
 }
 
 impl Engine {
-    pub async fn new(
+    pub fn new(
         key_pair: Arc<KeyPair>,
         dht_client: &DhtClient,
         overlay_service: &OverlayService,
@@ -51,7 +52,6 @@ impl Engine {
         let dispatcher = Dispatcher::new(
             &dht_client,
             &overlay_service,
-            &[], // TODO: FIX PEERS
             Responder::new(
                 log_id.clone(),
                 broadcast_filter.clone(),
@@ -61,38 +61,21 @@ impl Engine {
         );
         let broadcaster = Broadcaster::new(log_id.clone(), &dispatcher);
 
-        let genesis = crate::test_utils::genesis();
-        // check only genesis round as it is widely used in point validation.
-        // if some nodes use distinct genesis data, their first points will be rejected
-        assert_eq!(
-            genesis.body.location.round,
-            MempoolConfig::GENESIS_ROUND,
-            "genesis point round must match genesis round from config"
-        );
         let peer_schedule_updater =
             PeerScheduleUpdater::new(dispatcher.overlay.clone(), peer_schedule.clone());
-        // finished epoch
-        peer_schedule.set_next_start(genesis.body.location.round);
-        peer_schedule_updater.set_next_peers(&vec![genesis.body.location.author]);
-        peer_schedule.rotate();
-        // current epoch
-        peer_schedule.set_next_start(genesis.body.location.round.next());
-        // start updater only after peers are populated into schedule
-        peer_schedule_updater.set_next_peers(&[]); // TODO: FIX PEERS
-        peer_schedule.rotate();
 
-        let current_dag_round = DagRound::genesis(&genesis, &peer_schedule);
-        let dag = Dag::new(current_dag_round.clone());
-
-        let top_dag_round = Arc::new(RwLock::new(current_dag_round.clone()));
+        let top_dag_round = Arc::new(RwLock::new(DagRound::unusable()));
 
         let mut tasks = JoinSet::new();
         let uploader = Uploader::new(log_id.clone(), uploader_rx, top_dag_round.clone());
         tasks.spawn(async move {
             uploader.run().await;
         });
-        tasks.spawn(async move {
-            peer_schedule_updater.run().await;
+        tasks.spawn({
+            let peer_schedule_updater = peer_schedule_updater.clone();
+            async move {
+                peer_schedule_updater.run().await;
+            }
         });
         tasks.spawn({
             let broadcast_filter = broadcast_filter.clone();
@@ -103,22 +86,13 @@
 
         let downloader = Downloader::new(log_id.clone(), &dispatcher, &peer_schedule);
 
-        let genesis_state = current_dag_round
-            .insert_exact_sign(&genesis, &peer_schedule, &downloader)
-            .await;
-        let collector = Collector::new(
-            log_id.clone(),
-            &downloader,
-            bcast_rx,
-            sig_responses,
-            genesis_state.into_iter(),
-            current_dag_round.round().next(),
-        );
+        let collector = Collector::new(log_id.clone(), &downloader, bcast_rx, sig_responses);
 
         Self {
             log_id,
-            dag,
+            dag: Dag::new(),
             peer_schedule,
+            peer_schedule_updater,
             dispatcher,
             downloader,
             broadcaster,
@@ -130,6 +104,42 @@
         }
     }
 
+    pub async fn init_with_genesis(&mut self, next_peers: &[PeerId]) {
+        let genesis = crate::test_utils::genesis();
+        assert!(
+            genesis.body.location.round > *self.top_dag_round.read().await.round(),
+            "genesis point round is too low"
+        );
+        // check only genesis round as it is widely used in point validation.
+        // if some nodes use distinct genesis data, their first points will be rejected
+        assert_eq!(
+            genesis.body.location.round,
+            MempoolConfig::GENESIS_ROUND,
+            "genesis point round must match genesis round from config"
+        );
+        // finished epoch
+        self.peer_schedule
+            .set_next_start(genesis.body.location.round);
+        self.peer_schedule_updater
+            .set_next_peers(&vec![genesis.body.location.author], false);
+        self.peer_schedule.rotate();
+        // current epoch
+        self.peer_schedule
+            .set_next_start(genesis.body.location.round.next());
+        // start updater only after peers are populated into schedule
+        self.peer_schedule_updater.set_next_peers(next_peers, true);
+        self.peer_schedule.rotate();
+
+        let current_dag_round = DagRound::genesis(&genesis, &self.peer_schedule);
+        self.dag.init(current_dag_round.clone());
+
+        let genesis_state = current_dag_round
+            .insert_exact_sign(&genesis, &self.peer_schedule, &self.downloader)
+            .await;
+        self.collector
+            .init(current_dag_round.round().next(), genesis_state.into_iter());
+    }
+
     pub async fn run(mut self) -> ! {
         let mut prev_point: Option<Arc<PrevPoint>> = None;
         let mut produce_own_point = true;
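The split creates an ordering contract: skipping `init_with_genesis` leaves the DAG empty and the collector parked at `Round::BOTTOM`, so a bare `run` would have no top round to build on and would hit the `unreachable!` in `Dag::top`. A hypothetical misuse, for illustration only:

    let mut engine = Engine::new(key_pair, &dht_client, &overlay, committed_tx);
    // engine.run().await; // would panic: "DAG cannot be empty if properly initialized?"
    engine.init_with_genesis(all_peers.as_slice()).await; // must come first
    engine.run().await;
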
2 changes: 1 addition & 1 deletion consensus/src/intercom/broadcast/broadcast_filter.rs
@@ -27,7 +27,7 @@ impl BroadcastFilter {
             log_id,
             last_by_peer: Default::default(),
             by_round: Default::default(),
-            current_dag_round: Default::default(), // will advance with other peers
+            current_dag_round: AtomicU32::new(Round::BOTTOM.0), // will advance with other peers
             peer_schedule,
             output,
         }))
14 changes: 8 additions & 6 deletions consensus/src/intercom/broadcast/collector.rs
@@ -39,21 +39,23 @@ impl Collector {
         downloader: &Downloader,
         from_bcast_filter: mpsc::UnboundedReceiver<ConsensusEvent>,
         signature_requests: mpsc::UnboundedReceiver<SignatureRequest>,
-        next_includes: impl Iterator<Item = InclusionState>,
-        next_round: Round,
     ) -> Self {
         Self {
             log_id,
             downloader: downloader.clone(),
             from_bcast_filter,
             signature_requests,
-            next_round,
-            next_includes: FuturesUnordered::from_iter(
-                next_includes.map(|a| futures_util::future::ready(a).boxed()),
-            ),
+            next_round: Round::BOTTOM,
+            next_includes: FuturesUnordered::new(),
         }
     }
 
+    pub fn init(&mut self, next_round: Round, next_includes: impl Iterator<Item = InclusionState>) {
+        self.next_round = next_round;
+        self.next_includes
+            .extend(next_includes.map(|a| futures_util::future::ready(a).boxed()));
+    }
+
     pub async fn run(
         mut self,
         next_dag_round: DagRound, // r+1
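`Collector` follows the same pattern: the constructor now parks `next_round` at the `Round::BOTTOM` sentinel with no pending includes, and `Engine::init_with_genesis` back-fills both through `init`. A minimal sketch, with `genesis_state` and `current_dag_round` assumed from the engine diff above:

    let mut collector = Collector::new(log_id.clone(), &downloader, bcast_rx, sig_responses);
    // once the genesis round exists:
    collector.init(current_dag_round.round().next(), genesis_state.into_iter());
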
2 changes: 0 additions & 2 deletions consensus/src/intercom/core/dispatcher.rs
@@ -19,15 +19,13 @@ impl Dispatcher {
     pub fn new(
         dht_client: &DhtClient,
         overlay_service: &OverlayService,
-        all_peers: &[PeerId],
         responder: Responder,
     ) -> Self {
         let dht_service = dht_client.service();
         let peer_resolver = dht_service.make_peer_resolver().build(dht_client.network());
 
         let private_overlay = PrivateOverlay::builder(Self::PRIVATE_OVERLAY_ID)
             .with_peer_resolver(peer_resolver)
-            .with_entries(all_peers)
             .build(responder);
 
         overlay_service.add_private_overlay(&private_overlay);
