Skip to content

Commit

Permalink
feat(consensus): cli
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo committed May 2, 2024
1 parent e6abee9 commit 3c8b9ce
Show file tree
Hide file tree
Showing 13 changed files with 409 additions and 117 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pkcs8 = "0.10"
quick_cache = "0.4.1"
quinn = { version = "0.10", default-features = false, features = ["runtime-tokio", "tls-rustls"] }
rand = "0.8"
rand_pcg = { version = "0.3" }
rcgen = "0.11"
ring = "0.16"
rlimit = "0.10.1"
Expand Down
40 changes: 18 additions & 22 deletions collator/src/mempool/mempool_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ pub trait MempoolAdapterFactory {
}

impl<F, R> MempoolAdapterFactory for F
where
F: Fn(Arc<dyn MempoolEventListener>) -> R,
R: MempoolAdapter,
where
F: Fn(Arc<dyn MempoolEventListener>) -> R,
R: MempoolAdapter,
{
type Adapter = R;

Expand Down Expand Up @@ -74,7 +74,6 @@ pub trait MempoolAdapter: Send + Sync + 'static {
async fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()>;
}


pub struct MempoolAdapterImpl {
//TODO: replace with rocksdb
anchors: Arc<RwLock<BTreeMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
Expand All @@ -90,11 +89,17 @@ impl MempoolAdapterImpl {
tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Creating mempool adapter...");
let anchors = Arc::new(RwLock::new(BTreeMap::new()));

let (sender, receiver) = tokio::sync::mpsc::unbounded_channel::<(Arc<Point>, Vec<Arc<Point>>)>();
let (sender, receiver) =
tokio::sync::mpsc::unbounded_channel::<(Arc<Point>, Vec<Arc<Point>>)>();

let engine =
tycho_consensus::Engine::new(&secret_key, &dht_client, &overlay_service, &peers, sender)
.await;
let engine = tycho_consensus::Engine::new(
&secret_key,
&dht_client,
&overlay_service,
&peers,
sender,
)
.await;

tokio::spawn(async move { engine.run() });

Expand Down Expand Up @@ -147,9 +152,8 @@ pub async fn parse_points(
}
};

let external_message = ExternalMessage::new(cell.clone(), ext_in_message );
let external_message = ExternalMessage::new(cell.clone(), ext_in_message);
external_messages.insert(*cell.repr_hash(), external_message);

}
}

Expand All @@ -161,7 +165,7 @@ pub async fn parse_points(
let anchor = Arc::new(MempoolAnchor::new(
anchor.body.location.round.0,
anchor.body.time.as_u64(),
messages
messages,
));

adapter.add_anchor(anchor);
Expand All @@ -170,11 +174,7 @@ pub async fn parse_points(

#[async_trait]
impl MempoolAdapter for MempoolAdapterImpl {

async fn enqueue_process_new_mc_block_state(
&self,
mc_state: ShardStateStuff,
) -> Result<()> {
async fn enqueue_process_new_mc_block_state(&self, mc_state: ShardStateStuff) -> Result<()> {
//TODO: make real implementation, currently does nothing
tracing::info!(
target: tracing_targets::MEMPOOL_ADAPTER,
Expand Down Expand Up @@ -222,9 +222,7 @@ impl MempoolAdapter for MempoolAdapterImpl {
let mut request_timer = std::time::Instant::now();
loop {
{
let anchors_cache_r = self
.anchors
.read();
let anchors_cache_r = self.anchors.read();

let mut range = anchors_cache_r.range((
std::ops::Bound::Excluded(prev_anchor_id),
Expand Down Expand Up @@ -268,9 +266,7 @@ impl MempoolAdapter for MempoolAdapterImpl {
}

async fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
let mut anchors_cache_rw = self
.anchors
.write();
let mut anchors_cache_rw = self.anchors.write();

anchors_cache_rw.retain(|anchor_id, _| anchor_id >= &before_anchor_id);
Ok(())
Expand Down
15 changes: 9 additions & 6 deletions collator/src/mempool/mempool_adapter_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use std::{
use anyhow::{anyhow, Result};
use async_trait::async_trait;

use crate::mempool::{MempoolAdapter, MempoolEventListener};
use everscale_types::{
cell::{CellBuilder, CellSliceRange, HashBytes},
models::{ExtInMsgInfo, IntAddr, MsgInfo, OwnedMessage, StdAddr},
};
use rand::Rng;
use tycho_block_util::state::ShardStateStuff;
use crate::mempool::{MempoolAdapter, MempoolEventListener};

use crate::tracing_targets;

Expand Down Expand Up @@ -201,12 +201,15 @@ fn _stub_create_random_anchor_with_stub_externals(
msg_cell_builder.store_u32(i as u32).unwrap();
let msg_cell = msg_cell_builder.build().unwrap();
let msg_cell_range = CellSliceRange::full(&*msg_cell);
let msg = ExternalMessage::new(msg_cell, ExtInMsgInfo {
dst: IntAddr::Std(StdAddr::new(0, rand_addr)),
..Default::default()
});
let msg = ExternalMessage::new(
msg_cell,
ExtInMsgInfo {
dst: IntAddr::Std(StdAddr::new(0, rand_addr)),
..Default::default()
},
);
externals.push(Arc::new(msg));
}

Arc::new(MempoolAnchor::new(anchor_id, chain_time, externals))
}
}
2 changes: 1 addition & 1 deletion collator/src/mempool/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod mempool_adapter;
mod types;
mod mempool_adapter_stub;
mod types;

pub use mempool_adapter::*;
pub(crate) use types::{MempoolAnchor, MempoolAnchorId};
2 changes: 1 addition & 1 deletion collator/src/mempool/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub type MempoolAnchorId = u32;

pub(crate) struct ExternalMessage {
message_cell: Cell,
message_info: ExtInMsgInfo
message_info: ExtInMsgInfo,
}

impl ExternalMessage {
Expand Down
12 changes: 11 additions & 1 deletion consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ rust-version.workspace = true
repository.workspace = true
license.workspace = true

[[example]]
name = "consensus-node"
path = "examples/consensus_node.rs"

[dependencies]
ahash = { workspace = true }
anyhow = { workspace = true }
Expand All @@ -19,14 +23,20 @@ futures-util = { workspace = true }
itertools = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true, features = ["small_rng"] }
rand_pcg = { workspace = true }
serde = { workspace = true, features = ["derive"] }
sha2 = { workspace = true }
tokio = { workspace = true, default-features = false }
tracing = { workspace = true }
weedb = { workspace = true }

# examples' dependencies
clap = { workspace = true }
hex = { workspace = true }
serde_json = { workspace = true }
tracing-appender = { workspace = true }

# local deps
rand_pcg = { version = "0.3" }
tycho-network = { workspace = true }
tycho-storage = { workspace = true }
tycho-util = { workspace = true, features = ["test"] }
Expand Down
Loading

0 comments on commit 3c8b9ce

Please sign in to comment.