From e69142f5b428959ba290da6c5bf44ae8f36564aa Mon Sep 17 00:00:00 2001 From: mrnaveira <47919901+mrnaveira@users.noreply.github.com> Date: Tue, 24 Sep 2024 23:27:20 +0100 Subject: [PATCH] create GossipReceiver type --- Cargo.lock | 1 + applications/tari_validator_node/Cargo.toml | 1 + .../src/p2p/services/mempool/gossip.rs | 44 ++++++++++++++++++- networking/core/src/gossip.rs | 39 ++++++++++++++++ networking/core/src/lib.rs | 2 + networking/libp2p-messaging/src/codec/mod.rs | 4 +- .../libp2p-messaging/src/codec/prost.rs | 4 +- 7 files changed, 90 insertions(+), 5 deletions(-) create mode 100644 networking/core/src/gossip.rs diff --git a/Cargo.lock b/Cargo.lock index c2ffdee88..e296515dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10062,6 +10062,7 @@ dependencies = [ "tari_rpc_state_sync", "tari_shutdown", "tari_state_store_sqlite", + "tari_swarm", "tari_template_builtin", "tari_template_lib", "tari_transaction", diff --git a/applications/tari_validator_node/Cargo.toml b/applications/tari_validator_node/Cargo.toml index 003b3df19..b5a7c4e55 100644 --- a/applications/tari_validator_node/Cargo.toml +++ b/applications/tari_validator_node/Cargo.toml @@ -42,6 +42,7 @@ tari_state_store_sqlite = { workspace = true } tari_networking = { workspace = true } tari_rpc_framework = { workspace = true } tari_template_builtin = { workspace = true } +tari_swarm = { workspace = true } sqlite_message_logger = { workspace = true } diff --git a/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs b/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs index 8c60d99c8..73fbc8c70 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs @@ -8,13 +8,55 @@ use log::*; use tari_dan_common_types::{Epoch, NumPreshards, PeerAddress, ShardGroup, SubstateAddress}; use tari_dan_p2p::{proto, DanMessage, TariMessagingSpec}; use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader}; -use tari_networking::{NetworkingHandle, NetworkingService}; +use tari_networking::{GossipReceiver, GossipReceiverError, NetworkingHandle, NetworkingService}; +use tari_swarm::messaging::prost::ProstCodec; use tokio::sync::mpsc; +use tari_swarm::messaging::Codec; +use async_trait::async_trait; use crate::p2p::services::mempool::MempoolError; const LOG_TARGET: &str = "tari::validator_node::mempool::gossip"; +pub struct MempoolGossipReceiver { + codec: ProstCodec, + tx_gossip: mpsc::UnboundedSender<(PeerId, proto::network::DanMessage)>, +} + +impl MempoolGossipReceiver { + pub fn new(tx_gossip: mpsc::UnboundedSender<(PeerId, proto::network::DanMessage)>) -> Self { + Self { + codec: ProstCodec::default(), + tx_gossip + } + } + + pub async fn encode(&self, message: proto::network::DanMessage) -> std::io::Result> { + let mut buf = Vec::with_capacity(1024); + self.codec + .encode_to(&mut buf, message) + .await?; + Ok(buf) + } +} + +#[async_trait] +impl GossipReceiver for MempoolGossipReceiver { + async fn receive_message(&self, from: PeerId, message: libp2p::gossipsub::Message) -> Result { + let (length, message) = self + .codec + .decode_from(&mut message.data.as_slice()) + .await + .map_err(GossipReceiverError::CodecError)?; + + self.tx_gossip + .send((from, message)) + .map_err(|e| GossipReceiverError::Other(e.to_string()))?; + + Ok(length) + } +} + #[derive(Debug)] pub(super) struct MempoolGossip { num_preshards: NumPreshards, diff --git a/networking/core/src/gossip.rs b/networking/core/src/gossip.rs new file mode 100644 index 000000000..f590b9c3e --- /dev/null +++ b/networking/core/src/gossip.rs @@ -0,0 +1,39 @@ +// Copyright 2024. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::io; + +use async_trait::async_trait; +use libp2p::{gossipsub, PeerId}; + +#[async_trait] +pub trait GossipReceiver { + async fn receive_message(&self, from: PeerId, msg: gossipsub::Message) -> Result; +} + +#[derive(Debug, thiserror::Error)] +pub enum GossipReceiverError { + #[error("Codec IO error: {0}")] + CodecError(io::Error), + #[error("Gossipsub error: {0}")] + Other(String), +} \ No newline at end of file diff --git a/networking/core/src/lib.rs b/networking/core/src/lib.rs index bd29fe10e..fc21563da 100644 --- a/networking/core/src/lib.rs +++ b/networking/core/src/lib.rs @@ -20,6 +20,7 @@ mod config; mod connection; mod event; mod global_ip; +mod gossip; mod handle; mod message; mod notify; @@ -29,6 +30,7 @@ mod spawn; pub use config::*; pub use connection::*; +pub use gossip::*; pub use handle::*; pub use message::*; pub use spawn::*; diff --git a/networking/libp2p-messaging/src/codec/mod.rs b/networking/libp2p-messaging/src/codec/mod.rs index 0be9637f1..e54d404f7 100644 --- a/networking/libp2p-messaging/src/codec/mod.rs +++ b/networking/libp2p-messaging/src/codec/mod.rs @@ -18,11 +18,11 @@ pub trait Codec: Default { /// Reads a message from the given I/O stream according to the /// negotiated protocol. - async fn decode_from(&mut self, reader: &mut R) -> io::Result<(usize, Self::Message)> + async fn decode_from(&self, reader: &mut R) -> io::Result<(usize, Self::Message)> where R: AsyncRead + Unpin + Send; /// Writes a request to the given I/O stream according to the /// negotiated protocol. - async fn encode_to(&mut self, writer: &mut W, message: Self::Message) -> io::Result<()> + async fn encode_to(&self, writer: &mut W, message: Self::Message) -> io::Result<()> where W: AsyncWrite + Unpin + Send; } diff --git a/networking/libp2p-messaging/src/codec/prost.rs b/networking/libp2p-messaging/src/codec/prost.rs index 6b0af47e1..65ab9fd04 100644 --- a/networking/libp2p-messaging/src/codec/prost.rs +++ b/networking/libp2p-messaging/src/codec/prost.rs @@ -26,7 +26,7 @@ where TMsg: prost::Message + Default { type Message = TMsg; - async fn decode_from(&mut self, reader: &mut R) -> std::io::Result<(usize, Self::Message)> + async fn decode_from(&self, reader: &mut R) -> std::io::Result<(usize, Self::Message)> where R: AsyncRead + Unpin + Send { let mut len_buf = [0u8; 4]; reader.read_exact(&mut len_buf).await?; @@ -49,7 +49,7 @@ where TMsg: prost::Message + Default Ok((len, message)) } - async fn encode_to(&mut self, writer: &mut W, message: Self::Message) -> std::io::Result<()> + async fn encode_to(&self, writer: &mut W, message: Self::Message) -> std::io::Result<()> where W: AsyncWrite + Unpin + Send { let mut buf = Vec::new(); message