diff --git a/homestar-runtime/Cargo.toml b/homestar-runtime/Cargo.toml index 83f0b90bb..9d146f3f0 100644 --- a/homestar-runtime/Cargo.toml +++ b/homestar-runtime/Cargo.toml @@ -106,6 +106,7 @@ libp2p = { version = "0.52", default-features = false, features = [ libsqlite3-sys = { version = "0.26", default-features = false, features = [ "bundled", ] } +maplit = "1.0" metrics = { version = "0.21", default-features = false } metrics-exporter-prometheus = { version = "0.12.1", default-features = false, features = [ "http-listener", @@ -194,7 +195,6 @@ homestar_runtime_proc_macro = { path = "src/test_utils/proc_macro", package = "h jsonrpsee = { version = "0.20", default-features = false, features = [ "client", ] } -maplit = "1.0" nix = { version = "0.27", features = ["signal"] } predicates = { version = "3.0", default-features = false } prometheus-parse = "0.2.4" diff --git a/homestar-runtime/src/event_handler.rs b/homestar-runtime/src/event_handler.rs index 02db93c19..269bc736e 100644 --- a/homestar-runtime/src/event_handler.rs +++ b/homestar-runtime/src/event_handler.rs @@ -25,6 +25,7 @@ pub(crate) mod cache; pub mod channel; pub(crate) mod error; pub(crate) mod event; +pub(crate) mod notification; pub(crate) mod swarm_event; pub(crate) use cache::{setup_cache, CacheValue}; pub(crate) use error::RequestResponseError; diff --git a/homestar-runtime/src/event_handler/event.rs b/homestar-runtime/src/event_handler/event.rs index 368f03250..7c222cf78 100644 --- a/homestar-runtime/src/event_handler/event.rs +++ b/homestar-runtime/src/event_handler/event.rs @@ -2,7 +2,7 @@ use super::EventHandler; #[cfg(feature = "websocket-notify")] -use crate::network::webserver::notifier::{Header, Message, NotifyReceipt, SubscriptionTyp}; +use crate::event_handler::notification::emit_receipt; #[cfg(feature = "ipfs")] use crate::network::IpfsCli; use crate::{ @@ -16,15 +16,9 @@ use crate::{ }; use anyhow::Result; use async_trait::async_trait; -use homestar_core::workflow::Receipt as InvocationReceipt; #[cfg(feature = "websocket-notify")] -use homestar_core::{ - ipld::DagJson, - workflow::{ - receipt::metadata::{WORKFLOW_KEY, WORKFLOW_NAME_KEY}, - Pointer, - }, -}; +use homestar_core::workflow::Pointer; +use homestar_core::workflow::Receipt as InvocationReceipt; use libipld::{Cid, Ipld}; use libp2p::{ kad::{record::Key, Quorum, Record}, @@ -251,23 +245,11 @@ impl Captured { #[cfg(feature = "websocket-notify")] { - let invocation_notification = invocation_receipt.clone(); - let ws_tx = event_handler.ws_workflow_sender(); - let metadata = self.metadata.to_owned(); - let receipt = NotifyReceipt::with(invocation_notification, receipt_cid, metadata); - if let Ok(json) = receipt.to_json() { - info!( - cid = receipt_cid.to_string(), - "Sending receipt to websocket" - ); - let _ = ws_tx.notify(Message::new( - Header::new( - SubscriptionTyp::Cid(self.workflow.cid), - self.workflow.name.clone(), - ), - json, - )); - } + emit_receipt( + event_handler.ws_workflow_sender(), + receipt.clone(), + self.metadata.to_owned(), + ) } if event_handler.pubsub_enabled { @@ -367,37 +349,13 @@ impl Replay { self.pointers.iter().collect::>() ); + #[cfg(feature = "websocket-notify")] receipts.into_iter().for_each(|receipt| { - let invocation_receipt = InvocationReceipt::from(&receipt); - let invocation_notification = invocation_receipt; - let receipt_cid = receipt.cid(); - - let ws_tx = event_handler.ws_workflow_sender(); - let metadata = self.metadata.to_owned(); - let receipt = NotifyReceipt::with(invocation_notification, receipt_cid, metadata); - if let Ok(json) = receipt.to_json() { - info!( - cid = receipt_cid.to_string(), - "Sending receipt to websocket" - ); - - if let Some(ipld) = &self.metadata { - match (ipld.get(WORKFLOW_KEY), ipld.get(WORKFLOW_NAME_KEY)) { - (Ok(Ipld::Link(cid)), Ok(Ipld::String(name))) => { - let header = Header::new( - SubscriptionTyp::Cid(*cid), - Some((name.to_string()).into()), - ); - let _ = ws_tx.notify(Message::new(header, json)); - } - (Ok(Ipld::Link(cid)), Err(_err)) => { - let header = Header::new(SubscriptionTyp::Cid(*cid), None); - let _ = ws_tx.notify(Message::new(header, json)); - } - _ => (), - } - } - } + emit_receipt( + event_handler.ws_workflow_sender(), + receipt, + self.metadata.to_owned(), + ); }); Ok(()) diff --git a/homestar-runtime/src/event_handler/notification.rs b/homestar-runtime/src/event_handler/notification.rs new file mode 100644 index 000000000..8daa09a34 --- /dev/null +++ b/homestar-runtime/src/event_handler/notification.rs @@ -0,0 +1,242 @@ +use crate::{ + network::webserver::{ + notifier::{self, Header, Message, Notifier, SubscriptionTyp}, + SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + }, + Receipt, +}; +use anyhow::anyhow; +use chrono::prelude::Utc; +use homestar_core::{ + ipld::DagJson, + workflow::{ + receipt::metadata::{WORKFLOW_KEY, WORKFLOW_NAME_KEY}, + Receipt as InvocationReceipt, + }, +}; +use libipld::{serde::from_ipld, Ipld}; +use serde::{Deserialize, Serialize}; +use std::{collections::BTreeMap, str::FromStr}; +use tracing::{info, warn}; + +pub(crate) mod receipt; +pub(crate) mod swarm; +pub(crate) use receipt::ReceiptNotification; +pub(crate) use swarm::SwarmNotification; + +const TYPE_KEY: &str = "type"; +const DATA_KEY: &str = "data"; +const TIMESTAMP_KEY: &str = "timestamp"; + +/// Send receipt notification as bytes. +pub(crate) fn emit_receipt( + notifier: Notifier, + receipt: Receipt, + metadata: Option, +) { + let invocation_receipt = InvocationReceipt::from(&receipt); + let receipt_cid = receipt.cid(); + let notification = ReceiptNotification::with(invocation_receipt, receipt_cid, metadata.clone()); + + if let Ok(json) = notification.to_json() { + info!( + cid = receipt_cid.to_string(), + "Sending receipt to websocket" + ); + if let Some(ipld) = metadata { + match (ipld.get(WORKFLOW_KEY), ipld.get(WORKFLOW_NAME_KEY)) { + (Ok(Ipld::Link(cid)), Ok(Ipld::String(name))) => { + let header = + Header::new(SubscriptionTyp::Cid(*cid), Some((name.to_string()).into())); + let _ = notifier.notify(Message::new(header, json)); + } + (Ok(Ipld::Link(cid)), Err(_err)) => { + let header = Header::new(SubscriptionTyp::Cid(*cid), None); + let _ = notifier.notify(Message::new(header, json)); + } + _ => (), + } + } + } else { + warn!("Unable to serialize receipt as bytes: {receipt:?}"); + } +} + +/// Send event notification as bytes. +pub(crate) fn emit_event( + notifier: Notifier, + ty: EventNotificationTyp, + data: BTreeMap<&str, String>, +) { + let header = Header::new( + SubscriptionTyp::EventSub(SUBSCRIBE_NETWORK_EVENTS_ENDPOINT.to_string()), + None, + ); + let notification = EventNotification::new(ty, data); + + if let Ok(json) = notification.to_json() { + let _ = notifier.notify(Message::new(header, json)); + } else { + warn!("Unable to serialize notification as bytes: {notification:?}"); + } +} + +/// Notification sent to clients. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub(crate) struct EventNotification { + typ: EventNotificationTyp, + data: Ipld, + timestamp: i64, +} + +impl EventNotification { + pub(crate) fn new(typ: EventNotificationTyp, data: BTreeMap<&str, String>) -> Self { + let ipld_data = data + .iter() + .map(|(key, val)| (key.to_string(), Ipld::String(val.to_owned()))) + .collect(); + + Self { + typ, + data: Ipld::Map(ipld_data), + timestamp: Utc::now().timestamp_millis(), + } + } +} + +impl DagJson for EventNotification where Ipld: From {} + +impl From for Ipld { + fn from(notification: EventNotification) -> Self { + Ipld::Map(BTreeMap::from([ + ("type".into(), notification.typ.into()), + ("data".into(), notification.data), + ("timestamp".into(), notification.timestamp.into()), + ])) + } +} + +impl TryFrom for EventNotification { + type Error = anyhow::Error; + + fn try_from(ipld: Ipld) -> Result { + let map = from_ipld::>(ipld)?; + + let typ: EventNotificationTyp = map + .get(TYPE_KEY) + .ok_or_else(|| anyhow!("missing {TYPE_KEY}"))? + .to_owned() + .try_into()?; + + let data = map + .get(DATA_KEY) + .ok_or_else(|| anyhow!("missing {DATA_KEY}"))? + .to_owned(); + + let timestamp = from_ipld( + map.get(TIMESTAMP_KEY) + .ok_or_else(|| anyhow!("missing {TIMESTAMP_KEY}"))? + .to_owned(), + )?; + + Ok(EventNotification { + typ, + data, + timestamp, + }) + } +} + +/// Types of notification sent to clients. +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub(crate) enum EventNotificationTyp { + SwarmNotification(SwarmNotification), +} + +impl DagJson for EventNotificationTyp where Ipld: From {} + +impl From for Ipld { + fn from(typ: EventNotificationTyp) -> Self { + match typ { + EventNotificationTyp::SwarmNotification(subtype) => { + Ipld::String(format!("network:{}", subtype)) + } + } + } +} + +impl TryFrom for EventNotificationTyp { + type Error = anyhow::Error; + + fn try_from(ipld: Ipld) -> Result { + if let Some((ty, subtype)) = from_ipld::(ipld)?.split_once(':') { + match ty { + "network" => Ok(EventNotificationTyp::SwarmNotification( + SwarmNotification::from_str(subtype)?, + )), + _ => Err(anyhow!("Missing event notification type: {}", ty)), + } + } else { + Err(anyhow!( + "Event notification type missing colon delimiter between type and subtype." + )) + } + } +} + +#[cfg(test)] +mod test { + use super::*; + use libp2p::PeerId; + use maplit::btreemap; + + #[test] + fn notification_bytes_rountrip() { + let peer_id = PeerId::random().to_string(); + let address: String = "/ip4/127.0.0.1/tcp/7000".to_string(); + + let notification = EventNotification::new( + EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished), + btreemap! { + "peer_id" => peer_id.clone(), + "address" => address.clone() + }, + ); + let bytes = notification.to_json().unwrap(); + + let parsed = EventNotification::from_json(bytes.as_ref()).unwrap(); + let data: BTreeMap = from_ipld(parsed.data).unwrap(); + + assert_eq!( + parsed.typ, + EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished) + ); + assert_eq!(data.get("peer_id").unwrap(), &peer_id); + assert_eq!(data.get("address").unwrap(), &address); + } + + #[test] + fn notification_json_string_rountrip() { + let peer_id = PeerId::random().to_string(); + let address: String = "/ip4/127.0.0.1/tcp/7000".to_string(); + + let notification = EventNotification::new( + EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished), + btreemap! { + "peer_id" => peer_id.clone(), + "address" => address.clone() + }, + ); + let json_string = notification.to_json_string().unwrap(); + + let parsed = EventNotification::from_json_string(json_string).unwrap(); + let data: BTreeMap = from_ipld(parsed.data).unwrap(); + + assert_eq!( + parsed.typ, + EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished) + ); + assert_eq!(data.get("peer_id").unwrap(), &peer_id); + assert_eq!(data.get("address").unwrap(), &address); + } +} diff --git a/homestar-runtime/src/event_handler/notification/receipt.rs b/homestar-runtime/src/event_handler/notification/receipt.rs new file mode 100644 index 000000000..f2f222ec3 --- /dev/null +++ b/homestar-runtime/src/event_handler/notification/receipt.rs @@ -0,0 +1,44 @@ +use homestar_core::{ipld::DagJson, workflow::Receipt}; +use libipld::{ipld, Cid, Ipld}; + +/// A [Receipt] that is sent out *just* for websocket notifications. +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct ReceiptNotification(Ipld); + +impl ReceiptNotification { + /// TODO + #[allow(dead_code)] + pub(crate) fn inner(&self) -> &Ipld { + &self.0 + } + + /// TODO + #[allow(dead_code)] + pub(crate) fn into_inner(self) -> Ipld { + self.0.to_owned() + } + + pub(crate) fn with(receipt: Receipt, cid: Cid, metadata: Option) -> Self { + let receipt: Ipld = receipt.into(); + let data = ipld!({ + "receipt": receipt, + "metadata": metadata.as_ref().map(|m| m.to_owned()).map_or(Ipld::Null, |m| m), + "receipt_cid": cid, + }); + ReceiptNotification(data) + } +} + +impl DagJson for ReceiptNotification where Ipld: From {} + +impl From for Ipld { + fn from(receipt: ReceiptNotification) -> Self { + receipt.0 + } +} + +impl From for ReceiptNotification { + fn from(ipld: Ipld) -> Self { + ReceiptNotification(ipld) + } +} diff --git a/homestar-runtime/src/event_handler/notification/swarm.rs b/homestar-runtime/src/event_handler/notification/swarm.rs new file mode 100644 index 000000000..8e98947b1 --- /dev/null +++ b/homestar-runtime/src/event_handler/notification/swarm.rs @@ -0,0 +1,44 @@ +use anyhow::anyhow; +use serde::{Deserialize, Serialize}; +use std::{fmt, str::FromStr}; + +// Swarm notification types sent to clients +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub(crate) enum SwarmNotification { + ConnnectionEstablished, + ConnnectionClosed, + ListeningOn, + OutgoingConnectionError, + IncomingConnectionError, +} + +impl fmt::Display for SwarmNotification { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match *self { + SwarmNotification::ConnnectionEstablished => write!(f, "connectionEstablished"), + SwarmNotification::ConnnectionClosed => write!(f, "connectionClosed"), + SwarmNotification::ListeningOn => write!(f, "listeningOn"), + SwarmNotification::OutgoingConnectionError => { + write!(f, "outgoingConnectionError") + } + SwarmNotification::IncomingConnectionError => { + write!(f, "incomingConnectionError") + } + } + } +} + +impl FromStr for SwarmNotification { + type Err = anyhow::Error; + + fn from_str(ty: &str) -> Result { + match ty { + "connectionEstablished" => Ok(Self::ConnnectionEstablished), + "connectionClosed" => Ok(Self::ConnnectionClosed), + "listeningOn" => Ok(Self::ListeningOn), + "outgoingConnectionError" => Ok(Self::OutgoingConnectionError), + "incomingConnectionError" => Ok(Self::IncomingConnectionError), + _ => Err(anyhow!("Missing swarm notification type: {}", ty)), + } + } +} diff --git a/homestar-runtime/src/event_handler/swarm_event.rs b/homestar-runtime/src/event_handler/swarm_event.rs index 3bed8c4ac..2c22db6e0 100644 --- a/homestar-runtime/src/event_handler/swarm_event.rs +++ b/homestar-runtime/src/event_handler/swarm_event.rs @@ -8,6 +8,7 @@ use crate::{ event_handler::{ cache::{self, CacheData, CacheValue}, event::QueryRecord, + notification::{self, EventNotificationTyp, SwarmNotification}, Event, Handler, RequestResponseError, }, libp2p::multiaddr::MultiaddrExt, @@ -39,6 +40,7 @@ use libp2p::{ swarm::{dial_opts::DialOpts, SwarmEvent}, PeerId, StreamProtocol, }; +use maplit::btreemap; use std::{ collections::{HashMap, HashSet}, fmt, @@ -711,9 +713,20 @@ async fn handle_swarm_event( } SwarmEvent::NewListenAddr { address, .. } => { let local_peer = *event_handler.swarm.local_peer_id(); + info!( - "local node is listening on {}", - address.with(Protocol::P2p(local_peer)) + peer_id = local_peer.to_string(), + "local node is listening on {}", address + ); + + #[cfg(feature = "websocket-notify")] + notification::emit_event( + event_handler.ws_evt_sender(), + EventNotificationTyp::SwarmNotification(SwarmNotification::ListeningOn), + btreemap! { + "peer_id" => local_peer.to_string(), + "address" => address.to_string() + }, ); } SwarmEvent::IncomingConnection { .. } => {} @@ -722,9 +735,27 @@ async fn handle_swarm_event( } => { debug!(peer_id=peer_id.to_string(), endpoint=?endpoint, "peer connection established"); // add peer to connected peers list - event_handler.connections.peers.insert(peer_id, endpoint); + event_handler + .connections + .peers + .insert(peer_id, endpoint.clone()); + + #[cfg(feature = "websocket-notify")] + notification::emit_event( + event_handler.ws_evt_sender(), + EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionEstablished), + btreemap! { + "peer_id" => peer_id.to_string(), + "address" => endpoint.get_remote_address().to_string() + }, + ); } - SwarmEvent::ConnectionClosed { peer_id, cause, .. } => { + SwarmEvent::ConnectionClosed { + peer_id, + cause, + endpoint, + .. + } => { debug!( peer_id = peer_id.to_string(), "peer connection closed, cause: {cause:?}" @@ -753,6 +784,16 @@ async fn handle_swarm_event( "removed peer from kademlia table" ); } + + #[cfg(feature = "websocket-notify")] + notification::emit_event( + event_handler.ws_evt_sender(), + EventNotificationTyp::SwarmNotification(SwarmNotification::ConnnectionClosed), + btreemap! { + "peer_id" => peer_id.to_string(), + "address" => endpoint.get_remote_address().to_string() + }, + ); } SwarmEvent::OutgoingConnectionError { connection_id, @@ -764,7 +805,17 @@ async fn handle_swarm_event( err=?error, connection_id=?connection_id, "outgoing connection error" - ) + ); + + #[cfg(feature = "websocket-notify")] + notification::emit_event( + event_handler.ws_evt_sender(), + EventNotificationTyp::SwarmNotification(SwarmNotification::OutgoingConnectionError), + btreemap! { + "peer_id" => peer_id.map_or("Unknown peer".into(), |p| p.to_string()), + "error" => error.to_string() + }, + ); } SwarmEvent::IncomingConnectionError { connection_id, @@ -778,7 +829,16 @@ async fn handle_swarm_event( local_address=local_addr.to_string(), remote_address=send_back_addr.to_string(), "incoming connection error" - ) + ); + + #[cfg(feature = "websocket-notify")] + notification::emit_event( + event_handler.ws_evt_sender(), + EventNotificationTyp::SwarmNotification(SwarmNotification::IncomingConnectionError), + btreemap! { + "error" => error.to_string() + }, + ); } SwarmEvent::ListenerError { listener_id, error } => { error!(err=?error, listener_id=?listener_id, "listener error") diff --git a/homestar-runtime/src/network/webserver.rs b/homestar-runtime/src/network/webserver.rs index 738986c06..915739779 100644 --- a/homestar-runtime/src/network/webserver.rs +++ b/homestar-runtime/src/network/webserver.rs @@ -32,6 +32,7 @@ mod rpc; #[cfg(feature = "websocket-notify")] pub(crate) use notifier::Notifier; +pub(crate) use rpc::SUBSCRIBE_NETWORK_EVENTS_ENDPOINT; use rpc::{Context, JsonRpc}; /// Message type for messages sent back from the @@ -234,7 +235,7 @@ fn port_available(host: IpAddr, port: u16) -> bool { #[cfg(test)] mod test { use super::*; - use crate::settings::Settings; + use crate::{event_handler::notification::ReceiptNotification, settings::Settings}; use homestar_core::test_utils; #[cfg(feature = "websocket-notify")] use homestar_core::{ @@ -247,7 +248,7 @@ mod test { use jsonrpsee::types::error::ErrorCode; use jsonrpsee::{core::client::ClientT, rpc_params, ws_client::WsClientBuilder}; #[cfg(feature = "websocket-notify")] - use notifier::{self, Header, NotifyReceipt}; + use notifier::{self, Header}; use serial_test::file_serial; use tokio::sync::mpsc; @@ -361,7 +362,7 @@ mod test { // send any bytes through (Vec) let (invocation_receipt, runtime_receipt) = crate::test_utils::receipt::receipts(); - let receipt = NotifyReceipt::with(invocation_receipt, runtime_receipt.cid(), None); + let receipt = ReceiptNotification::with(invocation_receipt, runtime_receipt.cid(), None); server .evt_notifier .notify(notifier::Message::new( @@ -401,11 +402,11 @@ mod test { .unwrap(); let msg1 = sub.next().await.unwrap().unwrap(); - let returned1: NotifyReceipt = DagJson::from_json(&msg1).unwrap(); + let returned1: ReceiptNotification = DagJson::from_json(&msg1).unwrap(); assert_eq!(returned1, receipt); let msg2 = sub.next().await.unwrap().unwrap(); - let _returned1: NotifyReceipt = DagJson::from_json(&msg2).unwrap(); + let _returned1: ReceiptNotification = DagJson::from_json(&msg2).unwrap(); assert!(sub.unsubscribe().await.is_ok()); diff --git a/homestar-runtime/src/network/webserver/notifier.rs b/homestar-runtime/src/network/webserver/notifier.rs index 2630dd152..dc7abd6a4 100644 --- a/homestar-runtime/src/network/webserver/notifier.rs +++ b/homestar-runtime/src/network/webserver/notifier.rs @@ -2,8 +2,7 @@ use anyhow::Result; use faststr::FastStr; -use homestar_core::{ipld::DagJson, workflow::Receipt}; -use libipld::{ipld, Cid, Ipld}; +use libipld::Cid; use std::{fmt, sync::Arc}; use tokio::sync::broadcast; @@ -95,45 +94,3 @@ impl Message { &self.payload } } - -/// A [Receipt] that is sent out *just* for websocket notifications. -#[derive(Debug, Clone, PartialEq)] -pub(crate) struct NotifyReceipt(Ipld); - -impl NotifyReceipt { - /// TODO - #[allow(dead_code)] - pub(crate) fn inner(&self) -> &Ipld { - &self.0 - } - - /// TODO - #[allow(dead_code)] - pub(crate) fn into_inner(self) -> Ipld { - self.0.to_owned() - } - - pub(crate) fn with(receipt: Receipt, cid: Cid, metadata: Option) -> Self { - let receipt: Ipld = receipt.into(); - let data = ipld!({ - "receipt": receipt, - "metadata": metadata.as_ref().map(|m| m.to_owned()).map_or(Ipld::Null, |m| m), - "receipt_cid": cid, - }); - NotifyReceipt(data) - } -} - -impl DagJson for NotifyReceipt where Ipld: From {} - -impl From for Ipld { - fn from(receipt: NotifyReceipt) -> Self { - receipt.0 - } -} - -impl From for NotifyReceipt { - fn from(ipld: Ipld) -> Self { - NotifyReceipt(ipld) - } -} diff --git a/homestar-runtime/tests/fixtures/test_notification1.toml b/homestar-runtime/tests/fixtures/test_notification1.toml new file mode 100644 index 000000000..979515fd5 --- /dev/null +++ b/homestar-runtime/tests/fixtures/test_notification1.toml @@ -0,0 +1,20 @@ +[monitoring] +process_collector_interval = 500 +console_subscriber_port = 5582 + +[node] + +[node.network] +metrics_port = 4032 +rpc_port = 9822 +webserver_port = 8022 +listen_address = "/ip4/127.0.0.1/tcp/7010" +node_addresses = [ + "/ip4/127.0.0.1/tcp/7011/p2p/16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc", +] +enable_mdns = false +enable_rendezvous_client = false + +# Peer ID 12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN +[node.network.keypair_config] +existing = { key_type = "ed25519", path = "./fixtures/__testkey_ed25519.pem" } diff --git a/homestar-runtime/tests/fixtures/test_notification2.toml b/homestar-runtime/tests/fixtures/test_notification2.toml new file mode 100644 index 000000000..dd22b7a11 --- /dev/null +++ b/homestar-runtime/tests/fixtures/test_notification2.toml @@ -0,0 +1,20 @@ +[monitoring] +process_collector_interval = 500 +console_subscriber_port = 5583 + +[node] + +[node.network] +metrics_port = 4033 +rpc_port = 9823 +webserver_port = 8023 +listen_address = "/ip4/127.0.0.1/tcp/7011" +node_addresses = [ + "/ip4/127.0.0.1/tcp/7010/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", +] +enable_mdns = false +enable_rendezvous_client = false + +# Peer ID 16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc +[node.network.keypair_config] +existing = { key_type = "secp256k1", path = "./fixtures/__testkey_secp256k1.der" } diff --git a/homestar-runtime/tests/network.rs b/homestar-runtime/tests/network.rs index 7bfd507d9..0884b91c3 100644 --- a/homestar-runtime/tests/network.rs +++ b/homestar-runtime/tests/network.rs @@ -11,6 +11,9 @@ use std::{ time::Duration, }; +#[cfg(feature = "websocket-notify")] +mod notification; + #[allow(dead_code)] static BIN: Lazy = Lazy::new(|| assert_cmd::cargo::cargo_bin(BIN_NAME)); @@ -65,7 +68,8 @@ fn test_libp2p_listens_on_address_serial() -> Result<()> { stdout, vec![ "local node is listening", - "/ip4/127.0.0.1/tcp/7000/p2p/12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", + "/ip4/127.0.0.1/tcp/7000", + "12D3KooWDpJ7As7BWAwRMfu1VU2WCqNjvq387JEYKDBj4kx6nXTN", ], ); diff --git a/homestar-runtime/tests/network/notification.rs b/homestar-runtime/tests/network/notification.rs new file mode 100644 index 000000000..ea2d9b198 --- /dev/null +++ b/homestar-runtime/tests/network/notification.rs @@ -0,0 +1,127 @@ +use crate::utils::{kill_homestar, stop_homestar, BIN_NAME}; +use anyhow::Result; +use jsonrpsee::{ + core::client::{Subscription, SubscriptionClientT}, + rpc_params, + ws_client::WsClientBuilder, +}; +use once_cell::sync::Lazy; +use retry::{delay::Exponential, retry}; +use serial_test::file_serial; +use std::{ + net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpStream}, + path::PathBuf, + process::{Command, Stdio}, + time::Duration, +}; + +static BIN: Lazy = Lazy::new(|| assert_cmd::cargo::cargo_bin(BIN_NAME)); +const SUBSCRIBE_NETWORK_EVENTS_ENDPOINT: &str = "subscribe_network_events"; +const UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT: &str = "unsubscribe_network_events"; + +#[test] +#[file_serial] +fn test_connection_notifications_serial() -> Result<()> { + let _ = stop_homestar(); + + let homestar_proc1 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_notification1.toml") + .arg("--db") + .arg("homestar1.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let ws_port = 8022; + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), ws_port); + let result = retry(Exponential::from_millis(1000).take(10), || { + TcpStream::connect(socket).map(|stream| stream.shutdown(Shutdown::Both)) + }); + + if result.is_err() { + let _ = kill_homestar(homestar_proc1, None); + panic!("Homestar server/runtime failed to start in time"); + } + + let ws_url = format!("ws://{}:{}", Ipv4Addr::LOCALHOST, ws_port); + tokio_test::block_on(async { + tokio_tungstenite::connect_async(ws_url.clone()) + .await + .unwrap(); + + let client = WsClientBuilder::default() + .build(ws_url.clone()) + .await + .unwrap(); + let mut sub: Subscription> = client + .subscribe( + SUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + rpc_params![], + UNSUBSCRIBE_NETWORK_EVENTS_ENDPOINT, + ) + .await + .unwrap(); + + tokio::time::sleep(Duration::from_millis(200)).await; + + let homestar_proc2 = Command::new(BIN.as_os_str()) + .env( + "RUST_LOG", + "homestar=debug,homestar_runtime=debug,libp2p=debug,libp2p_gossipsub::behaviour=debug", + ) + .arg("start") + .arg("-c") + .arg("tests/fixtures/test_notification2.toml") + .arg("--db") + .arg("homestar2.db") + .stdout(Stdio::piped()) + .spawn() + .unwrap(); + + let _ = kill_homestar(homestar_proc2, None); + + tokio::time::sleep(Duration::from_secs(2)).await; + + { + let msg = sub + .next() + .await + .expect("Subscription did not receive a connection established message"); + let json: serde_json::Value = serde_json::from_slice(&msg.unwrap()).unwrap(); + let typ = json["type"].as_str().unwrap(); + let peer_id = json["data"]["peer_id"].as_str().unwrap(); + + assert_eq!(typ, "network:connectionEstablished"); + assert_eq!( + peer_id, + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc" + ); + } + + { + let msg = sub + .next() + .await + .expect("Subscription did not receive a connection closed message"); + let json: serde_json::Value = serde_json::from_slice(&msg.unwrap()).unwrap(); + let typ = json["type"].as_str().unwrap(); + let peer_id = json["data"]["peer_id"].as_str().unwrap(); + + assert_eq!(typ, "network:connectionClosed"); + assert_eq!( + peer_id, + "16Uiu2HAm3g9AomQNeEctL2hPwLapap7AtPSNt8ZrBny4rLx1W5Dc" + ); + } + }); + + let _ = kill_homestar(homestar_proc1, None); + + Ok(()) +}