Skip to content

Commit

Permalink
feat(collator): refactor mempool adapter stubs
Browse files Browse the repository at this point in the history
  • Loading branch information
serejkaaa512 committed Jul 10, 2024
1 parent 23ee8c1 commit db0a4d2
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 71 deletions.
83 changes: 58 additions & 25 deletions collator/src/mempool/mempool_adapter_ext_from_files_stub.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use std::collections::BTreeMap;
use std::fs::File;
use std::io::Read;
use std::path::PathBuf;
use std::sync::Arc;
use std::{fs, io};
Expand All @@ -9,33 +6,52 @@ use anyhow::Result;
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Bytes;
use everscale_crypto::ed25519::KeyPair;
use everscale_types::boc::Boc;
use everscale_types::cell::Load;
use everscale_types::models::{MsgInfo, OwnedMessage};
use indexmap::IndexMap;
use parking_lot::RwLock;
use rand::Rng;
use tokio::sync::Notify;
use tycho_block_util::state::ShardStateStuff;
use tycho_network::PeerId;
use tycho_network::{DhtClient, OverlayService, PeerId};

use super::mempool_adapter_stub::{stub_get_anchor_by_id, stub_get_next_anchor};
use super::types::{ExternalMessage, MempoolAnchor, MempoolAnchorId};
use crate::mempool::mempool_adapter::{MempoolAdapter, MempoolEventListener};
use crate::mempool::MempoolAdapterFactory;
use crate::tracing_targets;

// FACTORY

#[derive(Clone)]
pub struct MempoolAdapterExtFilesStubImpl {
_listener: Arc<dyn MempoolEventListener>,
pub anchors: Arc<RwLock<IndexMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,

_stub_anchors_cache: Arc<RwLock<BTreeMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
anchor_added: Arc<Notify>,
}

impl MempoolAdapterExtFilesStubImpl {
pub fn new(listener: Arc<dyn MempoolEventListener>) -> Self {
pub fn new() -> Arc<Self> {
let anchors = Arc::new(RwLock::new(IndexMap::new()));

tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Creating mempool adapter...");

let stub_anchors_cache = Arc::new(RwLock::new(BTreeMap::new()));
Arc::new(Self {
anchors,
anchor_added: Arc::new(Notify::new()),
})
}

pub fn run(
self: &Arc<Self>,
_key_pair: Arc<KeyPair>,
_dht_client: DhtClient,
_overlay_service: OverlayService,
_peers: Vec<PeerId>,
) {
let externals_dir = PathBuf::from("test/externals/set02");
let mut externals = fs::read_dir(externals_dir)
.expect("externals dir not found")
Expand All @@ -44,10 +60,11 @@ impl MempoolAdapterExtFilesStubImpl {
.expect("failed to read externals dir");

externals.sort();
tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Mempool adapter created");

let stub_anchors_cache = self.anchors.clone();
let adapter = self.clone();
tokio::spawn({
let listener = listener.clone();
let stub_anchors_cache = stub_anchors_cache.clone();
async move {
let mut anchor_id = 0;
let mut externals_iter = externals.into_iter();
Expand All @@ -57,7 +74,7 @@ impl MempoolAdapterExtFilesStubImpl {
tokio::time::sleep(tokio::time::Duration::from_millis(rnd_round_interval * 6))
.await;
anchor_id += 1;
if let Some(external) = externals_iter.next() {
let anchor = if let Some(external) = externals_iter.next() {
let anchor = create_anchor_with_externals_from_file(anchor_id, external);
{
let mut anchor_cache_rw = stub_anchors_cache.write();
Expand All @@ -71,7 +88,7 @@ impl MempoolAdapterExtFilesStubImpl {
anchor_cache_rw.insert(anchor_id, anchor.clone());
}
last_chain_time = anchor.chain_time();
listener.on_new_anchor(anchor).await.unwrap();
anchor
} else {
let anchor = create_empty_anchor(anchor_id, last_chain_time);
{
Expand All @@ -86,19 +103,31 @@ impl MempoolAdapterExtFilesStubImpl {
anchor_cache_rw.insert(anchor_id, anchor.clone());
}
last_chain_time = anchor.chain_time();
listener.on_new_anchor(anchor).await.unwrap();
}
anchor
};
adapter.add_anchor(anchor);
}
}
});
tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Anchors generator with externals from files started");
}

tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Mempool adapter created");
pub fn send_external(&self, _message: Bytes) {}

Self {
_listener: listener,
_stub_anchors_cache: stub_anchors_cache,
fn add_anchor(&self, anchor: Arc<MempoolAnchor>) {
{
let mut guard = self.anchors.write();
guard.insert(anchor.id(), anchor);
}

self.anchor_added.notify_waiters();
}
}

impl MempoolAdapterFactory for Arc<MempoolAdapterExtFilesStubImpl> {
type Adapter = MempoolAdapterExtFilesStubImpl;

fn create(&self, _listener: Arc<dyn MempoolEventListener>) -> Arc<Self::Adapter> {
self.clone()
}
}

Expand All @@ -118,15 +147,21 @@ impl MempoolAdapter for MempoolAdapterExtFilesStubImpl {
&self,
anchor_id: MempoolAnchorId,
) -> Result<Option<Arc<MempoolAnchor>>> {
stub_get_anchor_by_id(self._stub_anchors_cache.clone(), anchor_id).await
stub_get_anchor_by_id(self.anchors.clone(), anchor_id).await
}

async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<Arc<MempoolAnchor>> {
stub_get_next_anchor(self._stub_anchors_cache.clone(), prev_anchor_id).await
stub_get_next_anchor(
self.anchors.clone(),
prev_anchor_id,
self.anchor_added.clone(),
)
.await
}

async fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
let mut anchors_cache_rw = self._stub_anchors_cache.write();
let mut anchors_cache_rw = self.anchors.write();

anchors_cache_rw.retain(|anchor_id, _| anchor_id >= &before_anchor_id);
Ok(())
}
Expand All @@ -137,9 +172,7 @@ pub fn create_anchor_with_externals_from_file(
external_path: PathBuf,
) -> Arc<MempoolAnchor> {
let mut externals = vec![];
let mut file = File::open(&external_path).unwrap();
let mut buf = vec![];
file.read_to_end(&mut buf).unwrap();
let buf = fs::read(&external_path).unwrap();
let file_name = external_path.file_name().unwrap().to_str().unwrap();
tracing::info!("read external from file: {}", file_name);
let timestamp: u64 = file_name.parse().unwrap();
Expand Down
118 changes: 72 additions & 46 deletions collator/src/mempool/mempool_adapter_stub.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use everscale_crypto::ed25519::KeyPair;
use everscale_types::cell::{CellBuilder, HashBytes};
use everscale_types::models::{ExtInMsgInfo, IntAddr, StdAddr};
use indexmap::IndexMap;
use parking_lot::RwLock;
use rand::Rng;
use tokio::sync::Notify;
use tycho_block_util::state::ShardStateStuff;
use tycho_network::PeerId;
use tycho_network::{DhtClient, OverlayService, PeerId};

use super::types::{ExternalMessage, MempoolAnchor, MempoolAnchorId};
use crate::mempool::mempool_adapter::{MempoolAdapter, MempoolEventListener};
use crate::mempool::mempool_adapter::MempoolAdapter;
use crate::tracing_targets;

#[cfg(test)]
Expand All @@ -21,20 +24,33 @@ pub(super) mod tests;
// FACTORY

pub struct MempoolAdapterStubImpl {
_listener: Arc<dyn MempoolEventListener>,
anchors: Arc<RwLock<IndexMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,

_stub_anchors_cache: Arc<RwLock<BTreeMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
anchor_added: Arc<Notify>,
}

impl MempoolAdapterStubImpl {
pub fn new(listener: Arc<dyn MempoolEventListener>) -> Self {
tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Creating mempool adapter...");
pub fn new() -> Arc<Self> {
let anchors = Arc::new(RwLock::new(IndexMap::new()));

let stub_anchors_cache = Arc::new(RwLock::new(BTreeMap::new()));
Arc::new(Self {
anchors,
anchor_added: Arc::new(Notify::new()),
})
}

pub fn run(
self: &Arc<Self>,
_key_pair: Arc<KeyPair>,
_dht_client: DhtClient,
_overlay_service: OverlayService,
_peers: Vec<PeerId>,
) {
tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Creating mempool adapter...");

let stub_anchors_cache = self.anchors.clone();
let adapter = self.clone();
tokio::spawn({
let listener = listener.clone();
let stub_anchors_cache = stub_anchors_cache.clone();
async move {
let mut anchor_id = 0;
loop {
Expand All @@ -54,18 +70,20 @@ impl MempoolAdapterStubImpl {
anchor.externals_count(),
);
}
listener.on_new_anchor(anchor).await.unwrap();
adapter.add_anchor(anchor);
}
}
});
tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Stub anchors generator started");

tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Mempool adapter created");
}
pub fn send_external(&self, _message: Bytes) {}

Self {
_listener: listener,
_stub_anchors_cache: stub_anchors_cache,
fn add_anchor(&self, anchor: Arc<MempoolAnchor>) {
{
let mut guard = self.anchors.write();
guard.insert(anchor.id(), anchor);
}

self.anchor_added.notify_waiters();
}
}

Expand All @@ -85,22 +103,27 @@ impl MempoolAdapter for MempoolAdapterStubImpl {
&self,
anchor_id: MempoolAnchorId,
) -> Result<Option<Arc<MempoolAnchor>>> {
stub_get_anchor_by_id(self._stub_anchors_cache.clone(), anchor_id).await
stub_get_anchor_by_id(self.anchors.clone(), anchor_id).await
}

async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<Arc<MempoolAnchor>> {
stub_get_next_anchor(self._stub_anchors_cache.clone(), prev_anchor_id).await
stub_get_next_anchor(
self.anchors.clone(),
prev_anchor_id,
self.anchor_added.clone(),
)
.await
}

async fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
let mut anchors_cache_rw = self._stub_anchors_cache.write();
let mut anchors_cache_rw = self.anchors.write();
anchors_cache_rw.retain(|anchor_id, _| anchor_id >= &before_anchor_id);
Ok(())
}
}

pub(super) async fn stub_get_anchor_by_id(
anchors_cache: Arc<RwLock<BTreeMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
anchors_cache: Arc<RwLock<IndexMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
anchor_id: MempoolAnchorId,
) -> Result<Option<Arc<MempoolAnchor>>> {
let mut first_attempt = true;
Expand Down Expand Up @@ -146,39 +169,42 @@ pub(super) async fn stub_get_anchor_by_id(
}

pub(super) async fn stub_get_next_anchor(
anchors_cache: Arc<RwLock<BTreeMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
anchors_cache: Arc<RwLock<IndexMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
prev_anchor_id: MempoolAnchorId,
anchor_added: Arc<Notify>,
) -> Result<Arc<MempoolAnchor>> {
let mut stub_first_attempt = true;
let mut request_timer = std::time::Instant::now();
loop {
let anchor_added = anchor_added.notified();
{
let anchors_cache_r = anchors_cache.read();

let mut range = anchors_cache_r.range((
std::ops::Bound::Excluded(prev_anchor_id),
std::ops::Bound::Unbounded,
));

if let Some((next_id, next)) = range.next() {
if stub_first_attempt {
tracing::debug!(
target: tracing_targets::MEMPOOL_ADAPTER,
"Found in cache next anchor (id: {}) after specified previous (id: {})",
next_id,
prev_anchor_id,
);
} else {
tracing::debug!(
target: tracing_targets::MEMPOOL_ADAPTER,
"STUB: Returned next anchor (id: {}) after previous (id: {}) from mempool (responded in {} ms)",
next_id,
prev_anchor_id,
request_timer.elapsed().as_millis(),
);
let range = anchors_cache_r.get_range((prev_anchor_id as usize)..);

if let Some(slice) = range {
if let Some((next_id, next)) = slice.first() {
if stub_first_attempt {
tracing::debug!(
target: tracing_targets::MEMPOOL_ADAPTER,
"Found in cache next anchor (id: {}) after specified previous (id: {})",
next_id,
prev_anchor_id,
);
} else {
tracing::debug!(
target: tracing_targets::MEMPOOL_ADAPTER,
"STUB: Returned next anchor (id: {}) after previous (id: {}) from mempool (responded in {} ms)",
next_id,
prev_anchor_id,
request_timer.elapsed().as_millis(),
);
}
return Ok(next.clone());
}
return Ok(next.clone());
} else if stub_first_attempt {
}

if stub_first_attempt {
tracing::debug!(
target: tracing_targets::MEMPOOL_ADAPTER,
"There is no next anchor in cache after previous (id: {}). STUB: Requested it from mempool. Waiting...",
Expand All @@ -192,7 +218,7 @@ pub(super) async fn stub_get_next_anchor(
request_timer = std::time::Instant::now();
}
stub_first_attempt = false;
tokio::time::sleep(tokio::time::Duration::from_millis(1020)).await;
anchor_added.await;
}
}

Expand Down

0 comments on commit db0a4d2

Please sign in to comment.