Skip to content

Commit

Permalink
fix(consensus): changes from debug session
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo committed May 10, 2024
1 parent 67023a3 commit e1283b1
Show file tree
Hide file tree
Showing 17 changed files with 440 additions and 339 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ tycho-util = { workspace = true, features = ["test"] }
parking_lot = { workspace = true, features = ["deadlock_detection"] }
tokio = { workspace = true, default-features = false, features = ["rt-multi-thread", "macros"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
tikv-jemallocator = { workspace = true, features = [
"unprefixed_malloc_on_supported_platforms",
"background_threads",
]}

[lints]
workspace = true
13 changes: 8 additions & 5 deletions consensus/examples/consensus_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::sync::Arc;
use anyhow::Result;
use clap::{Parser, Subcommand};
use everscale_crypto::ed25519;
use everscale_crypto::ed25519::KeyPair;
use everscale_crypto::ed25519::{KeyPair, PublicKey};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing_subscriber::layer::SubscriberExt;
Expand Down Expand Up @@ -119,15 +119,18 @@ impl CmdRun {
.map(|info| info.id)
.collect::<Vec<_>>();

let mut initial_peer_count = 0usize;
let mut initial_peer_count = 1_usize;
let local_id = PeerId::from(PublicKey::from(&secret_key));
for peer in global_config.bootstrap_peers {
let is_new = dht_client.add_peer(Arc::new(peer))?;
initial_peer_count += is_new as usize;
if peer.id != local_id {
let is_new = dht_client.add_peer(Arc::new(peer))?;
initial_peer_count += is_new as usize;
}
}

let (committed_tx, committed_rx) = mpsc::unbounded_channel();
let engine = Engine::new(key_pair.clone(), &dht_client, &overlay, committed_tx).await;
drain_anchors(committed_rx);
tokio::spawn(drain_anchors(committed_rx));

tracing::info!(
local_id = %dht_client.network().peer_id(),
Expand Down
21 changes: 3 additions & 18 deletions consensus/src/dag/dag_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,7 @@ pub struct DagLocation {
}

impl DagLocation {
/// Registers the node's own point in this DAG location.
///
/// The point is wrapped in an already-resolved shared join task and stored
/// under its digest; the location's state is then updated. Panics if a
/// version with the same digest was already present, since our own point
/// must be inserted exactly once.
pub fn insert_own_point(&mut self, my_point: &DagPoint) {
    // An own point needs no download/validation future — wrap it as ready.
    let resolved = Shared::new(JoinTask::new(futures_util::future::ready(my_point.clone())));
    let previous = self.versions.insert(my_point.digest().clone(), resolved);
    assert!(
        previous.is_none(),
        "Coding error: own point is already inserted into DAG location"
    );
    self.state.insert_own_point(my_point);
}
pub fn add_dependency<I, F>(&mut self, digest: &Digest, init: I) -> Shared<JoinTask<DagPoint>>
pub fn get_or_init<I, F>(&mut self, digest: &Digest, init: I) -> Shared<JoinTask<DagPoint>>
where
I: FnOnce() -> F,
F: Future<Output = DagPoint> + Send + 'static,
Expand All @@ -62,11 +51,7 @@ impl DagLocation {
}
}
}
pub fn add_validate<I, F>(
&mut self,
digest: &Digest,
init: I,
) -> Option<&'_ Shared<JoinTask<DagPoint>>>
pub fn init<I, F>(&mut self, digest: &Digest, init: I) -> Option<&'_ Shared<JoinTask<DagPoint>>>
where
I: FnOnce() -> F,
F: Future<Output = DagPoint> + Send + 'static,
Expand Down Expand Up @@ -194,7 +179,7 @@ impl Signable {
this_call_signed = true;
Ok(Signed {
at: at.clone(),
with: valid.point.body.sign(key_pair),
with: Signature::new(key_pair, &valid.point.digest),
})
});
} else if &valid.point.body.time < time_range.start() {
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/dag/dag_round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl DagRound {

pub async fn valid_point_exact(&self, node: &PeerId, digest: &Digest) -> Option<ValidPoint> {
let point_fut = self.view(node, |loc| loc.versions().get(digest).cloned())??;
point_fut.await.0.valid().cloned()
point_fut.await.0.into_valid()
}

pub fn add(
Expand All @@ -179,7 +179,7 @@ impl DagRound {
let state = loc.state().clone();
let point = point.clone();
let downloader = downloader.clone();
loc.add_validate(digest, || Verifier::validate(point, dag_round, downloader))
loc.init(digest, || Verifier::validate(point, dag_round, downloader))
.map(|first| first.clone().map(|_| state).boxed())
})
}
Expand Down Expand Up @@ -230,7 +230,7 @@ impl DagRound {
}
self.edit(&dag_point.location().author, |loc| {
let state = loc.state().clone();
loc.add_validate(dag_point.digest(), || {
loc.init(dag_point.digest(), || {
futures_util::future::ready(dag_point.clone())
})
.map(|first| first.clone().map(|_| state).boxed())
Expand Down
30 changes: 14 additions & 16 deletions consensus/src/dag/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,20 @@ impl Producer {
.into_iter()
.map(|point| (point.body.location.author, point.digest.clone()))
.collect::<BTreeMap<_, _>>();
Some(Arc::new(
PointBody {
location: Location {
round: current_round.round().clone(),
author: local_id.clone(),
},
time,
payload,
proof: prev_point.cloned(),
includes,
witness,
anchor_trigger,
anchor_proof,
}
.wrap(&key_pair),
))

Some(Point::new(key_pair, PointBody {
location: Location {
round: current_round.round().clone(),
author: local_id.clone(),
},
time,
payload,
proof: prev_point.cloned(),
includes,
witness,
anchor_trigger,
anchor_proof,
}))
}

fn includes(finished_round: &DagRound) -> Vec<Arc<Point>> {
Expand Down
17 changes: 2 additions & 15 deletions consensus/src/dag/verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl Verifier {
) {
let downloader = downloader.clone();
let shared = round.edit(author, |loc| {
loc.add_dependency(digest, move || {
loc.get_or_init(digest, move || {
let point_id = PointId {
location: Location {
author: author.clone(),
Expand Down Expand Up @@ -385,21 +385,8 @@ impl Verifier {
if point.body.time < proven.body.time {
return false; // time must be non-decreasing by the same author
}
let Some(body) = bincode::serialize(&proven.body).ok() else {
// should be removed after move to TL
panic!("Library error: failed to serialize proven point body")
};
for (peer, sig) in proof.evidence.iter() {
let Some(pubkey) = peer.as_public_key() else {
// should have been validated prior validator elections
panic!("Config error: failed to convert peer id into public key")
};
let sig: Result<[u8; 64], _> = sig.0.to_vec().try_into();
let Some(sig) = sig.ok() else {
// unexpected bytes used as a signature, thus invalid
return false;
};
if !pubkey.verify_raw(body.as_slice(), &sig) {
if !sig.verifies(peer, &proof.digest) {
return false;
}
}
Expand Down
Loading

0 comments on commit e1283b1

Please sign in to comment.