Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/broadcaster'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Jun 18, 2024
2 parents f501688 + 274607c commit a0726c5
Show file tree
Hide file tree
Showing 40 changed files with 1,811 additions and 1,305 deletions.
16 changes: 15 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ tikv-jemallocator = { version = "0.5", features = [
] }
tl-proto = "0.4"
tokio = { version = "1", default-features = false }
tokio-stream = "0.1.15"
tokio-util = { version = "0.7.10", features = ["codec"] }
tower = "0.4"
tower-http = "0.5"
Expand Down
29 changes: 15 additions & 14 deletions collator/src/mempool/mempool_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ pub trait MempoolAdapter: Send + Sync + 'static {
pub struct MempoolAdapterStdImpl {
// TODO: replace with rocksdb
anchors: Arc<RwLock<IndexMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
externals_tx: tokio::sync::mpsc::UnboundedSender<Bytes>,
externals_tx: mpsc::UnboundedSender<Bytes>,

anchor_added: Arc<tokio::sync::Notify>,
anchor_added: Arc<Notify>,
}

impl MempoolAdapterStdImpl {
Expand All @@ -95,8 +95,7 @@ impl MempoolAdapterStdImpl {
tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Creating mempool adapter...");
let anchors = Arc::new(RwLock::new(IndexMap::new()));

let (sender, receiver) =
tokio::sync::mpsc::unbounded_channel::<(Arc<Point>, Vec<Arc<Point>>)>();
let (sender, receiver) = mpsc::unbounded_channel();

// TODO receive from outside
let (externals_tx, externals_rx) = mpsc::unbounded_channel();
Expand Down Expand Up @@ -150,18 +149,19 @@ impl MempoolAdapterFactory for Arc<MempoolAdapterStdImpl> {

pub async fn handle_anchors(
adapter: Arc<MempoolAdapterStdImpl>,
mut rx: UnboundedReceiver<(Arc<Point>, Vec<Arc<Point>>)>,
mut rx: UnboundedReceiver<(Point, Vec<Point>)>,
) {
let mut cache = ExternalMessageCache::new(1000);
while let Some((anchor, points)) = rx.recv().await {
let anchor_id: MempoolAnchorId = anchor.body().location.round.0;
let mut messages = Vec::new();
let mut total_messages = 0;
let mut total_bytes = 0;
let mut messages_bytes = 0;

for point in points.iter() {
total_messages += point.body.payload.len();
'message: for message in &point.body.payload {
total_messages += point.body().payload.len();
'message: for message in &point.body().payload {
total_bytes += message.len();
let cell = match Boc::decode(message) {
Ok(cell) => cell,
Expand Down Expand Up @@ -191,15 +191,14 @@ pub async fn handle_anchors(
}
};

if cache.check_unique(anchor.body.location.round.0, cell.repr_hash()) {
if cache.check_unique(anchor_id, cell.repr_hash()) {
messages.push(Arc::new(ExternalMessage::new(cell.clone(), ext_in_message)));
messages_bytes += message.len();
}
}
}
cache.clean(anchor.body.location.round.0);

metrics::gauge!("tycho_mempool_last_anchor_round").set(anchor.body.location.round.0);
metrics::gauge!("tycho_mempool_last_anchor_round").set(anchor.body().location.round.0);
metrics::counter!("tycho_mempool_externals_count_total").increment(messages.len() as _);
metrics::counter!("tycho_mempool_externals_bytes_total").increment(messages_bytes as _);
metrics::counter!("tycho_mempool_duplicates_count_total")
Expand All @@ -209,20 +208,22 @@ pub async fn handle_anchors(

tracing::info!(
target: tracing_targets::MEMPOOL_ADAPTER,
round = anchor.body.location.round.0,
time = anchor.body.time.as_u64(),
round = anchor_id,
time = anchor.body().time.as_u64(),
externals_unique = messages.len(),
externals_skipped = total_messages - messages.len(),
"new anchor"
);

let anchor = Arc::new(MempoolAnchor::new(
anchor.body.location.round.0,
anchor.body.time.as_u64(),
anchor_id,
anchor.body().time.as_u64(),
messages,
));

adapter.add_anchor(anchor);

cache.clean(anchor_id);
}
}

Expand Down
10 changes: 9 additions & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ license.workspace = true
[[example]]
name = "consensus-node"
path = "examples/consensus_node.rs"
required-features = ["test"]

[[example]]
name = "engine"
path = "examples/engine/main.rs"
required-features = ["test"]

[dependencies]
ahash = { workspace = true }
Expand All @@ -29,11 +31,13 @@ futures-util = { workspace = true }
itertools = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true, features = ["small_rng"] }
rand = { workspace = true }
rand_pcg = { workspace = true }
serde = { workspace = true, features = ["derive"] }
sha2 = { workspace = true }
tokio = { workspace = true, default-features = false }
tokio-stream = { workspace = true, optional = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
weedb = { workspace = true }

Expand All @@ -43,6 +47,7 @@ tycho-storage = { workspace = true }
tycho-util = { workspace = true }

[dev-dependencies]
humantime = { workspace = true }
parking_lot = { workspace = true, features = ["deadlock_detection"] }
tikv-jemallocator = { workspace = true, features = [
"unprefixed_malloc_on_supported_platforms",
Expand All @@ -62,3 +67,6 @@ tycho-util = { workspace = true, features = ["test"] }

[lints]
workspace = true

[features]
test = ["dep:tokio-stream"]
6 changes: 4 additions & 2 deletions consensus/examples/consensus_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{fmt, EnvFilter, Layer};
use tycho_consensus::test_utils::drain_anchors;
use tycho_consensus::test_utils::AnchorConsumer;
use tycho_consensus::{Engine, InputBufferStub};
use tycho_network::{Address, DhtConfig, NetworkConfig, PeerId, PeerInfo};
use tycho_util::time::now_sec;
Expand Down Expand Up @@ -140,7 +140,9 @@ impl CmdRun {
InputBufferStub::new(100, 5),
);
engine.init_with_genesis(all_peers.as_slice()).await;
tokio::spawn(drain_anchors(committed_rx));
let mut anchor_consumer = AnchorConsumer::default();
anchor_consumer.add(local_id, committed_rx);
tokio::spawn(anchor_consumer.drain());

tracing::info!(
local_id = %dht_client.network().peer_id(),
Expand Down
40 changes: 19 additions & 21 deletions consensus/examples/engine/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,18 @@ use std::time::Duration;
use clap::Parser;
use everscale_crypto::ed25519::{KeyPair, SecretKey};
use parking_lot::deadlock;
use tokio::sync::{mpsc, Mutex};
use tokio::sync::mpsc;
use tycho_consensus::test_utils::*;
use tycho_consensus::{Engine, InputBufferStub};
use tycho_network::{Address, DhtConfig, NetworkConfig, PeerId};
use tycho_util::FastHashMap;

mod logger;

#[global_allocator]
static ALLOC: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Cli::parse().run().await
fn main() -> anyhow::Result<()> {
Cli::parse().run()
}

/// Tycho network node.
Expand Down Expand Up @@ -47,7 +45,7 @@ struct Cli {
}

impl Cli {
async fn run(self) -> anyhow::Result<()> {
fn run(self) -> anyhow::Result<()> {
let fun = if self.flame {
logger::flame
} else {
Expand Down Expand Up @@ -95,9 +93,7 @@ fn make_network(cli: Cli) -> Vec<std::thread::JoinHandle<()>> {
.map(|((_, key_pair), addr)| Arc::new(make_peer_info(key_pair, vec![addr.clone()], None)))
.collect::<Vec<_>>();

let anchors_map = Arc::new(Mutex::new(FastHashMap::default()));
let refs_map = Arc::new(Mutex::new(FastHashMap::default()));

let mut anchor_consumer = AnchorConsumer::default();
let mut handles = vec![];
for (((secret_key, key_pair), bind_address), peer_id) in keys
.into_iter()
Expand All @@ -106,8 +102,8 @@ fn make_network(cli: Cli) -> Vec<std::thread::JoinHandle<()>> {
{
let all_peers = all_peers.clone();
let peer_info = peer_info.clone();
let anchors_map = anchors_map.clone();
let refs_map = refs_map.clone();
let (committed_tx, committed_rx) = mpsc::unbounded_channel();
anchor_consumer.add(peer_id, committed_rx);

let handle = std::thread::Builder::new()
.name(format!("engine-{peer_id:.4}"))
Expand Down Expand Up @@ -144,16 +140,6 @@ fn make_network(cli: Cli) -> Vec<std::thread::JoinHandle<()>> {
.expect("add peer to dht client");
}
}

let (committed_tx, committed_rx) = mpsc::unbounded_channel();
tokio::spawn(check_anchors(
committed_rx,
peer_id,
anchors_map,
refs_map,
cli.nodes,
));
// tokio::spawn(drain_anchors(committed_rx));
let mut engine = Engine::new(
key_pair,
&dht_client,
Expand All @@ -169,6 +155,18 @@ fn make_network(cli: Cli) -> Vec<std::thread::JoinHandle<()>> {
.unwrap();
handles.push(handle);
}
handles.push(
std::thread::Builder::new()
.name("anchor-consumer".to_string())
.spawn(move || {
tokio::runtime::Builder::new_current_thread()
.worker_threads(1)
.build()
.expect("new tokio runtime")
.block_on(anchor_consumer.check())
})
.unwrap(),
);
handles
}

Expand Down
6 changes: 2 additions & 4 deletions consensus/src/dag/anchor_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@ impl AnchorStage {
// reproducible global coin
let leader_index = rand_pcg::Pcg32::seed_from_u64(anchor_candidate_round as u64)
.gen_range(0..leader_peers.len());
let Some(leader) = leader_peers
let leader = leader_peers
.iter()
.nth(leader_index)
.map(|(peer_id, _)| peer_id)
else {
panic!("selecting a leader from an empty validator set")
};
.expect("selecting a leader from an empty validator set");
// the leader cannot produce three points in a row, so we have an undefined leader,
// rather than an intentional leaderless support round - all represented by `None`
if !current_peers.contains_key(leader) {
Expand Down
Loading

0 comments on commit a0726c5

Please sign in to comment.