Skip to content

Commit

Permalink
refactor(consensus): test utils
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo committed Jun 16, 2024
1 parent 09cc24b commit 8f3792b
Show file tree
Hide file tree
Showing 10 changed files with 268 additions and 227 deletions.
14 changes: 13 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ tikv-jemallocator = { version = "0.5", features = [
] }
tl-proto = "0.4"
tokio = { version = "1", default-features = false }
tokio-stream = "0.1.15"
tokio-util = { version = "0.7.10", features = ["codec"] }
tower = "0.4"
tower-http = "0.5"
Expand Down
1 change: 1 addition & 0 deletions consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ rand_pcg = { workspace = true }
serde = { workspace = true, features = ["derive"] }
sha2 = { workspace = true }
tokio = { workspace = true, default-features = false }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
weedb = { workspace = true }
Expand Down
6 changes: 4 additions & 2 deletions consensus/examples/consensus_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{fmt, EnvFilter, Layer};
use tycho_consensus::test_utils::drain_anchors;
use tycho_consensus::test_utils::AnchorConsumer;
use tycho_consensus::{Engine, InputBufferStub};
use tycho_network::{Address, DhtConfig, NetworkConfig, PeerId, PeerInfo};
use tycho_util::time::now_sec;
Expand Down Expand Up @@ -140,7 +140,9 @@ impl CmdRun {
InputBufferStub::new(100, 5),
);
engine.init_with_genesis(all_peers.as_slice()).await;
tokio::spawn(drain_anchors(committed_rx));
let mut anchor_consumer = AnchorConsumer::default();
anchor_consumer.add(local_id, committed_rx);
tokio::spawn(anchor_consumer.drain());

tracing::info!(
local_id = %dht_client.network().peer_id(),
Expand Down
33 changes: 16 additions & 17 deletions consensus/examples/engine/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ use std::time::Duration;
use clap::Parser;
use everscale_crypto::ed25519::{KeyPair, SecretKey};
use parking_lot::deadlock;
use tokio::sync::{mpsc, Mutex};
use tokio::sync::mpsc;
use tycho_consensus::test_utils::*;
use tycho_consensus::{Engine, InputBufferStub};
use tycho_network::{Address, DhtConfig, NetworkConfig, PeerId};
use tycho_util::FastHashMap;

mod logger;

Expand Down Expand Up @@ -94,9 +93,7 @@ fn make_network(cli: Cli) -> Vec<std::thread::JoinHandle<()>> {
.map(|((_, key_pair), addr)| Arc::new(make_peer_info(key_pair, vec![addr.clone()], None)))
.collect::<Vec<_>>();

let anchors_map = Arc::new(Mutex::new(FastHashMap::default()));
let refs_map = Arc::new(Mutex::new(FastHashMap::default()));

let mut anchor_consumer = AnchorConsumer::default();
let mut handles = vec![];
for (((secret_key, key_pair), bind_address), peer_id) in keys
.into_iter()
Expand All @@ -105,8 +102,8 @@ fn make_network(cli: Cli) -> Vec<std::thread::JoinHandle<()>> {
{
let all_peers = all_peers.clone();
let peer_info = peer_info.clone();
let anchors_map = anchors_map.clone();
let refs_map = refs_map.clone();
let (committed_tx, committed_rx) = mpsc::unbounded_channel();
anchor_consumer.add(peer_id, committed_rx);

let handle = std::thread::Builder::new()
.name(format!("engine-{peer_id:.4}"))
Expand Down Expand Up @@ -143,16 +140,6 @@ fn make_network(cli: Cli) -> Vec<std::thread::JoinHandle<()>> {
.expect("add peer to dht client");
}
}

let (committed_tx, committed_rx) = mpsc::unbounded_channel();
tokio::spawn(check_anchors(
committed_rx,
peer_id,
anchors_map,
refs_map,
cli.nodes,
));
// tokio::spawn(drain_anchors(committed_rx));
let mut engine = Engine::new(
key_pair,
&dht_client,
Expand All @@ -168,6 +155,18 @@ fn make_network(cli: Cli) -> Vec<std::thread::JoinHandle<()>> {
.unwrap();
handles.push(handle);
}
handles.push(
std::thread::Builder::new()
.name("anchor-consumer".to_string())
.spawn(move || {
tokio::runtime::Builder::new_current_thread()
.worker_threads(1)
.build()
.expect("new tokio runtime")
.block_on(anchor_consumer.check())
})
.unwrap(),
);
handles
}

Expand Down
207 changes: 0 additions & 207 deletions consensus/src/test_utils.rs

This file was deleted.

Loading

0 comments on commit 8f3792b

Please sign in to comment.