Skip to content

Commit

Permalink
feat(rpc): allow broadcasting messages to yourself
Browse files Browse the repository at this point in the history
  • Loading branch information
pashinov committed Jun 5, 2024
1 parent 01d71da commit 6ca1231
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 13 deletions.
14 changes: 8 additions & 6 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,11 @@ pub struct Node {
pub storage: Storage,

pub state_tracker: MinRefMcStateTracker,
pub blockchain_block_provider_config: BlockchainBlockProviderConfig,

pub rpc_config: Option<RpcConfig>,
pub public_overlay_client_config: PublicOverlayClientConfig,
pub blockchain_rpc_service_config: BlockchainRpcServiceConfig,
pub blockchain_block_provider_config: BlockchainBlockProviderConfig,
}

impl Node {
Expand Down Expand Up @@ -331,10 +331,10 @@ impl Node {
overlay_service,
storage,
state_tracker,
blockchain_block_provider_config: node_config.blockchain_block_provider,
rpc_config: node_config.rpc,
public_overlay_client_config: node_config.public_overlay_client,
blockchain_rpc_service_config: node_config.blockchain_rpc_service,
blockchain_block_provider_config: node_config.blockchain_block_provider,
})
}

Expand Down Expand Up @@ -505,11 +505,13 @@ impl Node {
.build(blockchain_rpc_service);
self.overlay_service.add_public_overlay(&public_overlay);

let blockchain_rpc_client = BlockchainRpcClient::new(PublicOverlayClient::new(
let blockchain_rpc_client = BlockchainRpcClient::builder(PublicOverlayClient::new(
self.network.clone(),
public_overlay,
self.public_overlay_client_config.clone(),
));
))
.with_self_broadcast_listener(mempool_adapter.clone())
.build();

tracing::info!(
overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
Expand Down Expand Up @@ -621,7 +623,7 @@ impl Node {
let mc_state = self
.storage
.shard_state_storage()
.load_state(&last_block_id)
.load_state(last_block_id)
.await?;

collator_state_subscriber
Expand Down Expand Up @@ -719,7 +721,7 @@ fn load_zerostate(tracker: &MinRefMcStateTracker, path: &PathBuf) -> Result<Shar
file_hash,
};

ShardStateStuff::from_root(&block_id, root, &tracker)
ShardStateStuff::from_root(&block_id, root, tracker)
}

fn make_shard_state(
Expand Down
6 changes: 2 additions & 4 deletions collator/src/mempool/mempool_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
use std::collections::BTreeMap;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use bytes::Bytes;
use everscale_crypto::ed25519::KeyPair;
use everscale_types::boc::Boc;
use everscale_types::models::{ExtInMsgInfo, MsgInfo};
use everscale_types::models::MsgInfo;
use everscale_types::prelude::Load;
use parking_lot::RwLock;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedReceiver;
use tycho_block_util::state::ShardStateStuff;
use tycho_consensus::{InputBufferImpl, Point};
use tycho_network::{DhtClient, InboundRequestMeta, OverlayService, PeerId};
use tycho_network::{DhtClient, OverlayService, PeerId};
use tycho_util::FastHashSet;

use crate::mempool::types::ExternalMessage;
Expand Down Expand Up @@ -111,7 +110,6 @@ impl MempoolAdapterStdImpl {
InputBufferImpl::new(externals_rx),
);
tokio::spawn(async move {
// TODO replace with some sensible init before run
engine.init_with_genesis(&peers).await;
engine.run().await;
});
Expand Down
74 changes: 71 additions & 3 deletions core/src/blockchain_rpc/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::sync::Arc;

use anyhow::Result;
use bytes::Bytes;
use everscale_types::models::BlockId;
use futures_util::stream::{FuturesUnordered, StreamExt};
use tycho_block_util::state::ShardStateStuff;
use tycho_collator::mempool::MempoolAdapterStdImpl;
use tycho_network::{PublicOverlay, Request};
use tycho_storage::Storage;
use tycho_util::futures::JoinTask;
Expand All @@ -12,6 +14,65 @@ use crate::overlay_client::{Error, Neighbour, PublicOverlayClient, QueryResponse
use crate::proto::blockchain::*;
use crate::proto::overlay::BroadcastPrefix;

pub trait SelfBroadcastListener: Send + Sync + 'static {
type Output;

fn handle_message(&self, message: Bytes) -> Self::Output;
}

#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub struct SelfNoopBroadcastListener;

impl SelfBroadcastListener for SelfNoopBroadcastListener {
type Output = ();

#[inline]
fn handle_message(&self, _: Bytes) -> Self::Output {}
}

impl SelfBroadcastListener for Arc<MempoolAdapterStdImpl> {
type Output = ();

fn handle_message(&self, message: Bytes) -> Self::Output {
self.send_external(message)
}
}

pub struct BlockchainRpcClientBuilder {
overlay_client: PublicOverlayClient,
broadcast_listener: Box<dyn SelfBroadcastListener<Output = ()>>,
}

impl<'a> BlockchainRpcClientBuilder {
pub fn build(self) -> BlockchainRpcClient {
BlockchainRpcClient {
inner: Arc::new(Inner {
overlay_client: self.overlay_client,
broadcast_listener: self.broadcast_listener,
}),
}
}
}

impl BlockchainRpcClientBuilder {
pub fn with_self_broadcast_listener(
self,
self_broadcast_listener: impl SelfBroadcastListener<Output = ()>,
) -> BlockchainRpcClientBuilder {
BlockchainRpcClientBuilder {
overlay_client: self.overlay_client,
broadcast_listener: Box::new(self_broadcast_listener),
}
}

pub fn without_self_broadcast_listener(self) -> BlockchainRpcClientBuilder {
BlockchainRpcClientBuilder {
overlay_client: self.overlay_client,
broadcast_listener: self.broadcast_listener,
}
}
}

#[derive(Clone)]
#[repr(transparent)]
pub struct BlockchainRpcClient {
Expand All @@ -20,12 +81,14 @@ pub struct BlockchainRpcClient {

struct Inner {
overlay_client: PublicOverlayClient,
broadcast_listener: Box<dyn SelfBroadcastListener<Output = ()>>,
}

impl BlockchainRpcClient {
pub fn new(overlay_client: PublicOverlayClient) -> Self {
Self {
inner: Arc::new(Inner { overlay_client }),
pub fn builder(overlay_client: PublicOverlayClient) -> BlockchainRpcClientBuilder {
BlockchainRpcClientBuilder {
overlay_client,
broadcast_listener: Box::new(SelfNoopBroadcastListener),
}
}

Expand Down Expand Up @@ -73,6 +136,11 @@ impl BlockchainRpcClient {
futures.push(client.send_raw(neighbour, req.clone()));
}

// Broadcast to yourself
self.inner
.broadcast_listener
.handle_message(Bytes::copy_from_slice(message));

while let Some(res) = futures.next().await {
if let Err(e) = res {
tracing::warn!("failed to broadcast external message: {e}");
Expand Down

0 comments on commit 6ca1231

Please sign in to comment.