Skip to content

Commit

Permalink
create GossipReceiver type
Browse files Browse the repository at this point in the history
  • Loading branch information
mrnaveira committed Sep 24, 2024
1 parent 622718a commit e69142f
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 5 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions applications/tari_validator_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ tari_state_store_sqlite = { workspace = true }
tari_networking = { workspace = true }
tari_rpc_framework = { workspace = true }
tari_template_builtin = { workspace = true }
tari_swarm = { workspace = true }

sqlite_message_logger = { workspace = true }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,55 @@ use log::*;
use tari_dan_common_types::{Epoch, NumPreshards, PeerAddress, ShardGroup, SubstateAddress};
use tari_dan_p2p::{proto, DanMessage, TariMessagingSpec};
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};
use tari_networking::{NetworkingHandle, NetworkingService};
use tari_networking::{GossipReceiver, GossipReceiverError, NetworkingHandle, NetworkingService};
use tari_swarm::messaging::prost::ProstCodec;
use tokio::sync::mpsc;
use tari_swarm::messaging::Codec;
use async_trait::async_trait;

use crate::p2p::services::mempool::MempoolError;

const LOG_TARGET: &str = "tari::validator_node::mempool::gossip";

pub struct MempoolGossipReceiver {
codec: ProstCodec<proto::network::DanMessage>,
tx_gossip: mpsc::UnboundedSender<(PeerId, proto::network::DanMessage)>,
}

impl MempoolGossipReceiver {
pub fn new(tx_gossip: mpsc::UnboundedSender<(PeerId, proto::network::DanMessage)>) -> Self {
Self {
codec: ProstCodec::default(),
tx_gossip
}
}

pub async fn encode(&self, message: proto::network::DanMessage) -> std::io::Result<Vec<u8>> {
let mut buf = Vec::with_capacity(1024);
self.codec
.encode_to(&mut buf, message)
.await?;
Ok(buf)
}
}

#[async_trait]
impl GossipReceiver for MempoolGossipReceiver {
async fn receive_message(&self, from: PeerId, message: libp2p::gossipsub::Message) -> Result<usize, GossipReceiverError> {
let (length, message) = self
.codec
.decode_from(&mut message.data.as_slice())
.await
.map_err(GossipReceiverError::CodecError)?;

self.tx_gossip
.send((from, message))
.map_err(|e| GossipReceiverError::Other(e.to_string()))?;

Ok(length)
}
}

#[derive(Debug)]
pub(super) struct MempoolGossip<TAddr> {
num_preshards: NumPreshards,
Expand Down
39 changes: 39 additions & 0 deletions networking/core/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright 2024. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::io;

use async_trait::async_trait;
use libp2p::{gossipsub, PeerId};

#[async_trait]
pub trait GossipReceiver {
async fn receive_message(&self, from: PeerId, msg: gossipsub::Message) -> Result<usize, GossipReceiverError>;
}

#[derive(Debug, thiserror::Error)]
pub enum GossipReceiverError {
#[error("Codec IO error: {0}")]
CodecError(io::Error),
#[error("Gossipsub error: {0}")]
Other(String),
}
2 changes: 2 additions & 0 deletions networking/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod config;
mod connection;
mod event;
mod global_ip;
mod gossip;
mod handle;
mod message;
mod notify;
Expand All @@ -29,6 +30,7 @@ mod spawn;

pub use config::*;
pub use connection::*;
pub use gossip::*;
pub use handle::*;
pub use message::*;
pub use spawn::*;
Expand Down
4 changes: 2 additions & 2 deletions networking/libp2p-messaging/src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ pub trait Codec: Default {

/// Reads a message from the given I/O stream according to the
/// negotiated protocol.
async fn decode_from<R>(&mut self, reader: &mut R) -> io::Result<(usize, Self::Message)>
async fn decode_from<R>(&self, reader: &mut R) -> io::Result<(usize, Self::Message)>
where R: AsyncRead + Unpin + Send;

/// Writes a request to the given I/O stream according to the
/// negotiated protocol.
async fn encode_to<W>(&mut self, writer: &mut W, message: Self::Message) -> io::Result<()>
async fn encode_to<W>(&self, writer: &mut W, message: Self::Message) -> io::Result<()>
where W: AsyncWrite + Unpin + Send;
}
4 changes: 2 additions & 2 deletions networking/libp2p-messaging/src/codec/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ where TMsg: prost::Message + Default
{
type Message = TMsg;

async fn decode_from<R>(&mut self, reader: &mut R) -> std::io::Result<(usize, Self::Message)>
async fn decode_from<R>(&self, reader: &mut R) -> std::io::Result<(usize, Self::Message)>
where R: AsyncRead + Unpin + Send {
let mut len_buf = [0u8; 4];
reader.read_exact(&mut len_buf).await?;
Expand All @@ -49,7 +49,7 @@ where TMsg: prost::Message + Default
Ok((len, message))
}

async fn encode_to<W>(&mut self, writer: &mut W, message: Self::Message) -> std::io::Result<()>
async fn encode_to<W>(&self, writer: &mut W, message: Self::Message) -> std::io::Result<()>
where W: AsyncWrite + Unpin + Send {
let mut buf = Vec::new();
message
Expand Down

0 comments on commit e69142f

Please sign in to comment.