From bfaaff60080a471ff365512f19234d5701320e4f Mon Sep 17 00:00:00 2001 From: Brian Ginsburg <7957636+bgins@users.noreply.github.com> Date: Wed, 15 Nov 2023 12:55:11 -0800 Subject: [PATCH] refactor: Add Gossipsub Message wrapper (#436) # Description This PR implements the following changes: - [x] Add gosssipsub message wrapper - [x] Rename `store_and_notify` to `publish_and_notify` The message wrapper includes a header with a nonce to force the gossip of duplicate receipts. We will likely expand on the header in future work and make the nonce optional. ## Link to issue Implements #421. ## Type of change - [x] Refactor (non-breaking change that updates existing functionality) ## Test plan (required) We've added a unit test to roundtrip a gossiped message to bytes and back again. We also have a gossip notifications integration test to confirm messages are still sent. --------- Co-authored-by: Zeeshan Lakhani --- homestar-runtime/src/event_handler/event.rs | 10 +- .../src/event_handler/swarm_event.rs | 70 ++++---- homestar-runtime/src/network/pubsub.rs | 7 +- .../src/network/pubsub/message.rs | 153 ++++++++++++++++++ homestar-runtime/src/network/swarm.rs | 7 +- 5 files changed, 205 insertions(+), 42 deletions(-) create mode 100644 homestar-runtime/src/network/pubsub/message.rs diff --git a/homestar-runtime/src/event_handler/event.rs b/homestar-runtime/src/event_handler/event.rs index de46ddd1..5ca10e11 100644 --- a/homestar-runtime/src/event_handler/event.rs +++ b/homestar-runtime/src/event_handler/event.rs @@ -129,7 +129,7 @@ impl Event { { match self { Event::CapturedReceipt(captured) => { - let _ = captured.store_and_notify(event_handler); + let _ = captured.publish_and_notify(event_handler); } Event::Shutdown(tx) => { info!("event_handler server shutting down"); @@ -243,7 +243,7 @@ impl Captured { } #[allow(dead_code)] - fn store_and_notify( + fn publish_and_notify( mut self, event_handler: &mut EventHandler, ) -> Result<(Cid, InvocationReceipt)> @@ -267,7 +267,7 @@ impl Captured { if event_handler.pubsub_enabled { match event_handler.swarm.behaviour_mut().gossip_publish( pubsub::RECEIPTS_TOPIC, - TopicMessage::CapturedReceipt(receipt.clone()), + TopicMessage::CapturedReceipt(pubsub::Message::new(receipt.clone())), ) { Ok(msg_id) => { info!( @@ -397,7 +397,7 @@ impl Replay { .behaviour_mut() .gossip_publish( pubsub::RECEIPTS_TOPIC, - TopicMessage::CapturedReceipt(receipt.clone()), + TopicMessage::CapturedReceipt(pubsub::Message::new(receipt.clone())), ) .map(|msg_id| { info!(cid=receipt_cid, @@ -543,7 +543,7 @@ where async fn handle_event(self, event_handler: &mut EventHandler, ipfs: IpfsCli) { match self { Event::CapturedReceipt(captured) => { - if let Ok((cid, receipt)) = captured.store_and_notify(event_handler) { + if let Ok((cid, receipt)) = captured.publish_and_notify(event_handler) { #[cfg(not(feature = "test-utils"))] { // Spawn client call in the background, without awaiting. diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index 72fc13a8..cadc2935 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -13,8 +13,11 @@ use crate::{ Event, Handler, RequestResponseError, }, libp2p::multiaddr::MultiaddrExt, - network::swarm::{ - CapsuleTag, ComposedEvent, PeerDiscoveryInfo, RequestResponseKey, HOMESTAR_PROTOCOL_VER, + network::{ + pubsub, + swarm::{ + CapsuleTag, ComposedEvent, PeerDiscoveryInfo, RequestResponseKey, HOMESTAR_PROTOCOL_VER, + }, }, receipt::{RECEIPT_TAG, VERSION_KEY}, workflow, @@ -403,38 +406,41 @@ async fn handle_swarm_event( message, propagation_source, message_id, - } => match Receipt::try_from(message.data) { - // TODO: dont fail blindly if we get a non receipt message - Ok(receipt) => { - info!( - peer_id = propagation_source.to_string(), - message_id = message_id.to_string(), - "message received on receipts topic: {}", - receipt.cid() - ); + } => { + let bytes: Vec = message.data; + match pubsub::Message::::try_from(bytes) { + // TODO: dont fail blindly if we get a non receipt message + Ok(msg) => { + let receipt = msg.payload; + info!( + peer_id = propagation_source.to_string(), + message_id = message_id.to_string(), + "message received on receipts topic: {receipt}" + ); - // Store gossiped receipt. - let _ = event_handler - .db - .conn() - .as_mut() - .map(|conn| Db::store_receipt(receipt.clone(), conn)); - - #[cfg(feature = "websocket-notify")] - notification::emit_event( - event_handler.ws_evt_sender(), - EventNotificationTyp::SwarmNotification( - SwarmNotification::ReceivedReceiptPubsub, - ), - btreemap! { - "peerId" => propagation_source.to_string(), - "cid" => receipt.cid().to_string(), - "ran" => receipt.ran().to_string() - }, - ); + // Store gossiped receipt. + let _ = event_handler + .db + .conn() + .as_mut() + .map(|conn| Db::store_receipt(receipt.clone(), conn)); + + #[cfg(feature = "websocket-notify")] + notification::emit_event( + event_handler.ws_evt_sender(), + EventNotificationTyp::SwarmNotification( + SwarmNotification::ReceivedReceiptPubsub, + ), + btreemap! { + "peerId" => propagation_source.to_string(), + "cid" => receipt.cid().to_string(), + "ran" => receipt.ran().to_string() + }, + ); + } + Err(err) => info!(err=?err, "cannot handle incoming gossipsub message"), } - Err(err) => info!(err=?err, "cannot handle incoming gossipsub message"), - }, + } gossipsub::Event::Subscribed { peer_id, topic } => { debug!( peer_id = peer_id.to_string(), diff --git a/homestar-runtime/src/network/pubsub.rs b/homestar-runtime/src/network/pubsub.rs index 2928e245..051f76c7 100644 --- a/homestar-runtime/src/network/pubsub.rs +++ b/homestar-runtime/src/network/pubsub.rs @@ -5,7 +5,7 @@ use crate::settings; use anyhow::Result; use libp2p::{ - gossipsub::{self, ConfigBuilder, Message, MessageAuthenticity, MessageId, ValidationMode}, + gossipsub::{self, ConfigBuilder, MessageAuthenticity, MessageId, ValidationMode}, identity::Keypair, }; use std::{ @@ -13,6 +13,9 @@ use std::{ hash::{Hash, Hasher}, }; +pub(crate) mod message; +pub(crate) use message::Message; + /// [Receipt]-related topic for pub(gossip)sub. /// /// [Receipt]: homestar_core::workflow::receipt @@ -23,7 +26,7 @@ pub(crate) const RECEIPTS_TOPIC: &str = "receipts"; /// [gossipsub]: libp2p::gossipsub pub(crate) fn new(keypair: Keypair, settings: &settings::Node) -> Result { // To content-address message, we can take the hash of message and use it as an ID. - let message_id_fn = |message: &Message| { + let message_id_fn = |message: &gossipsub::Message| { let mut s = DefaultHasher::new(); message.data.hash(&mut s); MessageId::from(s.finish().to_string()) diff --git a/homestar-runtime/src/network/pubsub/message.rs b/homestar-runtime/src/network/pubsub/message.rs new file mode 100644 index 00000000..841212b8 --- /dev/null +++ b/homestar-runtime/src/network/pubsub/message.rs @@ -0,0 +1,153 @@ +use anyhow::{anyhow, Result}; +use homestar_core::workflow::Nonce; +use libipld::{self, cbor::DagCborCodec, prelude::Codec, serde::from_ipld, Ipld}; +use std::collections::BTreeMap; + +const HEADER_KEY: &str = "header"; +const PAYLOAD_KEY: &str = "payload"; +const NONCE_KEY: &str = "nonce"; + +#[derive(Debug)] +pub(crate) struct Message { + pub(crate) header: Header, + pub(crate) payload: T, +} + +impl Message { + pub(crate) fn new(payload: T) -> Self { + let header = Header { + nonce: Nonce::generate(), + }; + + Self { header, payload } + } +} + +impl TryFrom> for Vec +where + Ipld: From> + From, +{ + type Error = anyhow::Error; + + fn try_from(message: Message) -> Result { + let message_ipld = Ipld::from(message); + DagCborCodec.encode(&message_ipld) + } +} + +impl TryFrom> for Message +where + T: TryFrom, +{ + type Error = anyhow::Error; + + fn try_from(bytes: Vec) -> Result { + let ipld: Ipld = DagCborCodec.decode(&bytes)?; + ipld.try_into() + .map_err(|_| anyhow!("Could not convert IPLD to pubsub message.")) + } +} + +impl From> for Ipld +where + Ipld: From, +{ + fn from(message: Message) -> Self { + Ipld::Map(BTreeMap::from([ + (HEADER_KEY.into(), message.header.into()), + (PAYLOAD_KEY.into(), message.payload.into()), + ])) + } +} + +impl TryFrom for Message +where + T: TryFrom, +{ + type Error = anyhow::Error; + + fn try_from(ipld: Ipld) -> Result { + let map = from_ipld::>(ipld)?; + + let header = map + .get(HEADER_KEY) + .ok_or_else(|| anyhow!("missing {HEADER_KEY}"))? + .to_owned() + .try_into()?; + + let payload = map + .get(PAYLOAD_KEY) + .ok_or_else(|| anyhow!("missing {PAYLOAD_KEY}"))? + .to_owned() + .try_into()?; + + Ok(Message { header, payload }) + } +} + +#[derive(Clone, Debug)] +pub(crate) struct Header { + nonce: Nonce, +} + +impl From
for Ipld { + fn from(header: Header) -> Self { + Ipld::Map(BTreeMap::from([( + NONCE_KEY.into(), + header.nonce.to_owned().into(), + )])) + } +} + +impl TryFrom for Header { + type Error = anyhow::Error; + + fn try_from(ipld: Ipld) -> Result { + let map = from_ipld::>(ipld)?; + + let nonce = map + .get(NONCE_KEY) + .ok_or_else(|| anyhow!("Missing {NONCE_KEY}"))? + .try_into()?; + + Ok(Header { nonce }) + } +} + +impl TryFrom
for Vec { + type Error = anyhow::Error; + + fn try_from(header: Header) -> Result { + let header_ipld = Ipld::from(header); + DagCborCodec.encode(&header_ipld) + } +} + +impl TryFrom> for Header { + type Error = anyhow::Error; + + fn try_from(bytes: Vec) -> Result { + let ipld: Ipld = DagCborCodec.decode(&bytes)?; + ipld.try_into() + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::{test_utils, Receipt}; + + #[test] + fn pubsub_message_rountrip() { + let (_, receipt) = test_utils::receipt::receipts(); + let message = Message::new(receipt.clone()); + let bytes: Vec = message + .try_into() + .expect("Could not serialize message into bytes"); + + let parsed = + Message::::try_from(bytes).expect("Could not deserialize message from bytes"); + + assert_eq!(receipt, parsed.payload); + } +} diff --git a/homestar-runtime/src/network/swarm.rs b/homestar-runtime/src/network/swarm.rs index ff4f2392..642939a5 100644 --- a/homestar-runtime/src/network/swarm.rs +++ b/homestar-runtime/src/network/swarm.rs @@ -270,7 +270,7 @@ pub(crate) enum ComposedEvent { #[derive(Debug)] pub(crate) enum TopicMessage { /// Receipt topic, wrapping [Receipt]. - CapturedReceipt(Receipt), + CapturedReceipt(pubsub::Message), } /// Custom behaviours for [Swarm]. @@ -316,8 +316,9 @@ impl ComposedBehaviour { if let Some(gossipsub) = self.gossipsub.as_mut() { let id_topic = gossipsub::IdentTopic::new(topic); // Make this a match once we have other topics. - let TopicMessage::CapturedReceipt(receipt) = msg; - let msg_bytes: Vec = receipt.try_into()?; + let TopicMessage::CapturedReceipt(message) = msg; + let msg_bytes: Vec = message.try_into()?; + if gossipsub .mesh_peers(&TopicHash::from_raw(topic)) .peekable()