Skip to content

Commit

Permalink
feature(consensus): more rayon
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo committed Jul 16, 2024
1 parent 0012906 commit f6bfa23
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 14 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ metrics = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
rand_pcg = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true, features = ["derive"] }
sha2 = { workspace = true }
tokio = { workspace = true, default-features = false }
Expand Down
14 changes: 7 additions & 7 deletions consensus/src/engine/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tokio::task::{JoinError, JoinHandle};
use tracing::Instrument;
use tycho_network::{DhtClient, OverlayService, PeerId};
use tycho_util::metrics::HistogramGuard;
use tycho_util::sync::rayon_run;

use crate::dag::{Dag, DagRound, LastOwnPoint, Producer, Verifier, WeakDagRound};
use crate::effects::{
Expand Down Expand Up @@ -205,7 +206,7 @@ impl Engine {
let input_buffer = self.input_buffer.clone();
let last_own_point = last_own_point.clone();
futures_util::future::Either::Right(
tokio::task::spawn_blocking(move || {
rayon_run(move || {
let task_start_time = Instant::now();
Producer::new_point(
&current_dag_round,
Expand All @@ -225,7 +226,7 @@ impl Engine {
)
} else {
drop(own_point_state_tx);
futures_util::future::Either::Left(futures_util::future::ready(Ok(None::<Point>)))
futures_util::future::Either::Left(futures_util::future::ready(None::<Point>))
};

let bcaster_run = tokio::spawn({
Expand All @@ -235,7 +236,7 @@ impl Engine {
let mut broadcaster = self.broadcaster;
let downloader = self.downloader.clone();
async move {
let own_point = own_point_fut.await.expect("new point producer");
let own_point = own_point_fut.await;
round_effects.own_point(own_point.as_ref());

if let Some(own_point) = own_point {
Expand Down Expand Up @@ -266,7 +267,7 @@ impl Engine {
}
});

let commit_run = tokio::task::spawn_blocking({
let commit_run = rayon_run({
let mut dag = self.dag;
let next_dag_round = next_dag_round.clone();
let committed_tx = self.committed.clone();
Expand Down Expand Up @@ -319,7 +320,7 @@ impl Engine {
);

match tokio::join!(collector_run, bcaster_run, commit_run) {
(Ok((collector, next_round)), Ok((bcaster, new_last_own_point)), Ok(dag)) => {
(Ok((collector, next_round)), Ok((bcaster, new_last_own_point)), dag) => {
self.broadcaster = bcaster;
// do not reset to None, Producer decides whether to use old value or not
if let Some(new_last_own_point) = new_last_own_point {
Expand All @@ -330,11 +331,10 @@ impl Engine {
self.collector = collector;
self.dag = dag;
}
(collector, bcaster, commit) => {
(collector, bcaster, _) => {
let msg = Self::join_err_msg(&[
(collector.err(), "collector"),
(bcaster.err(), "broadcaster"),
(commit.err(), "commit"),
]);
let _span = round_effects.span().enter();
panic!("{msg}")
Expand Down
15 changes: 8 additions & 7 deletions consensus/src/models/point.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;

use bytes::Bytes;
use everscale_crypto::ed25519::KeyPair;
use rayon::prelude::{IntoParallelRefIterator, ParallelIterator};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use sha2::{Digest as Sha2Digest, Sha256};
use tycho_network::PeerId;
Expand Down Expand Up @@ -170,12 +171,12 @@ pub struct PrevPoint {
}
impl PrevPoint {
pub fn signatures_match(&self) -> bool {
for (peer, sig) in &self.evidence {
if !sig.verifies(peer, &self.digest) {
return false;
}
}
true
// according to the rule of thumb to yield every 0.01-0.1 ms,
// and that each signature check takes near 0.03 ms,
// every check deserves its own async task - delegate to rayon
self.evidence
.par_iter()
.all(|(peer, sig)| sig.verifies(peer, &self.digest))
}
}

Expand Down Expand Up @@ -570,7 +571,7 @@ mod tests {
humantime::format_duration(elapsed_start)
);
println!(
"check {PEERS} with rayon took {}",
"check {PEERS} sigs on rayon took {}",
humantime::format_duration(elapsed_run)
);
}
Expand Down

0 comments on commit f6bfa23

Please sign in to comment.