Skip to content

Commit

Permalink
Implement a node in default test for monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Mar 12, 2024
1 parent 70c1036 commit b800e78
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 2 deletions.
8 changes: 8 additions & 0 deletions crates/core/src/swarm/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ pub trait SwarmCallback {
Ok(())
}

/// This method is invoked when a new message is received and after handling.
/// Will be invoked no matter if the message is for this node or not.
async fn on_relay(&self, _payload: &MessagePayload) -> Result<(), CallbackError> {
Ok(())
}

/// This method is invoked when a new message is received and after handling.
/// Will not be invoked if the message is not for this node.
async fn on_inbound(&self, _payload: &MessagePayload) -> Result<(), CallbackError> {
Expand Down Expand Up @@ -125,6 +131,8 @@ impl InnerSwarmCallback {
}
};

self.callback.on_relay(payload).await?;

if payload.transaction.destination == self.transport.dht.did {
self.callback.on_inbound(payload).await?;
}
Expand Down
37 changes: 35 additions & 2 deletions crates/core/src/tests/default/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,51 @@
use std::sync::Arc;

use async_trait::async_trait;

use crate::dht::Did;
use crate::dht::PeerRing;
use crate::ecc::SecretKey;
use crate::message::MessagePayload;
use crate::session::SessionSk;
use crate::storage::MemStorage;
use crate::swarm::callback::SwarmCallback;
use crate::swarm::Swarm;
use crate::swarm::SwarmBuilder;

mod test_connection;
mod test_message_handler;
mod test_stabilization;

pub async fn prepare_node(key: SecretKey) -> Arc<Swarm> {
pub struct Node {
swarm: Arc<Swarm>,
message_rx: tokio::sync::mpsc::UnboundedReceiver<MessagePayload>,
}

pub struct NodeCallback {
message_tx: tokio::sync::mpsc::UnboundedSender<MessagePayload>,
}

impl Node {
pub fn new(swarm: Arc<Swarm>) -> Self {
let (message_tx, message_rx) = tokio::sync::mpsc::unbounded_channel();
let callback = NodeCallback { message_tx };
swarm.set_callback(Arc::new(callback));
Self { swarm, message_rx }
}

pub async fn listen_once(&mut self) -> Option<MessagePayload> {
self.message_rx.recv().await
}
}

#[async_trait]
impl SwarmCallback for NodeCallback {
async fn on_relay(&self, payload: &MessagePayload) -> Result<(), Box<dyn std::error::Error>> {
self.message_tx.send(payload.clone()).map_err(|e| e.into())
}
}

pub async fn prepare_node(key: SecretKey) -> Node {
let stun = "stun://stun.l.google.com:19302";
let storage = Box::new(MemStorage::new());

Expand All @@ -22,7 +55,7 @@ pub async fn prepare_node(key: SecretKey) -> Arc<Swarm> {
println!("key: {:?}", key.to_string());
println!("did: {:?}", swarm.did());

swarm
Node::new(swarm)
}

pub fn gen_pure_dht(did: Did) -> PeerRing {
Expand Down

0 comments on commit b800e78

Please sign in to comment.