From b1a0bbf26fbbd3e8df73de11463c1fadce5a6ead Mon Sep 17 00:00:00 2001 From: Brian Ginsburg <7957636+bgins@users.noreply.github.com> Date: Tue, 7 Nov 2023 11:54:28 -0800 Subject: [PATCH] feat: Add event notifications (#410) # Description This pull request implements the following changes: - [x] Add event notifications - [x] Add `emit_event` notification utility function - [x] Add `emit_receipt` notification utility function - [x] Move receipt notifications to event handler notifications - [x] Add `ConnnectionEstablished`, `ConnnectionClosed`, `ListeningOn`, `OutgoingConnectionError`, and `IncomingConnectionError` network notifications - [x] Test JSON event notification bytes roundtrip - [x] Test JSON event notification string roundtrip - [x] Integration test connection notifications with two Homestar nodes ## Link to issue Closes #407 ## Type of change - [x] New feature (non-breaking change that adds functionality) ## Test plan (required) We have included unit tests to check roundtrip conversions between JSON bytes and strings. In addition, we have included an integration test that subscribes and listens for connection messages between Homestar nodes. --------- Co-authored-by: Zeeshan Lakhani --- homestar-runtime/Cargo.toml | 2 +- homestar-runtime/src/event_handler.rs | 1 + homestar-runtime/src/event_handler/event.rs | 70 +---- .../src/event_handler/notification.rs | 242 ++++++++++++++++++ .../src/event_handler/notification/receipt.rs | 44 ++++ .../src/event_handler/notification/swarm.rs | 44 ++++ .../src/event_handler/swarm_event.rs | 72 +++++- homestar-runtime/src/network/webserver.rs | 11 +- .../src/network/webserver/notifier.rs | 45 +--- .../tests/fixtures/test_notification1.toml | 20 ++ .../tests/fixtures/test_notification2.toml | 20 ++ homestar-runtime/tests/network.rs | 6 +- .../tests/network/notification.rs | 127 +++++++++ 13 files changed, 591 insertions(+), 113 deletions(-) create mode 100644 homestar-runtime/src/event_handler/notification.rs create mode 100644 homestar-runtime/src/event_handler/notification/receipt.rs create mode 100644 homestar-runtime/src/event_handler/notification/swarm.rs create mode 100644 homestar-runtime/tests/fixtures/test_notification1.toml create mode 100644 homestar-runtime/tests/fixtures/test_notification2.toml create mode 100644 homestar-runtime/tests/network/notification.rs diff --git a/homestar-runtime/Cargo.toml b/homestar-runtime/Cargo.toml index 83f0b90bb..9d146f3f0 100644 --- a/homestar-runtime/Cargo.toml +++ b/homestar-runtime/Cargo.toml @@ -106,6 +106,7 @@ libp2p = { version = "0.52", default-features = false, features = [ libsqlite3-sys = { version = "0.26", default-features = false, features = [ "bundled", ] } +maplit = "1.0" metrics = { version = "0.21", default-features = false } metrics-exporter-prometheus = { version = "0.12.1", default-features = false, features = [ "http-listener", @@ -194,7 +195,6 @@ homestar_runtime_proc_macro = { path = "src/test_utils/proc_macro", package = "h jsonrpsee = { version = "0.20", default-features = false, features = [ "client", ] } -maplit = "1.0" nix = { version = "0.27", features = ["signal"] } predicates = { version = "3.0", default-features = false } prometheus-parse = "0.2.4" diff --git a/homestar-runtime/src/event_handler.rs b/homestar-runtime/src/event_handler.rs index 02db93c19..269bc736e 100644 --- a/homestar-runtime/src/event_handler.rs +++ b/homestar-runtime/src/event_handler.rs @@ -25,6 +25,7 @@ pub(crate) mod cache; pub mod channel; pub(crate) mod error; pub(crate) mod event; +pub(crate) mod notification; pub(crate) mod swarm_event; pub(crate) use cache::{setup_cache, CacheValue}; pub(crate) use error::RequestResponseError; diff --git a/homestar-runtime/src/event_handler/event.rs b/homestar-runtime/src/event_handler/event.rs index 368f03250..7c222cf78 100644 --- a/homestar-runtime/src/event_handler/event.rs +++ b/homestar-runtime/src/event_handler/event.rs @@ -2,7 +2,7 @@ use super::EventHandler; #[cfg(feature = "websocket-notify")] -use crate::network::webserver::notifier::{Header, Message, NotifyReceipt, SubscriptionTyp}; +use crate::event_handler::notification::emit_receipt; #[cfg(feature = "ipfs")] use crate::network::IpfsCli; use crate::{ @@ -16,15 +16,9 @@ use crate::{ }; use anyhow::Result; use async_trait::async_trait; -use homestar_core::workflow::Receipt as InvocationReceipt; #[cfg(feature = "websocket-notify")] -use homestar_core::{ - ipld::DagJson, - workflow::{ - receipt::metadata::{WORKFLOW_KEY, WORKFLOW_NAME_KEY}, - Pointer, - }, -}; +use homestar_core::workflow::Pointer; +use homestar_core::workflow::Receipt as InvocationReceipt; use libipld::{Cid, Ipld}; use libp2p::{ kad::{record::Key, Quorum, Record}, @@ -251,23 +245,11 @@ impl Captured { #[cfg(feature = "websocket-notify")] { - let invocation_notification = invocation_receipt.clone(); - let ws_tx = event_handler.ws_workflow_sender(); - let metadata = self.metadata.to_owned(); - let receipt = NotifyReceipt::with(invocation_notification, receipt_cid, metadata); - if let Ok(json) = receipt.to_json() { - info!( - cid = receipt_cid.to_string(), - "Sending receipt to websocket" - ); - let _ = ws_tx.notify(Message::new( - Header::new( - SubscriptionTyp::Cid(self.workflow.cid), - self.workflow.name.clone(), - ), - json, - )); - } + emit_receipt( + event_handler.ws_workflow_sender(), + receipt.clone(), + self.metadata.to_owned(), + ) } if event_handler.pubsub_enabled { @@ -367,37 +349,13 @@ impl Replay { self.pointers.iter().collect::>() ); + #[cfg(feature = "websocket-notify")] receipts.into_iter().for_each(|receipt| { - let invocation_receipt = InvocationReceipt::from(&receipt); - let invocation_notification = invocation_receipt; - let receipt_cid = receipt.cid(); - - let ws_tx = event_handler.ws_workflow_sender(); - let metadata = self.metadata.to_owned(); - let receipt = NotifyReceipt::with(invocation_notification, receipt_cid, metadata); - if let Ok(json) = receipt.to_json() { - info!( - cid = receipt_cid.to_string(), - "Sending receipt to websocket" - ); - - if let Some(ipld) = &self.metadata { - match (ipld.get(WORKFLOW_KEY), ipld.get(WORKFLOW_NAME_KEY)) { - (Ok(Ipld::Link(cid)), Ok(Ipld::String(name))) => { - let header = Header::new( - SubscriptionTyp::Cid(*cid), - Some((name.to_string()).into()), - ); - let _ = ws_tx.notify(Message::new(header, json)); - } - (Ok(Ipld::Link(cid)), Err(_err)) => { - let header = Header::new(SubscriptionTyp::Cid(*cid), None); - let _ = ws_tx.notify(Message::new(header, json)); - } - _ => (), - } - } - } + emit_receipt( + event_handler.ws_workflow_sender(), + receipt, + self.metadata.to_owned(), + ); }); Ok(()) diff --git a/homestar-runtime/src/event_handler/notification.rs b/homestar-runtime/src/event_handler/notification.rs new file mode 100644 index 000000000..8daa09a34 --- /dev/null +++ b/homestar-runtime/src/event_handler/notification.rs @@ -0,0 +1,242 @@ +use crate::{ + network::webserver::{ + notifier::{self, Header, Message, Notifier, SubscriptionTyp}, + SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + }, + Receipt, +}; +use anyhow::anyhow; +use chrono::prelude::Utc; +use homestar_core::{ + ipld::DagJson, + workflow::{ + receipt::metadata::{WORKFLOW_KEY, WORKFLOW_NAME_KEY}, + Receipt as InvocationReceipt, + }, +}; +use libipld::{serde::from_ipld, Ipld}; +use serde::{Deserialize, Serialize}; +use std::{collections::BTreeMap, str::FromStr}; +use tracing::{info, warn}; + +pub(crate) mod receipt; +pub(crate) mod swarm; +pub(crate) use receipt::ReceiptNotification; +pub(crate) use swarm::SwarmNotification; + +const TYPE_KEY: &str = "type"; +const DATA_KEY: &str = "data"; +const TIMESTAMP_KEY: &str = "timestamp"; + +/// Send receipt notification as bytes. +pub(crate) fn emit_receipt( + notifier: Notifier, + receipt: Receipt, + metadata: Option, +) { + let invocation_receipt = InvocationReceipt::from(&receipt); + let receipt_cid = receipt.cid(); + let notification = ReceiptNotification::with(invocation_receipt, receipt_cid, metadata.clone()); + + if let Ok(json) = notification.to_json() { + info!( + cid = receipt_cid.to_string(), + "Sending receipt to websocket" + ); + if let Some(ipld) = metadata { + match (ipld.get(WORKFLOW_KEY), ipld.get(WORKFLOW_NAME_KEY)) { + (Ok(Ipld::Link(cid)), Ok(Ipld::String(name))) => { + let header = + Header::new(SubscriptionTyp::Cid(*cid), Some((name.to_string()).into())); + let _ = notifier.notify(Message::new(header, json)); + } + (Ok(Ipld::Link(cid)), Err(_err)) => { + let header = Header::new(SubscriptionTyp::Cid(*cid), None); + let _ = notifier.notify(Message::new(header, json)); + } + _ => (), + } + } + } else { + warn!("Unable to serialize receipt as bytes: {receipt:?}"); + } +} + +/// Send event notification as bytes. +pub(crate) fn emit_event( + notifier: Notifier, + ty: EventNotificationTyp, + data: BTreeMap<&str, String>, +) { + let header = Header::new( + SubscriptionTyp::EventSub(SUBSCRIBE_NETWORK_EVENTS_ENDPOINT.to_string()), + None, + ); + let notification = EventNotification::new(ty, data); + + if let Ok(json) = notification.to_json() { + let _ = notifier.notify(Message::new(header, json)); + } else { + warn!("Unable to serialize notification as bytes: {notification:?}"); + } +} + +/// Notification sent to clients. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub(crate) struct EventNotification { + typ: EventNotificationTyp, + data: Ipld, + timestamp: i64, +} + +impl EventNotification { + pub(crate) fn new(typ: EventNotificationTyp, data: BTreeMap<&str, String>) -> Self { + let ipld_data = data + .iter() + .map(|(key, val)| (key.to_string(), Ipld::String(val.to_owned()))) + .collect(); + + Self { + typ, + data: Ipld::Map(ipld_data), + timestamp: Utc::now().timestamp_millis(), + } + } +} + +impl DagJson for EventNotification where Ipld: From {} + +impl From for Ipld { + fn from(notification: EventNotification) -> Self { + Ipld::Map(BTreeMap::from([ + ("type".into(), notification.typ.into()), + ("data".into(), notification.data), + ("timestamp".into(), notification.timestamp.into()), + ])) + } +} + +impl TryFrom for EventNotification { + type Error = anyhow::Error; + + fn try_from(ipld: Ipld) -> Result { + let map = from_ipld::>(ipld)?; + + let typ: EventNotificationTyp = map + .get(TYPE_KEY) + .ok_or_else(|| anyhow!("missing {TYPE_KEY}"))? + .to_owned() + .try_into()?; + + let data = map + .get(DATA_KEY) + .ok_or_else(|| anyhow!("missing {DATA_KEY}"))? + .to_owned(); + + let timestamp = from_ipld( + map.get(TIMESTAMP_KEY) + .ok_or_else(|| anyhow!("missing {TIMESTAMP_KEY}"))? + .to_owned(), + )?; + + Ok(EventNotification { + typ, + data, + timestamp, + }) + } +} + +/// Types of notification sent to clients. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub(crate) enum EventNotificationTyp { + SwarmNotification(SwarmNotification), +} + +impl DagJson for EventNotificationTyp where Ipld: From {} + +impl From for Ipld { + fn from(typ: EventNotificationTyp) -> Self { + match typ { + EventNotificationTyp::SwarmNotification(subtype) => { + Ipld::String(format!("network:{}", subtype)) + } + } + } +} + +impl TryFrom for EventNotificationTyp { + type Error = anyhow::Error; + + fn try_from(ipld: Ipld) -> Result { + if let Some((ty, subtype)) = from_ipld::(ipld)?.split_once(':') { + match ty { + "network" => Ok(EventNotificationTyp::SwarmNotification( + SwarmNotification::from_str(subtype)?, + )), + _ => Err(anyhow!("Missing event notification type: {}", ty)), + } + } else { + Err(anyhow!( + "Event notification type missing colon delimiter between type and subtype." + )) + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use libp2p::PeerId; + use maplit::btreemap; + + #[test] + fn notification_bytes_rountrip() { + let peer_id = PeerId::random().to_string(); + let address: String = "/ip4/127.0.0.1/tcp/7000".to_string(); + + let notification = EventNotification::new( + EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished), + btreemap! { + "peer_id" => peer_id.clone(), + "address" => address.clone() + }, + ); + let bytes = notification.to_json().unwrap(); + + let parsed = EventNotification::from_json(bytes.as_ref()).unwrap(); + let data: BTreeMap = from_ipld(parsed.data).unwrap(); + + assert_eq!( + parsed.typ, + EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished) + ); + assert_eq!(data.get("peer_id").unwrap(), &peer_id); + assert_eq!(data.get("address").unwrap(), &address); + } + + #[test] + fn notification_json_string_rountrip() { + let peer_id = PeerId::random().to_string(); + let address: String = "/ip4/127.0.0.1/tcp/7000".to_string(); + + let notification = EventNotification::new( + EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished), + btreemap! { + "peer_id" => peer_id.clone(), + "address" => address.clone() + }, + ); + let json_string = notification.to_json_string().unwrap(); + + let parsed = EventNotification::from_json_string(json_string).unwrap(); + let data: BTreeMap = from_ipld(parsed.data).unwrap(); + + assert_eq!( + parsed.typ, + EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished) + ); + assert_eq!(data.get("peer_id").unwrap(), &peer_id); + assert_eq!(data.get("address").unwrap(), &address); + } +} diff --git a/homestar-runtime/src/event_handler/notification/receipt.rs b/homestar-runtime/src/event_handler/notification/receipt.rs new file mode 100644 index 000000000..f2f222ec3 --- /dev/null +++ b/homestar-runtime/src/event_handler/notification/receipt.rs @@ -0,0 +1,44 @@ +use homestar_core::{ipld::DagJson, workflow::Receipt}; +use libipld::{ipld, Cid, Ipld}; + +/// A [Receipt] that is sent out *just* for websocket notifications. +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct ReceiptNotification(Ipld); + +impl ReceiptNotification { + /// TODO + #[allow(dead_code)] + pub(crate) fn inner(&self) -> &Ipld { + &self.0 + } + + /// TODO + #[allow(dead_code)] + pub(crate) fn into_inner(self) -> Ipld { + self.0.to_owned() + } + + pub(crate) fn with(receipt: Receipt, cid: Cid, metadata: Option) -> Self { + let receipt: Ipld = receipt.into(); + let data = ipld!({ + "receipt": receipt, + "metadata": metadata.as_ref().map(|m| m.to_owned()).map_or(Ipld::Null, |m| m), + "receipt_cid": cid, + }); + ReceiptNotification(data) + } +} + +impl DagJson for ReceiptNotification where Ipld: From {} + +impl From for Ipld { + fn from(receipt: ReceiptNotification) -> Self { + receipt.0 + } +} + +impl From for ReceiptNotification { + fn from(ipld: Ipld) -> Self { + ReceiptNotification(ipld) + } +} diff --git a/homestar-runtime/src/event_handler/notification/swarm.rs b/homestar-runtime/src/event_handler/notification/swarm.rs new file mode 100644 index 000000000..8e98947b1 --- /dev/null +++ b/homestar-runtime/src/event_handler/notification/swarm.rs @@ -0,0 +1,44 @@ +use anyhow::anyhow; +use serde::{Deserialize, Serialize}; +use std::{fmt, str::FromStr}; + +// Swarm notification types sent to clients +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub(crate) enum SwarmNotification { + ConnnectionEstablished, + ConnnectionClosed, + ListeningOn, + OutgoingConnectionError, + IncomingConnectionError, +} + +impl fmt::Display for SwarmNotification { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SwarmNotification::ConnnectionEstablished => write!(f, "connectionEstablished"), + SwarmNotification::ConnnectionClosed => write!(f, "connectionClosed"), + SwarmNotification::ListeningOn => write!(f, "listeningOn"), + SwarmNotification::OutgoingConnectionError => { + write!(f, "outgoingConnectionError") + } + SwarmNotification::IncomingConnectionError => { + write!(f, "incomingConnectionError") + } + } + } +} + +impl FromStr for SwarmNotification { + type Err = anyhow::Error; + + fn from_str(ty: &str) -> Result { + match ty { + "connectionEstablished" => Ok(Self::ConnnectionEstablished), + "connectionClosed" => Ok(Self::ConnnectionClosed), + "listeningOn" => Ok(Self::ListeningOn), + "outgoingConnectionError" => Ok(Self::OutgoingConnectionError), + "incomingConnectionError" => Ok(Self::IncomingConnectionError), + _ => Err(anyhow!("Missing swarm notification type: {}", ty)), + } + } +} diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index 3bed8c4ac..2c22db6e0 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -8,6 +8,7 @@ use crate::{ event_handler::{ cache::{self, CacheData, CacheValue}, event::QueryRecord, + notification::{self, EventNotificationTyp, SwarmNotification}, Event, Handler, RequestResponseError, }, libp2p::multiaddr::MultiaddrExt, @@ -39,6 +40,7 @@ use libp2p::{ swarm::{dial_opts::DialOpts, SwarmEvent}, PeerId, StreamProtocol, }; +use maplit::btreemap; use std::{ collections::{HashMap, HashSet}, fmt, @@ -711,9 +713,20 @@ async fn handle_swarm_event( } SwarmEvent::NewListenAddr { address, .. } => { let local_peer = *event_handler.swarm.local_peer_id(); + info!( - "local node is listening on {}", - address.with(Protocol::P2p(local_peer)) + peer_id = local_peer.to_string(), + "local node is listening on {}", address + ); + + #[cfg(feature = "websocket-notify")] + notification::emit_event( + event_handler.ws_evt_sender(), + EventNotificationTyp::SwarmNotification(SwarmNotification::ListeningOn), + btreemap! { + "peer_id" => local_peer.to_string(), + "address" => address.to_string() + }, ); } SwarmEvent::IncomingConnection { .. } => {} @@ -722,9 +735,27 @@ async fn handle_swarm_event( } => { debug!(peer_id=peer_id.to_string(), endpoint=?endpoint, "peer connection established"); // add peer to connected peers list - event_handler.connections.peers.insert(peer_id, endpoint); + event_handler + .connections + .peers + .insert(peer_id, endpoint.clone()); + + #[cfg(feature = "websocket-notify")] + notification::emit_event( + event_handler.ws_evt_sender(), + EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished), + btreemap! { + "peer_id" => peer_id.to_string(), + "address" => endpoint.get_remote_address().to_string() + }, + ); } - SwarmEvent::ConnectionClosed { peer_id, cause, .. } => { + SwarmEvent::ConnectionClosed { + peer_id, + cause, + endpoint, + .. + } => { debug!( peer_id = peer_id.to_string(), "peer connection closed, cause: {cause:?}" @@ -753,6 +784,16 @@ async fn handle_swarm_event( "removed peer from kademlia table" ); } + + #[cfg(feature = "websocket-notify")] + notification::emit_event( + event_handler.ws_evt_sender(), + EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionClosed), + btreemap! { + "peer_id" => peer_id.to_string(), + "address" => endpoint.get_remote_address().to_string() + }, + ); } SwarmEvent::OutgoingConnectionError { connection_id, @@ -764,7 +805,17 @@ async fn handle_swarm_event( err=?error, connection_id=?connection_id, "outgoing connection error" - ) + ); + + #[cfg(feature = "websocket-notify")] + notification::emit_event( + event_handler.ws_evt_sender(), + EventNotificationTyp::SwarmNotification(SwarmNotification::OutgoingConnectionError), + btreemap! { + "peer_id" => peer_id.map_or("Unknown peer".into(), |p| p.to_string()), + "error" => error.to_string() + }, + ); } SwarmEvent::IncomingConnectionError { connection_id, @@ -778,7 +829,16 @@ async fn handle_swarm_event( local_address=local_addr.to_string(), remote_address=send_back_addr.to_string(), "incoming connection error" - ) + ); + + #[cfg(feature = "websocket-notify")] + notification::emit_event( + event_handler.ws_evt_sender(), + EventNotificationTyp::SwarmNotification(SwarmNotification::IncomingConnectionError), + btreemap! { + "error" => error.to_string() + }, + ); } SwarmEvent::ListenerError { listener_id, error } => { error!(err=?error, listener_id=?listener_id, "listener error") diff --git a/homestar-runtime/src/network/webserver.rs b/homestar-runtime/src/network/webserver.rs index 738986c06..915739779 100644 --- a/homestar-runtime/src/network/webserver.rs +++ b/homestar-runtime/src/network/webserver.rs @@ -32,6 +32,7 @@ mod rpc; #[cfg(feature = "websocket-notify")] pub(crate) use notifier::Notifier; +pub(crate) use rpc::SUBSCRIBE_NETWORK_EVENTS_ENDPOINT; use rpc::{Context, JsonRpc}; /// Message type for messages sent back from the @@ -234,7 +235,7 @@ fn port_available(host: IpAddr, port: u16) -> bool { #[cfg(test)] mod test { use super::*; - use crate::settings::Settings; + use crate::{event_handler::notification::ReceiptNotification, settings::Settings}; use homestar_core::test_utils; #[cfg(feature = "websocket-notify")] use homestar_core::{ @@ -247,7 +248,7 @@ mod test { use jsonrpsee::types::error::ErrorCode; use jsonrpsee::{core::client::ClientT, rpc_params, ws_client::WsClientBuilder}; #[cfg(feature = "websocket-notify")] - use notifier::{self, Header, NotifyReceipt}; + use notifier::{self, Header}; use serial_test::file_serial; use tokio::sync::mpsc; @@ -361,7 +362,7 @@ mod test { // send any bytes through (Vec) let (invocation_receipt, runtime_receipt) = crate::test_utils::receipt::receipts(); - let receipt = NotifyReceipt::with(invocation_receipt, runtime_receipt.cid(), None); + let receipt = ReceiptNotification::with(invocation_receipt, runtime_receipt.cid(), None); server .evt_notifier .notify(notifier::Message::new( @@ -401,11 +402,11 @@ mod test { .unwrap(); let msg1 = sub.next().await.unwrap().unwrap(); - let returned1: NotifyReceipt = DagJson::from_json(&msg1).unwrap(); + let returned1: ReceiptNotification = DagJson::from_json(&msg1).unwrap(); assert_eq!(returned1, receipt); let msg2 = sub.next().await.unwrap().unwrap(); - let _returned1: NotifyReceipt = DagJson::from_json(&msg2).unwrap(); + let _returned1: ReceiptNotification = DagJson::from_json(&msg2).unwrap(); assert!(sub.unsubscribe().await.is_ok()); diff --git a/homestar-runtime/src/network/webserver/notifier.rs b/homestar-runtime/src/network/webserver/notifier.rs index 2630dd152..dc7abd6a4 100644 --- a/homestar-runtime/src/network/webserver/notifier.rs +++ b/homestar-runtime/src/network/webserver/notifier.rs @@ -2,8 +2,7 @@ use anyhow::Result; use faststr::FastStr; -use homestar_core::{ipld::DagJson, workflow::Receipt}; -use libipld::{ipld, Cid, Ipld}; +use libipld::Cid; use std::{fmt, sync::Arc}; use tokio::sync::broadcast; @@ -95,45 +94,3 @@ impl Message { &self.payload } } - -/// A [Receipt] that is sent out *just* for websocket notifications. -#[derive(Debug, Clone, PartialEq)] -pub(crate) struct NotifyReceipt(Ipld); - -impl NotifyReceipt { - /// TODO - #[allow(dead_code)] - pub(crate) fn inner(&self) -> &Ipld { - &self.0 - } - - /// TODO - #[allow(dead_code)] - pub(crate) fn into_inner(self) -> Ipld { - self.0.to_owned() - } - - pub(crate) fn with(receipt: Receipt, cid: Cid, metadata: Option) -> Self { - let receipt: Ipld = receipt.into(); - let data = ipld!({ - "receipt": receipt, - "metadata": metadata.as_ref().map(|m| m.to_owned()).map_or(Ipld::Null, |m| m), - "receipt_cid": cid, - }); - NotifyReceipt(data) - } -} - -impl DagJson for NotifyReceipt where Ipld: From {} - -impl From for Ipld { - fn from(receipt: NotifyReceipt) -> Self { - receipt.0 - } -} - -impl From for NotifyReceipt { - fn from(ipld: Ipld) -> Self { - NotifyReceipt(ipld) - } -} diff --git a/homestar-runtime/tests/fixtures/test_notification1.toml b/homestar-runtime/tests/fixtures/test_notification1.toml new file mode 100644 index 000000000..979515fd5 --- /dev/null +++ b/homestar-runtime/tests/fixtures/test_notification1.toml @@ -0,0 +1,20 @@ +[monitoring] +process_collector_interval = 500 +console_subscriber_port = 5582 + +[node] + +[node.network] +metrics_port = 4032 +rpc_port = 9822 +webserver_port = 8022 +listen_address = "/ip4/127.0.0.1/tcp/7010" +node_addresses = [ + "/ip4/127.0.0.1/tcp/7011/p2p/16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", +] +enable_mdns = false +enable_rendezvous_client = false + +# Peer ID 12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN +[node.network.keypair_config] +existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" } diff --git a/homestar-runtime/tests/fixtures/test_notification2.toml b/homestar-runtime/tests/fixtures/test_notification2.toml new file mode 100644 index 000000000..dd22b7a11 --- /dev/null +++ b/homestar-runtime/tests/fixtures/test_notification2.toml @@ -0,0 +1,20 @@ +[monitoring] +process_collector_interval = 500 +console_subscriber_port = 5583 + +[node] + +[node.network] +metrics_port = 4033 +rpc_port = 9823 +webserver_port = 8023 +listen_address = "/ip4/127.0.0.1/tcp/7011" +node_addresses = [ + "/ip4/127.0.0.1/tcp/7010/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", +] +enable_mdns = false +enable_rendezvous_client = false + +# Peer ID 16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc +[node.network.keypair_config] +existing = { key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" } diff --git a/homestar-runtime/tests/network.rs b/homestar-runtime/tests/network.rs index 7bfd507d9..0884b91c3 100644 --- a/homestar-runtime/tests/network.rs +++ b/homestar-runtime/tests/network.rs @@ -11,6 +11,9 @@ use std::{ time::Duration, }; +#[cfg(feature = "websocket-notify")] +mod notification; + #[allow(dead_code)] static BIN: Lazy = Lazy::new(|| assert_cmd::cargo::cargo_bin(BIN_NAME)); @@ -65,7 +68,8 @@ fn test_libp2p_listens_on_address_serial() -> Result<()> { stdout, vec![ "local node is listening", - "/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", + "/ip4/127.0.0.1/tcp/7000", + "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", ], ); diff --git a/homestar-runtime/tests/network/notification.rs b/homestar-runtime/tests/network/notification.rs new file mode 100644 index 000000000..ea2d9b198 --- /dev/null +++ b/homestar-runtime/tests/network/notification.rs @@ -0,0 +1,127 @@ +use crate::utils::{kill_homestar, stop_homestar, BIN_NAME}; +use anyhow::Result; +use jsonrpsee::{ + core::client::{Subscription, SubscriptionClientT}, + rpc_params, + ws_client::WsClientBuilder, +}; +use once_cell::sync::Lazy; +use retry::{delay::Exponential, retry}; +use serial_test::file_serial; +use std::{ + net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpStream}, + path::PathBuf, + process::{Command, Stdio}, + time::Duration, +}; + +static BIN: Lazy = Lazy::new(|| assert_cmd::cargo::cargo_bin(BIN_NAME)); +const SUBSCRIBE_NETWORK_EVENTS_ENDPOINT: &str = "subscribe_network_events"; +const UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT: &str = "unsubscribe_network_events"; + +#[test] +#[file_serial] +fn test_connection_notifications_serial() -> Result<()> { + let _ = stop_homestar(); + + let homestar_proc1 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_notification1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let ws_port = 8022; + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ws_port); + let result = retry(Exponential::from_millis(1000).take(10), || { + TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) + }); + + if result.is_err() { + let _ = kill_homestar(homestar_proc1, None); + panic!("Homestar server/runtime failed to start in time"); + } + + let ws_url = format!("ws://{}:{}", Ipv4Addr::LOCALHOST, ws_port); + tokio_test::block_on(async { + tokio_tungstenite::connect_async(ws_url.clone()) + .await + .unwrap(); + + let client = WsClientBuilder::default() + .build(ws_url.clone()) + .await + .unwrap(); + let mut sub: Subscription> = client + .subscribe( + SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + rpc_params![], + UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + ) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_millis(200)).await; + + let homestar_proc2 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_notification2.toml") + .arg("--db") + .arg("homestar2.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let _ = kill_homestar(homestar_proc2, None); + + tokio::time::sleep(Duration::from_secs(2)).await; + + { + let msg = sub + .next() + .await + .expect("Subscription did not receive a connection established message"); + let json: serde_json::Value = serde_json::from_slice(&msg.unwrap()).unwrap(); + let typ = json["type"].as_str().unwrap(); + let peer_id = json["data"]["peer_id"].as_str().unwrap(); + + assert_eq!(typ, "network:connectionEstablished"); + assert_eq!( + peer_id, + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc" + ); + } + + { + let msg = sub + .next() + .await + .expect("Subscription did not receive a connection closed message"); + let json: serde_json::Value = serde_json::from_slice(&msg.unwrap()).unwrap(); + let typ = json["type"].as_str().unwrap(); + let peer_id = json["data"]["peer_id"].as_str().unwrap(); + + assert_eq!(typ, "network:connectionClosed"); + assert_eq!( + peer_id, + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc" + ); + } + }); + + let _ = kill_homestar(homestar_proc1, None); + + Ok(()) +}