diff --git a/crates/core/src/swarm/callback.rs b/crates/core/src/swarm/callback.rs index 271f2b219..2dc21139c 100644 --- a/crates/core/src/swarm/callback.rs +++ b/crates/core/src/swarm/callback.rs @@ -49,6 +49,12 @@ pub trait SwarmCallback { Ok(()) } + /// This method is invoked when a new message is received and after handling. + /// Will be invoked no matter if the message is for this node or not. + async fn on_relay(&self, _payload: &MessagePayload) -> Result<(), CallbackError> { + Ok(()) + } + /// This method is invoked when a new message is received and after handling. /// Will not be invoked if the message is not for this node. async fn on_inbound(&self, _payload: &MessagePayload) -> Result<(), CallbackError> { @@ -125,6 +131,8 @@ impl InnerSwarmCallback { } }; + self.callback.on_relay(payload).await?; + if payload.transaction.destination == self.transport.dht.did { self.callback.on_inbound(payload).await?; } diff --git a/crates/core/src/tests/default/mod.rs b/crates/core/src/tests/default/mod.rs index 4c3033e59..79a5abc09 100644 --- a/crates/core/src/tests/default/mod.rs +++ b/crates/core/src/tests/default/mod.rs @@ -1,10 +1,14 @@ use std::sync::Arc; +use async_trait::async_trait; + use crate::dht::Did; use crate::dht::PeerRing; use crate::ecc::SecretKey; +use crate::message::MessagePayload; use crate::session::SessionSk; use crate::storage::MemStorage; +use crate::swarm::callback::SwarmCallback; use crate::swarm::Swarm; use crate::swarm::SwarmBuilder; @@ -12,7 +16,36 @@ mod test_connection; mod test_message_handler; mod test_stabilization; -pub async fn prepare_node(key: SecretKey) -> Arc { +pub struct Node { + swarm: Arc, + message_rx: tokio::sync::mpsc::UnboundedReceiver, +} + +pub struct NodeCallback { + message_tx: tokio::sync::mpsc::UnboundedSender, +} + +impl Node { + pub fn new(swarm: Arc) -> Self { + let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel(); + let callback = NodeCallback { message_tx }; + swarm.set_callback(Arc::new(callback)); + Self { swarm, message_rx } + } + + pub async fn listen_once(&mut self) -> Option { + self.message_rx.recv().await + } +} + +#[async_trait] +impl SwarmCallback for NodeCallback { + async fn on_relay(&self, payload: &MessagePayload) -> Result<(), Box> { + self.message_tx.send(payload.clone()).map_err(|e| e.into()) + } +} + +pub async fn prepare_node(key: SecretKey) -> Node { let stun = "stun://stun.l.google.com:19302"; let storage = Box::new(MemStorage::new()); @@ -22,7 +55,7 @@ pub async fn prepare_node(key: SecretKey) -> Arc { println!("key: {:?}", key.to_string()); println!("did: {:?}", swarm.did()); - swarm + Node::new(swarm) } pub fn gen_pure_dht(did: Did) -> PeerRing {