Skip to content

Commit

Permalink
fix(mempool-adapter): fix mempool-adapter merge problem, apply rust f…
Browse files Browse the repository at this point in the history
…mt to code. Slightly rework mempool adapter factory
  • Loading branch information
MrWad3r committed May 10, 2024
1 parent 37cfe08 commit 2bc79c4
Show file tree
Hide file tree
Showing 14 changed files with 277 additions and 224 deletions.
227 changes: 116 additions & 111 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ tycho-network = { workspace = true }
tycho-storage = { workspace = true }
tycho-util = { workspace = true }

[dev-dependencies]
tycho-collator = { workspace = true, features = ["test"] }

[build-dependencies]
anyhow = { workspace = true }
rustc_version = { workspace = true }
Expand Down
9 changes: 6 additions & 3 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use futures_util::future::BoxFuture;
use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
use tycho_collator::collator::CollatorStdImplFactory;
use tycho_collator::manager::CollationManager;
use tycho_collator::mempool::MempoolAdapterStdImpl;
use tycho_collator::mempool::MempoolAdapterfactoryStd;
use tycho_collator::msg_queue::MessageQueueAdapterStdImpl;
use tycho_collator::state_node::{StateNodeAdapter, StateNodeAdapterStdImpl};
use tycho_collator::types::{CollationConfig, ValidatorNetwork};
Expand Down Expand Up @@ -486,14 +486,17 @@ impl Node {
supported_block_version: 50,
supported_capabilities: supported_capabilities(),
max_collate_threads: 1,
test_validators_keypairs: vec![],
};

let collation_manager = CollationManager::start(
collation_config,
Arc::new(MessageQueueAdapterStdImpl::default()),
|listener| StateNodeAdapterStdImpl::new(listener, self.storage.clone()),
MempoolAdapterStdImpl::new,
MempoolAdapterfactoryStd::new(
self.keypair.clone(),
self.dht_client.clone(),
self.overlay_service.clone(),
),
ValidatorStdImplFactory {
network: ValidatorNetwork {
overlay_service: self.overlay_service.clone(),
Expand Down
2 changes: 1 addition & 1 deletion collator/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ where
Arc::new(state_node_adapter_factory.create(arc_dispatcher.clone()));

// create mempool adapter
let mpool_adapter = Arc::new(mpool_adapter_factory.create(arc_dispatcher.clone()));
let mpool_adapter = mpool_adapter_factory.create(arc_dispatcher.clone());

// create validator and start its tasks queue
let validator = validator_factory.create(ValidatorContext {
Expand Down
219 changes: 133 additions & 86 deletions collator/src/mempool/mempool_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
use std::collections::BTreeMap;
use std::sync::{Arc, RwLock};
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;

use anyhow::{anyhow, Result};
use anyhow::Result;
use async_trait::async_trait;
use everscale_types::cell::{CellBuilder, CellSliceRange, HashBytes};
use everscale_types::models::{ExtInMsgInfo, IntAddr, MsgInfo, OwnedMessage, StdAddr};
use rand::Rng;
use everscale_crypto::ed25519::KeyPair;
use everscale_types::boc::Boc;
use everscale_types::cell::HashBytes;
use everscale_types::models::ExtInMsgInfo;
use everscale_types::prelude::Load;
use parking_lot::RwLock;
use tokio::sync::mpsc::UnboundedReceiver;
use tycho_block_util::state::ShardStateStuff;
use tycho_consensus::Point;
use tycho_network::{DhtClient, OverlayService};

use super::types::{ExternalMessage, MempoolAnchor, MempoolAnchorId};
use crate::mempool::types::ExternalMessage;
use crate::mempool::{MempoolAnchor, MempoolAnchorId};
use crate::tracing_targets;

#[cfg(test)]
Expand All @@ -20,7 +27,7 @@ pub(super) mod tests;
pub trait MempoolAdapterFactory {
type Adapter: MempoolAdapter;

fn create(&self, listener: Arc<dyn MempoolEventListener>) -> Self::Adapter;
fn create(&self, listener: Arc<dyn MempoolEventListener>) -> Arc<Self::Adapter>;
}

impl<F, R> MempoolAdapterFactory for F
Expand All @@ -30,8 +37,8 @@ where
{
type Adapter = R;

fn create(&self, listener: Arc<dyn MempoolEventListener>) -> Self::Adapter {
self(listener)
fn create(&self, listener: Arc<dyn MempoolEventListener>) -> Arc<Self::Adapter> {
Arc::new(self(listener))
}
}

Expand Down Expand Up @@ -71,57 +78,128 @@ pub trait MempoolAdapter: Send + Sync + 'static {
async fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()>;
}

pub struct MempoolAdapterStdImpl {
listener: Arc<dyn MempoolEventListener>,
pub struct MempoolAdapterfactoryStd {
key_pair: Arc<KeyPair>,
dht_client: DhtClient,
overlay_service: OverlayService,
}

impl MempoolAdapterfactoryStd {
pub fn new(
key_pair: Arc<KeyPair>,
dht_client: DhtClient,
overlay_service: OverlayService,
) -> MempoolAdapterfactoryStd {
Self {
key_pair,
dht_client,
overlay_service,
}
}
}

impl MempoolAdapterFactory for MempoolAdapterfactoryStd {
type Adapter = MempoolAdapterStdImpl;

fn create(&self, _: Arc<dyn MempoolEventListener>) -> Arc<Self::Adapter> {
MempoolAdapterStdImpl::new(
self.key_pair.clone(),
self.dht_client.clone(),
self.overlay_service.clone(),
)
}
}

_stub_anchors_cache: Arc<RwLock<BTreeMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
pub struct MempoolAdapterStdImpl {
// TODO: replace with rocksdb
anchors: Arc<RwLock<BTreeMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
}

impl MempoolAdapterStdImpl {
pub fn new(listener: Arc<dyn MempoolEventListener>) -> Self {
pub fn new(
key_pair: Arc<KeyPair>,
dht_client: DhtClient,
overlay_service: OverlayService,
) -> Arc<Self> {
tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Creating mempool adapter...");
let anchors = Arc::new(RwLock::new(BTreeMap::new()));

// TODO: make real implementation, currently runs stub task
// that produces the repeating set of anchors
let stub_anchors_cache = Arc::new(RwLock::new(BTreeMap::new()));

tokio::spawn({
let listener = listener.clone();
let stub_anchors_cache = stub_anchors_cache.clone();
async move {
let mut anchor_id = 0;
loop {
let rnd_round_interval = rand::thread_rng().gen_range(400..600);
tokio::time::sleep(tokio::time::Duration::from_millis(rnd_round_interval * 6))
.await;
anchor_id += 1;
let anchor = _stub_create_random_anchor_with_stub_externals(anchor_id);
{
let mut anchor_cache_rw = stub_anchors_cache
.write()
.map_err(|e| anyhow!("Poison error on write lock: {:?}", e))
.unwrap();
tracing::debug!(
target: tracing_targets::MEMPOOL_ADAPTER,
"Random anchor (id: {}, chain_time: {}, externals: {}) added to cache",
anchor.id(),
anchor.chain_time(),
anchor.externals_count(),
);
anchor_cache_rw.insert(anchor_id, anchor.clone());
}
listener.on_new_anchor(anchor).await.unwrap();
}
}
let (sender, receiver) =
tokio::sync::mpsc::unbounded_channel::<(Arc<Point>, Vec<Arc<Point>>)>();

tokio::spawn(async move {
let engine =
tycho_consensus::Engine::new(key_pair, &dht_client, &overlay_service, sender).await;

engine.run().await;
});
tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Stub anchors generator started");

tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Mempool adapter created");

Self {
listener,
_stub_anchors_cache: stub_anchors_cache,
let mempool_adapter = Arc::new(Self { anchors });

// start handling mempool anchors
tokio::spawn(parse_points(mempool_adapter.clone(), receiver));

mempool_adapter
}

fn add_anchor(&self, anchor: Arc<MempoolAnchor>) {
let mut guard = self.anchors.write();
guard.insert(anchor.id(), anchor);
}
}

pub async fn parse_points(
adapter: Arc<MempoolAdapterStdImpl>,
mut rx: UnboundedReceiver<(Arc<Point>, Vec<Arc<Point>>)>,
) {
while let Some((anchor, points)) = rx.recv().await {
let mut external_messages = HashMap::<HashBytes, ExternalMessage>::new();

for point in points {
'message: for message in &point.body.payload {
let cell = match Boc::decode(message) {
Ok(cell) => cell,
Err(e) => {
tracing::error!(target: tracing_targets::MEMPOOL_ADAPTER, "Failed to deserialize bytes into cell. Error: {e:?}"); // TODO: should handle errors properly?
continue 'message;
}
};

let mut slice = match cell.as_slice() {
Ok(slice) => slice,
Err(e) => {
tracing::error!(target: tracing_targets::MEMPOOL_ADAPTER, "Failed to make slice from cell. Error: {e:?}");
continue 'message;
}
};

let ext_in_message = match ExtInMsgInfo::load_from(&mut slice) {
Ok(message) => message,
Err(e) => {
tracing::error!(target: tracing_targets::MEMPOOL_ADAPTER, "Bad cell. Failed to deserialize to ExtInMsgInfo. Err: {e:?}");
continue 'message;
}
};

let external_message = ExternalMessage::new(cell.clone(), ext_in_message);
external_messages.insert(*cell.repr_hash(), external_message);
}
}

let messages = external_messages
.into_iter()
.map(|m| Arc::new(m.1))
.collect::<Vec<_>>();

let anchor = Arc::new(MempoolAnchor::new(
anchor.body.location.round.0,
anchor.body.time.as_u64(),
messages,
));

adapter.add_anchor(anchor);
}
}

Expand All @@ -143,10 +221,8 @@ impl MempoolAdapter for MempoolAdapterStdImpl {
) -> Result<Option<Arc<MempoolAnchor>>> {
// TODO: make real implementation, currently only return anchor from local cache
let res = {
let anchors_cache_r = self
._stub_anchors_cache
.read()
.map_err(|e| anyhow!("Poison error on read lock: {:?}", e))?;
let anchors_cache_r = self.anchors.read();

anchors_cache_r.get(&anchor_id).cloned()
};
if res.is_some() {
Expand Down Expand Up @@ -177,10 +253,7 @@ impl MempoolAdapter for MempoolAdapterStdImpl {
let mut request_timer = std::time::Instant::now();
loop {
{
let anchors_cache_r = self
._stub_anchors_cache
.read()
.map_err(|e| anyhow!("Poison error on read lock: {:?}", e))?;
let anchors_cache_r = self.anchors.read();

let mut range = anchors_cache_r.range((
std::ops::Bound::Excluded(prev_anchor_id),
Expand Down Expand Up @@ -224,35 +297,9 @@ impl MempoolAdapter for MempoolAdapterStdImpl {
}

async fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
let mut anchors_cache_rw = self
._stub_anchors_cache
.write()
.map_err(|e| anyhow!("Poison error on write lock: {:?}", e))?;
let mut anchors_cache_rw = self.anchors.write();

anchors_cache_rw.retain(|anchor_id, _| anchor_id >= &before_anchor_id);
Ok(())
}
}

fn _stub_create_random_anchor_with_stub_externals(
anchor_id: MempoolAnchorId,
) -> Arc<MempoolAnchor> {
let chain_time = anchor_id as u64 * 471 * 6 % 1000000000;
let externals_count = chain_time as i32 % 10;
let mut externals = vec![];
for i in 0..externals_count {
let rand_addr = (0..32).map(|_| rand::random::<u8>()).collect::<Vec<u8>>();
let rand_addr = HashBytes::from_slice(&rand_addr);
let mut msg_cell_builder = CellBuilder::new();
msg_cell_builder.store_u32(anchor_id).unwrap();
msg_cell_builder.store_u64(chain_time).unwrap();
msg_cell_builder.store_u32(i as u32).unwrap();
let msg_cell = msg_cell_builder.build().unwrap();
let msg = ExternalMessage::new(msg_cell, ExtInMsgInfo {
dst: IntAddr::Std(StdAddr::new(0, rand_addr)),
..Default::default()
});
externals.push(Arc::new(msg));
}

Arc::new(MempoolAnchor::new(anchor_id, chain_time, externals))
}
7 changes: 3 additions & 4 deletions collator/src/mempool/mempool_adapter_stub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use std::sync::{Arc, RwLock};

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use everscale_types::cell::{CellBuilder, CellSliceRange, HashBytes};
use everscale_types::models::{ExtInMsgInfo, IntAddr, MsgInfo, OwnedMessage, StdAddr};
use everscale_types::cell::{CellBuilder, HashBytes};
use everscale_types::models::{ExtInMsgInfo, IntAddr, StdAddr};
use rand::Rng;
use tycho_block_util::state::ShardStateStuff;

use super::types::{ExternalMessage, MempoolAnchor, MempoolAnchorId};
use crate::mempool::{MempoolAdapter, MempoolEventListener};
use crate::mempool::mempool_adapter::{MempoolAdapter, MempoolEventListener};
use crate::tracing_targets;

#[cfg(test)]
Expand Down Expand Up @@ -194,7 +194,6 @@ fn _stub_create_random_anchor_with_stub_externals(
msg_cell_builder.store_u64(chain_time).unwrap();
msg_cell_builder.store_u32(i as u32).unwrap();
let msg_cell = msg_cell_builder.build().unwrap();
let msg_cell_range = CellSliceRange::full(&*msg_cell);
let msg = ExternalMessage::new(msg_cell, ExtInMsgInfo {
dst: IntAddr::Std(StdAddr::new(0, rand_addr)),
..Default::default()
Expand Down
1 change: 1 addition & 0 deletions collator/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ mod mempool_adapter_stub;
mod types;

pub use mempool_adapter::*;
pub use mempool_adapter_stub::MempoolAdapterStubImpl;
pub(crate) use types::{MempoolAnchor, MempoolAnchorId};
4 changes: 2 additions & 2 deletions collator/src/mempool/tests/mempool_adapter_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::Result;
use async_trait::async_trait;

use super::{MempoolAdapter, MempoolEventListener};
use crate::mempool::{MempoolAdapterStdImpl, MempoolAnchor};
use crate::mempool::{MempoolAdapterStdImpl, MempoolAdapterStubImpl, MempoolAnchor};
use crate::test_utils::try_init_test_tracing;

struct MempoolEventStubListener;
Expand All @@ -25,7 +25,7 @@ impl MempoolEventListener for MempoolEventStubListener {
async fn test_stub_anchors_generator() -> Result<()> {
try_init_test_tracing(tracing_subscriber::filter::LevelFilter::TRACE);

let adapter = MempoolAdapterStdImpl::new(Arc::new(MempoolEventStubListener {}));
let adapter = MempoolAdapterStubImpl::new(Arc::new(MempoolEventStubListener {}));

// try get not existing anchor by id
let opt_anchor = adapter.get_anchor_by_id(10).await?;
Expand Down
4 changes: 2 additions & 2 deletions collator/src/mempool/types.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use everscale_types::models::{ExtInMsgInfo, OwnedMessage};
use everscale_types::models::ExtInMsgInfo;
use everscale_types::prelude::Cell;

// TYPES
Expand All @@ -21,7 +21,7 @@ impl ExternalMessage {
}
}

pub(crate) struct MempoolAnchor {
pub struct MempoolAnchor {
id: MempoolAnchorId,
chain_time: u64,
externals: Vec<Arc<ExternalMessage>>,
Expand Down
Loading

0 comments on commit 2bc79c4

Please sign in to comment.