Skip to content

Commit

Permalink
Merge branch 'fix/mempool-adapter-stub'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Jun 8, 2024
2 parents bc0c5cd + 113d783 commit 1479a57
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 11 deletions.
46 changes: 37 additions & 9 deletions collator/src/mempool/mempool_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::Result;
Expand All @@ -8,15 +7,15 @@ use everscale_crypto::ed25519::KeyPair;
use everscale_types::boc::Boc;
use everscale_types::models::MsgInfo;
use everscale_types::prelude::Load;
use indexmap::IndexMap;
use parking_lot::RwLock;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::{mpsc, Notify};
use tycho_block_util::state::ShardStateStuff;
use tycho_consensus::{InputBufferImpl, Point};
use tycho_network::{DhtClient, OverlayService, PeerId};
use tycho_util::FastHashSet;

use super::mempool_adapter_stub::{stub_get_anchor_by_id, stub_get_next_anchor};
use crate::mempool::external_message_cache::ExternalMessageCache;
use crate::mempool::types::ExternalMessage;
use crate::mempool::{MempoolAnchor, MempoolAnchorId};
Expand Down Expand Up @@ -81,8 +80,10 @@ pub trait MempoolAdapter: Send + Sync + 'static {
#[derive(Clone)]
pub struct MempoolAdapterStdImpl {
// TODO: replace with rocksdb
anchors: Arc<RwLock<BTreeMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
anchors: Arc<RwLock<IndexMap<MempoolAnchorId, Arc<MempoolAnchor>>>>,
externals_tx: tokio::sync::mpsc::UnboundedSender<Bytes>,

anchor_added: Arc<tokio::sync::Notify>,
}

impl MempoolAdapterStdImpl {
Expand All @@ -93,7 +94,7 @@ impl MempoolAdapterStdImpl {
peers: Vec<PeerId>,
) -> Arc<Self> {
tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Creating mempool adapter...");
let anchors = Arc::new(RwLock::new(BTreeMap::new()));
let anchors = Arc::new(RwLock::new(IndexMap::new()));

let (sender, receiver) =
tokio::sync::mpsc::unbounded_channel::<(Arc<Point>, Vec<Arc<Point>>)>();
Expand All @@ -118,6 +119,7 @@ impl MempoolAdapterStdImpl {
let mempool_adapter = Arc::new(Self {
anchors,
externals_tx,
anchor_added: Arc::new(Notify::new()),
});

tokio::spawn(handle_anchors(mempool_adapter.clone(), receiver));
Expand All @@ -130,8 +132,12 @@ impl MempoolAdapterStdImpl {
}

fn add_anchor(&self, anchor: Arc<MempoolAnchor>) {
let mut guard = self.anchors.write();
guard.insert(anchor.id(), anchor);
{
let mut guard = self.anchors.write();
guard.insert(anchor.id(), anchor);
}

self.anchor_added.notify_waiters()
}
}

Expand Down Expand Up @@ -218,13 +224,35 @@ impl MempoolAdapter for MempoolAdapterStdImpl {
) -> Result<Option<Arc<MempoolAnchor>>> {
// TODO: make real implementation, currently only return anchor from local cache

stub_get_anchor_by_id(self.anchors.clone(), anchor_id).await
Ok(self.anchors.read().get(&anchor_id).cloned())
}

async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<Arc<MempoolAnchor>> {
// TODO: make real implementation, currently only return anchor from local cache
loop {
// NOTE: Subscribe to notification before checking
let anchor_added = self.anchor_added.notified();

{
let anchors = self.anchors.read();
if let Some((oldest, _)) = anchors.first() {
anyhow::ensure!(
prev_anchor_id >= *oldest,
"Requested anchor {prev_anchor_id} is too old"
);
}

let Some(index) = anchors.get_index_of(&prev_anchor_id) else {
anyhow::bail!("Presented anchor {prev_anchor_id} is unknown");
};

stub_get_next_anchor(self.anchors.clone(), prev_anchor_id).await
if let Some((_, value)) = anchors.get_index(index + 1) {
return Ok(value.clone());
}
}

anchor_added.await;
}
}

async fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,8 @@ pub async fn check_anchors(
}
});

tracing::info!("Anchor hashmap len: {}", guard.len());
tracing::info!("Refs hashmap ken: {}", refs_guard.len());
tracing::debug!("Anchor hashmap len: {}", guard.len());
tracing::debug!("Refs hashmap ken: {}", refs_guard.len());

drop(guard);
drop(refs_guard);
Expand Down

0 comments on commit 1479a57

Please sign in to comment.