From db0a4d2b943dcc4a37baf23663a018f1c3f4150a Mon Sep 17 00:00:00 2001 From: serejkaaa512 <5125402@mail.ru> Date: Wed, 10 Jul 2024 11:18:47 +0300 Subject: [PATCH] feat(collator): refactor mempool adapter stubs --- .../mempool_adapter_ext_from_files_stub.rs | 83 ++++++++---- collator/src/mempool/mempool_adapter_stub.rs | 118 +++++++++++------- 2 files changed, 130 insertions(+), 71 deletions(-) diff --git a/collator/src/mempool/mempool_adapter_ext_from_files_stub.rs b/collator/src/mempool/mempool_adapter_ext_from_files_stub.rs index c34b62e6c..e818437a1 100644 --- a/collator/src/mempool/mempool_adapter_ext_from_files_stub.rs +++ b/collator/src/mempool/mempool_adapter_ext_from_files_stub.rs @@ -1,6 +1,3 @@ -use std::collections::BTreeMap; -use std::fs::File; -use std::io::Read; use std::path::PathBuf; use std::sync::Arc; use std::{fs, io}; @@ -9,33 +6,52 @@ use anyhow::Result; use async_trait::async_trait; use base64::prelude::BASE64_STANDARD; use base64::Engine; +use bytes::Bytes; +use everscale_crypto::ed25519::KeyPair; use everscale_types::boc::Boc; use everscale_types::cell::Load; use everscale_types::models::{MsgInfo, OwnedMessage}; +use indexmap::IndexMap; use parking_lot::RwLock; use rand::Rng; +use tokio::sync::Notify; use tycho_block_util::state::ShardStateStuff; -use tycho_network::PeerId; +use tycho_network::{DhtClient, OverlayService, PeerId}; use super::mempool_adapter_stub::{stub_get_anchor_by_id, stub_get_next_anchor}; use super::types::{ExternalMessage, MempoolAnchor, MempoolAnchorId}; use crate::mempool::mempool_adapter::{MempoolAdapter, MempoolEventListener}; +use crate::mempool::MempoolAdapterFactory; use crate::tracing_targets; // FACTORY +#[derive(Clone)] pub struct MempoolAdapterExtFilesStubImpl { - _listener: Arc, + pub anchors: Arc>>>, - _stub_anchors_cache: Arc>>>, + anchor_added: Arc, } impl MempoolAdapterExtFilesStubImpl { - pub fn new(listener: Arc) -> Self { + pub fn new() -> Arc { + let anchors = Arc::new(RwLock::new(IndexMap::new())); + tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Creating mempool adapter..."); - let stub_anchors_cache = Arc::new(RwLock::new(BTreeMap::new())); + Arc::new(Self { + anchors, + anchor_added: Arc::new(Notify::new()), + }) + } + pub fn run( + self: &Arc, + _key_pair: Arc, + _dht_client: DhtClient, + _overlay_service: OverlayService, + _peers: Vec, + ) { let externals_dir = PathBuf::from("test/externals/set02"); let mut externals = fs::read_dir(externals_dir) .expect("externals dir not found") @@ -44,10 +60,11 @@ impl MempoolAdapterExtFilesStubImpl { .expect("failed to read externals dir"); externals.sort(); + tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Mempool adapter created"); + let stub_anchors_cache = self.anchors.clone(); + let adapter = self.clone(); tokio::spawn({ - let listener = listener.clone(); - let stub_anchors_cache = stub_anchors_cache.clone(); async move { let mut anchor_id = 0; let mut externals_iter = externals.into_iter(); @@ -57,7 +74,7 @@ impl MempoolAdapterExtFilesStubImpl { tokio::time::sleep(tokio::time::Duration::from_millis(rnd_round_interval * 6)) .await; anchor_id += 1; - if let Some(external) = externals_iter.next() { + let anchor = if let Some(external) = externals_iter.next() { let anchor = create_anchor_with_externals_from_file(anchor_id, external); { let mut anchor_cache_rw = stub_anchors_cache.write(); @@ -71,7 +88,7 @@ impl MempoolAdapterExtFilesStubImpl { anchor_cache_rw.insert(anchor_id, anchor.clone()); } last_chain_time = anchor.chain_time(); - listener.on_new_anchor(anchor).await.unwrap(); + anchor } else { let anchor = create_empty_anchor(anchor_id, last_chain_time); { @@ -86,19 +103,31 @@ impl MempoolAdapterExtFilesStubImpl { anchor_cache_rw.insert(anchor_id, anchor.clone()); } last_chain_time = anchor.chain_time(); - listener.on_new_anchor(anchor).await.unwrap(); - } + anchor + }; + adapter.add_anchor(anchor); } } }); - tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Anchors generator with externals from files started"); + } - tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Mempool adapter created"); + pub fn send_external(&self, _message: Bytes) {} - Self { - _listener: listener, - _stub_anchors_cache: stub_anchors_cache, + fn add_anchor(&self, anchor: Arc) { + { + let mut guard = self.anchors.write(); + guard.insert(anchor.id(), anchor); } + + self.anchor_added.notify_waiters(); + } +} + +impl MempoolAdapterFactory for Arc { + type Adapter = MempoolAdapterExtFilesStubImpl; + + fn create(&self, _listener: Arc) -> Arc { + self.clone() } } @@ -118,15 +147,21 @@ impl MempoolAdapter for MempoolAdapterExtFilesStubImpl { &self, anchor_id: MempoolAnchorId, ) -> Result>> { - stub_get_anchor_by_id(self._stub_anchors_cache.clone(), anchor_id).await + stub_get_anchor_by_id(self.anchors.clone(), anchor_id).await } async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result> { - stub_get_next_anchor(self._stub_anchors_cache.clone(), prev_anchor_id).await + stub_get_next_anchor( + self.anchors.clone(), + prev_anchor_id, + self.anchor_added.clone(), + ) + .await } async fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> { - let mut anchors_cache_rw = self._stub_anchors_cache.write(); + let mut anchors_cache_rw = self.anchors.write(); + anchors_cache_rw.retain(|anchor_id, _| anchor_id >= &before_anchor_id); Ok(()) } @@ -137,9 +172,7 @@ pub fn create_anchor_with_externals_from_file( external_path: PathBuf, ) -> Arc { let mut externals = vec![]; - let mut file = File::open(&external_path).unwrap(); - let mut buf = vec![]; - file.read_to_end(&mut buf).unwrap(); + let buf = fs::read(&external_path).unwrap(); let file_name = external_path.file_name().unwrap().to_str().unwrap(); tracing::info!("read external from file: {}", file_name); let timestamp: u64 = file_name.parse().unwrap(); diff --git a/collator/src/mempool/mempool_adapter_stub.rs b/collator/src/mempool/mempool_adapter_stub.rs index eceb02306..87d6205ad 100644 --- a/collator/src/mempool/mempool_adapter_stub.rs +++ b/collator/src/mempool/mempool_adapter_stub.rs @@ -1,17 +1,20 @@ -use std::collections::BTreeMap; use std::sync::Arc; use anyhow::Result; use async_trait::async_trait; +use bytes::Bytes; +use everscale_crypto::ed25519::KeyPair; use everscale_types::cell::{CellBuilder, HashBytes}; use everscale_types::models::{ExtInMsgInfo, IntAddr, StdAddr}; +use indexmap::IndexMap; use parking_lot::RwLock; use rand::Rng; +use tokio::sync::Notify; use tycho_block_util::state::ShardStateStuff; -use tycho_network::PeerId; +use tycho_network::{DhtClient, OverlayService, PeerId}; use super::types::{ExternalMessage, MempoolAnchor, MempoolAnchorId}; -use crate::mempool::mempool_adapter::{MempoolAdapter, MempoolEventListener}; +use crate::mempool::mempool_adapter::MempoolAdapter; use crate::tracing_targets; #[cfg(test)] @@ -21,20 +24,33 @@ pub(super) mod tests; // FACTORY pub struct MempoolAdapterStubImpl { - _listener: Arc, + anchors: Arc>>>, - _stub_anchors_cache: Arc>>>, + anchor_added: Arc, } impl MempoolAdapterStubImpl { - pub fn new(listener: Arc) -> Self { - tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Creating mempool adapter..."); + pub fn new() -> Arc { + let anchors = Arc::new(RwLock::new(IndexMap::new())); - let stub_anchors_cache = Arc::new(RwLock::new(BTreeMap::new())); + Arc::new(Self { + anchors, + anchor_added: Arc::new(Notify::new()), + }) + } + pub fn run( + self: &Arc, + _key_pair: Arc, + _dht_client: DhtClient, + _overlay_service: OverlayService, + _peers: Vec, + ) { + tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Creating mempool adapter..."); + + let stub_anchors_cache = self.anchors.clone(); + let adapter = self.clone(); tokio::spawn({ - let listener = listener.clone(); - let stub_anchors_cache = stub_anchors_cache.clone(); async move { let mut anchor_id = 0; loop { @@ -54,18 +70,20 @@ impl MempoolAdapterStubImpl { anchor.externals_count(), ); } - listener.on_new_anchor(anchor).await.unwrap(); + adapter.add_anchor(anchor); } } }); - tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Stub anchors generator started"); - - tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Mempool adapter created"); + } + pub fn send_external(&self, _message: Bytes) {} - Self { - _listener: listener, - _stub_anchors_cache: stub_anchors_cache, + fn add_anchor(&self, anchor: Arc) { + { + let mut guard = self.anchors.write(); + guard.insert(anchor.id(), anchor); } + + self.anchor_added.notify_waiters(); } } @@ -85,22 +103,27 @@ impl MempoolAdapter for MempoolAdapterStubImpl { &self, anchor_id: MempoolAnchorId, ) -> Result>> { - stub_get_anchor_by_id(self._stub_anchors_cache.clone(), anchor_id).await + stub_get_anchor_by_id(self.anchors.clone(), anchor_id).await } async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result> { - stub_get_next_anchor(self._stub_anchors_cache.clone(), prev_anchor_id).await + stub_get_next_anchor( + self.anchors.clone(), + prev_anchor_id, + self.anchor_added.clone(), + ) + .await } async fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> { - let mut anchors_cache_rw = self._stub_anchors_cache.write(); + let mut anchors_cache_rw = self.anchors.write(); anchors_cache_rw.retain(|anchor_id, _| anchor_id >= &before_anchor_id); Ok(()) } } pub(super) async fn stub_get_anchor_by_id( - anchors_cache: Arc>>>, + anchors_cache: Arc>>>, anchor_id: MempoolAnchorId, ) -> Result>> { let mut first_attempt = true; @@ -146,39 +169,42 @@ pub(super) async fn stub_get_anchor_by_id( } pub(super) async fn stub_get_next_anchor( - anchors_cache: Arc>>>, + anchors_cache: Arc>>>, prev_anchor_id: MempoolAnchorId, + anchor_added: Arc, ) -> Result> { let mut stub_first_attempt = true; let mut request_timer = std::time::Instant::now(); loop { + let anchor_added = anchor_added.notified(); { let anchors_cache_r = anchors_cache.read(); - let mut range = anchors_cache_r.range(( - std::ops::Bound::Excluded(prev_anchor_id), - std::ops::Bound::Unbounded, - )); - - if let Some((next_id, next)) = range.next() { - if stub_first_attempt { - tracing::debug!( - target: tracing_targets::MEMPOOL_ADAPTER, - "Found in cache next anchor (id: {}) after specified previous (id: {})", - next_id, - prev_anchor_id, - ); - } else { - tracing::debug!( - target: tracing_targets::MEMPOOL_ADAPTER, - "STUB: Returned next anchor (id: {}) after previous (id: {}) from mempool (responded in {} ms)", - next_id, - prev_anchor_id, - request_timer.elapsed().as_millis(), - ); + let range = anchors_cache_r.get_range((prev_anchor_id as usize)..); + + if let Some(slice) = range { + if let Some((next_id, next)) = slice.first() { + if stub_first_attempt { + tracing::debug!( + target: tracing_targets::MEMPOOL_ADAPTER, + "Found in cache next anchor (id: {}) after specified previous (id: {})", + next_id, + prev_anchor_id, + ); + } else { + tracing::debug!( + target: tracing_targets::MEMPOOL_ADAPTER, + "STUB: Returned next anchor (id: {}) after previous (id: {}) from mempool (responded in {} ms)", + next_id, + prev_anchor_id, + request_timer.elapsed().as_millis(), + ); + } + return Ok(next.clone()); } - return Ok(next.clone()); - } else if stub_first_attempt { + } + + if stub_first_attempt { tracing::debug!( target: tracing_targets::MEMPOOL_ADAPTER, "There is no next anchor in cache after previous (id: {}). STUB: Requested it from mempool. Waiting...", @@ -192,7 +218,7 @@ pub(super) async fn stub_get_next_anchor( request_timer = std::time::Instant::now(); } stub_first_attempt = false; - tokio::time::sleep(tokio::time::Duration::from_millis(1020)).await; + anchor_added.await; } }