From 2bd025e902319c447f4cfae44bd04d2fe0114b7f Mon Sep 17 00:00:00 2001 From: magine Date: Tue, 5 Mar 2024 23:04:39 +0800 Subject: [PATCH] Use MessageHandler in callback --- crates/core/src/dht/stabilization.rs | 21 +- crates/core/src/inspect.rs | 1 - .../core/src/message/handlers/connection.rs | 134 +++++----- crates/core/src/message/handlers/custom.rs | 13 +- crates/core/src/message/handlers/dht.rs | 114 -------- crates/core/src/message/handlers/mod.rs | 214 ++++++++------- .../src/message/handlers/stabilization.rs | 43 ++- crates/core/src/message/handlers/storage.rs | 140 +++++----- crates/core/src/message/handlers/subring.rs | 2 +- crates/core/src/message/mod.rs | 1 - crates/core/src/swarm/builder.rs | 20 +- crates/core/src/swarm/callback.rs | 68 +++-- crates/core/src/swarm/impls.rs | 246 ------------------ crates/core/src/swarm/mod.rs | 239 +++++++---------- crates/core/src/transport.rs | 48 +++- 15 files changed, 481 insertions(+), 823 deletions(-) delete mode 100644 crates/core/src/message/handlers/dht.rs delete mode 100644 crates/core/src/swarm/impls.rs diff --git a/crates/core/src/dht/stabilization.rs b/crates/core/src/dht/stabilization.rs index 77c59f581..8c762137a 100644 --- a/crates/core/src/dht/stabilization.rs +++ b/crates/core/src/dht/stabilization.rs @@ -11,7 +11,6 @@ use crate::dht::PeerRing; use crate::dht::PeerRingAction; use crate::dht::PeerRingRemoteAction; use crate::error::Result; -use crate::message::handlers::MessageHandlerEvent; use crate::message::FindSuccessorReportHandler; use crate::message::FindSuccessorSend; use crate::message::FindSuccessorThen; @@ -93,11 +92,11 @@ impl Stabilization { tracing::debug!("STABILIZATION notify_predecessor: {:?}", s); let payload = MessagePayload::new_send( msg.clone(), - self.swarm.session_sk(), + self.swarm.transport.session_sk(), s, self.swarm.did(), )?; - self.swarm.send_payload(payload).await?; + self.swarm.transport.send_payload(payload).await?; } Ok(()) } else { @@ -122,11 +121,11 @@ impl Stabilization { }); let payload = MessagePayload::new_send( msg.clone(), - self.swarm.session_sk(), + self.swarm.transport.session_sk(), closest_predecessor, closest_predecessor, )?; - self.swarm.send_payload(payload).await?; + self.swarm.transport.send_payload(payload).await?; Ok(()) } _ => { @@ -150,11 +149,13 @@ impl Stabilization { PeerRingRemoteAction::QueryForSuccessorListAndPred, ) = self.chord.pre_stabilize()? { - let evs = vec![MessageHandlerEvent::SendDirectMessage( - Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_stab(next)), - next, - )]; - return self.swarm.handle_message_handler_events(&evs).await; + self.swarm + .transport + .send_direct_message( + Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_stab(next)), + next, + ) + .await?; } Ok(()) } diff --git a/crates/core/src/inspect.rs b/crates/core/src/inspect.rs index bdff38e7d..8c8b96511 100644 --- a/crates/core/src/inspect.rs +++ b/crates/core/src/inspect.rs @@ -1,4 +1,3 @@ -use rings_transport::core::transport::ConnectionInterface; use serde::Deserialize; use serde::Serialize; diff --git a/crates/core/src/message/handlers/connection.rs b/crates/core/src/message/handlers/connection.rs index 929d0b6cf..b4809ce57 100644 --- a/crates/core/src/message/handlers/connection.rs +++ b/crates/core/src/message/handlers/connection.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; -use super::dht; +use crate::dht::types::Chord; +use crate::dht::types::CorrectChord; use crate::dht::PeerRingAction; use crate::dht::TopoInfo; use crate::error::Error; @@ -17,27 +18,21 @@ use crate::message::FindSuccessorReportHandler; use crate::message::FindSuccessorThen; use crate::message::HandleMsg; use crate::message::MessageHandler; -use crate::message::MessageHandlerEvent; use crate::message::MessagePayload; +use crate::message::PayloadSender; /// QueryForTopoInfoSend is direct message #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] impl HandleMsg for MessageHandler { - async fn handle( - &self, - ctx: &MessagePayload, - msg: &QueryForTopoInfoSend, - ) -> Result> { - let info: TopoInfo = TopoInfo::try_from(self.dht.deref())?; + async fn handle(&self, ctx: &MessagePayload, msg: &QueryForTopoInfoSend) -> Result<()> { + let info: TopoInfo = TopoInfo::try_from(self.dht.as_ref())?; if msg.did == self.dht.did { - Ok(vec![MessageHandlerEvent::SendReportMessage( - ctx.clone(), - Message::QueryForTopoInfoReport(msg.resp(info)), - )]) - } else { - Ok(vec![]) + self.transport + .send_report_message(ctx, Message::QueryForTopoInfoReport(msg.resp(info))) + .await? } + Ok(()) } } @@ -45,41 +40,36 @@ impl HandleMsg for MessageHandler { #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] impl HandleMsg for MessageHandler { - async fn handle( - &self, - ctx: &MessagePayload, - msg: &QueryForTopoInfoReport, - ) -> Result> { + async fn handle(&self, _ctx: &MessagePayload, msg: &QueryForTopoInfoReport) -> Result<()> { match msg.then { - ::Then::SyncSuccessor => Ok(msg - .info - .successors - .iter() - .map(|did| MessageHandlerEvent::JoinDHT(ctx.clone(), *did)) - .collect()), + ::Then::SyncSuccessor => { + for peer in msg.info.successors.iter() { + self.join_dht(*peer).await?; + } + } ::Then::Stabilization => { let ev = self.dht.stabilize(msg.info.clone())?; - dht::handle_dht_events(&ev, ctx).await + self.handle_dht_events(&ev).await?; } } + Ok(()) } } #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] impl HandleMsg for MessageHandler { - async fn handle( - &self, - ctx: &MessagePayload, - msg: &ConnectNodeSend, - ) -> Result> { + async fn handle(&self, ctx: &MessagePayload, msg: &ConnectNodeSend) -> Result<()> { if self.dht.did != ctx.relay.destination { - Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)]) + self.transport.forward_payload(ctx, None).await } else { - Ok(vec![MessageHandlerEvent::AnswerOffer( - ctx.clone(), - msg.clone(), - )]) + let answer = self + .transport + .answer_remote_connection(ctx.relay.origin_sender(), self.inner_callback(), msg) + .await?; + self.transport + .send_report_message(ctx, Message::ConnectNodeReport(answer)) + .await } } } @@ -87,18 +77,13 @@ impl HandleMsg for MessageHandler { #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] impl HandleMsg for MessageHandler { - async fn handle( - &self, - ctx: &MessagePayload, - msg: &ConnectNodeReport, - ) -> Result> { + async fn handle(&self, ctx: &MessagePayload, msg: &ConnectNodeReport) -> Result<()> { if self.dht.did != ctx.relay.destination { - Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)]) + self.transport.forward_payload(ctx, None).await } else { - Ok(vec![MessageHandlerEvent::AcceptAnswer( - ctx.relay.origin_sender(), - msg.clone(), - )]) + self.transport + .accept_remote_connection(ctx.relay.origin_sender(), msg) + .await } } } @@ -106,37 +91,29 @@ impl HandleMsg for MessageHandler { #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] impl HandleMsg for MessageHandler { - async fn handle( - &self, - ctx: &MessagePayload, - msg: &FindSuccessorSend, - ) -> Result> { + async fn handle(&self, ctx: &MessagePayload, msg: &FindSuccessorSend) -> Result<()> { match self.dht.find_successor(msg.did)? { PeerRingAction::Some(did) => { if !msg.strict || self.dht.did == msg.did { match &msg.then { FindSuccessorThen::Report(handler) => { - Ok(vec![MessageHandlerEvent::SendReportMessage( - ctx.clone(), - Message::FindSuccessorReport(FindSuccessorReport { - did, - handler: handler.clone(), - }), - )]) + self.transport + .send_report_message( + ctx, + Message::FindSuccessorReport(FindSuccessorReport { + did, + handler: handler.clone(), + }), + ) + .await } } } else { - Ok(vec![MessageHandlerEvent::ForwardPayload( - ctx.clone(), - Some(did), - )]) + self.transport.forward_payload(ctx, Some(did)).await } } PeerRingAction::RemoteAction(next, _) => { - Ok(vec![MessageHandlerEvent::ResetDestination( - ctx.clone(), - next, - )]) + self.transport.reset_destination(ctx, next).await } act => Err(Error::PeerRingUnexpectedAction(act)), } @@ -146,22 +123,27 @@ impl HandleMsg for MessageHandler { #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] impl HandleMsg for MessageHandler { - async fn handle( - &self, - ctx: &MessagePayload, - msg: &FindSuccessorReport, - ) -> Result> { + async fn handle(&self, ctx: &MessagePayload, msg: &FindSuccessorReport) -> Result<()> { if self.dht.did != ctx.relay.destination { - return Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)]); + return self.transport.forward_payload(ctx, None).await; } match &msg.handler { - FindSuccessorReportHandler::FixFingerTable => { - Ok(vec![MessageHandlerEvent::Connect(msg.did)]) + FindSuccessorReportHandler::FixFingerTable | FindSuccessorReportHandler::Connect => { + if msg.did != self.dht.did { + let offer_msg = self + .transport + .prepare_connection_offer(msg.did, self.inner_callback()) + .await?; + self.transport + .send_message(Message::ConnectNodeSend(offer_msg), msg.did) + .await?; + } } - FindSuccessorReportHandler::Connect => Ok(vec![MessageHandlerEvent::Connect(msg.did)]), - _ => Ok(vec![]), + _ => {} } + + Ok(()) } } diff --git a/crates/core/src/message/handlers/custom.rs b/crates/core/src/message/handlers/custom.rs index 08acfafad..1a10816b3 100644 --- a/crates/core/src/message/handlers/custom.rs +++ b/crates/core/src/message/handlers/custom.rs @@ -4,21 +4,16 @@ use crate::error::Result; use crate::message::types::CustomMessage; use crate::message::HandleMsg; use crate::message::MessageHandler; -use crate::message::MessageHandlerEvent; use crate::message::MessagePayload; +use crate::message::PayloadSender; #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] impl HandleMsg for MessageHandler { - async fn handle( - &self, - ctx: &MessagePayload, - _: &CustomMessage, - ) -> Result> { + async fn handle(&self, ctx: &MessagePayload, _: &CustomMessage) -> Result<()> { if self.dht.did != ctx.relay.destination { - Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)]) - } else { - Ok(vec![]) + self.transport.forward_payload(ctx, None).await?; } + Ok(()) } } diff --git a/crates/core/src/message/handlers/dht.rs b/crates/core/src/message/handlers/dht.rs deleted file mode 100644 index 6acb23210..000000000 --- a/crates/core/src/message/handlers/dht.rs +++ /dev/null @@ -1,114 +0,0 @@ -#![warn(missing_docs)] -//! This module provides helper function for handle DHT related Actions - -use async_recursion::async_recursion; - -use crate::dht::PeerRingAction; -use crate::dht::PeerRingRemoteAction; -use crate::error::Result; -use crate::message::types::FindSuccessorSend; -use crate::message::types::Message; -use crate::message::types::QueryForTopoInfoSend; -use crate::message::FindSuccessorReportHandler; -use crate::message::FindSuccessorThen; -use crate::message::MessageHandlerEvent; -use crate::message::MessagePayload; - -/// This accept a action instance, handler_func and error_msg string as parameter. -/// This macro is used for handling `PeerRingAction::MultiActions`. -/// -/// It accepts three parameters: -/// * `$actions`: This parameter represents the actions that will be processed. -/// * `$handler_func`: This is the handler function that will be used to process each action. It is expected to be -/// an expression that evaluates to a closure. The closure should be an asynchronous function -/// which accepts a single action and returns a `Result`. -/// The function will be called for each action in `$actions`, and should handle the action appropriately. -/// -/// * `$error_msg`: This is a string that will be used as the error message if the handler function returns an error. -/// The string should include one set of braces `{}` that will be filled with the `Debug` representation -/// of the error returned from the handler function. -/// -/// The macro returns a `Result>`. If all actions are processed successfully, it returns -/// `Ok(Vec)`, where the vector includes all the successful results from the handler function. -/// If any action fails, an error message will be logged, but the error will not be returned from the macro; instead, -/// it will continue with the next action. -/// -/// The macro is asynchronous, so it should be used within an `async` context. -#[macro_export] -macro_rules! handle_multi_actions { - ($actions:expr, $handler_func:expr, $error_msg:expr) => {{ - let ret: Vec = - futures::future::join_all($actions.iter().map($handler_func)) - .await - .iter() - .map(|x| { - if x.is_err() { - tracing::error!($error_msg, x) - }; - x - }) - .filter_map(|x| x.as_ref().ok()) - .flat_map(|xs| xs.iter()) - .cloned() - .collect(); - Ok(ret) - }}; -} - -/// Handler of join dht event from PeerRing DHT. -#[cfg_attr(feature = "wasm", async_recursion(?Send))] -#[cfg_attr(not(feature = "wasm"), async_recursion)] -pub async fn handle_dht_events( - act: &PeerRingAction, - ctx: &MessagePayload, -) -> Result> { - match act { - PeerRingAction::None => Ok(vec![]), - // Ask next hop to find successor for did, - // if there is only two nodes A, B, it may cause loop, for example - // A's successor is B, B ask A to find successor for B - // A may send message to it's successor, which is B - PeerRingAction::RemoteAction(next, PeerRingRemoteAction::FindSuccessorForConnect(did)) => { - if next != did { - Ok(vec![MessageHandlerEvent::SendDirectMessage( - Message::FindSuccessorSend(FindSuccessorSend { - did: *did, - strict: false, - then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect), - }), - *next, - )]) - } else { - Ok(vec![]) - } - } - // A new successor is set, request the new successor for it's successor list - PeerRingAction::RemoteAction(next, PeerRingRemoteAction::QueryForSuccessorList) => { - Ok(vec![MessageHandlerEvent::SendDirectMessage( - Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_sync(*next)), - *next, - )]) - } - PeerRingAction::RemoteAction(did, PeerRingRemoteAction::TryConnect) => { - Ok(vec![MessageHandlerEvent::ConnectVia( - *did, - ctx.relay.origin_sender(), - )]) - } - PeerRingAction::RemoteAction(did, PeerRingRemoteAction::Notify(target_id)) => { - if did == target_id { - tracing::warn!("Did is equal to target_id, may implement wrong."); - return Ok(vec![]); - } - Ok(vec![MessageHandlerEvent::Notify(*target_id)]) - } - PeerRingAction::MultiActions(acts) => { - handle_multi_actions!( - acts, - |act| async { handle_dht_events(act, ctx).await }, - "Failed on handle multi actions: {:#?}" - ) - } - _ => unreachable!(), - } -} diff --git a/crates/core/src/message/handlers/mod.rs b/crates/core/src/message/handlers/mod.rs index a8e440d27..11eea758c 100644 --- a/crates/core/src/message/handlers/mod.rs +++ b/crates/core/src/message/handlers/mod.rs @@ -1,34 +1,35 @@ #![warn(missing_docs)] //! This module implemented message handler of rings network. -/// Message Flow: -/// +---------+ +--------------------------------+ -/// | Message | -> | MessageHandler.handler_payload | -/// +---------+ +--------------------------------+ -/// || || -/// +--------------------------+ +--------------------------+ -/// | Builtin Message Callback | | Custom Message Callback | -/// +--------------------------+ +--------------------------+ + use std::sync::Arc; use async_recursion::async_recursion; use async_trait::async_trait; -use super::Message; use super::MessagePayload; -use crate::dht::vnode::VirtualNode; +use crate::dht::Chord; +use crate::dht::CorrectChord; use crate::dht::Did; use crate::dht::PeerRing; +use crate::dht::PeerRingAction; +use crate::dht::PeerRingRemoteAction; +use crate::error::Error; use crate::error::Result; -use crate::message::ConnectNodeReport; -use crate::message::ConnectNodeSend; +use crate::message::types::FindSuccessorSend; +use crate::message::types::Message; +use crate::message::types::QueryForTopoInfoSend; +use crate::message::FindSuccessorReportHandler; +use crate::message::FindSuccessorThen; +use crate::message::NotifyPredecessorSend; +use crate::message::PayloadSender; +use crate::swarm::callback::InnerSwarmCallback; +use crate::swarm::callback::SharedSwarmCallback; use crate::transport::SwarmTransport; /// Operator and Handler for Connection pub mod connection; /// Operator and Handler for CustomMessage pub mod custom; -/// For handle dht related actions -pub mod dht; /// Operator and handler for DHT stablization pub mod stabilization; /// Operator and Handler for Storage @@ -36,50 +37,12 @@ pub mod storage; /// Operator and Handler for Subring pub mod subring; -type NextHop = Did; - -/// MessageHandlerEvent that will be handled by Swarm. -#[derive(Debug, Clone)] -pub enum MessageHandlerEvent { - /// Instructs the swarm to connect to a peer. - Connect(Did), - /// Instructs the swarm to connect to a peer via given next hop. - ConnectVia(Did, NextHop), - - /// Instructs the swarm to answer an offer inside payload by given - /// sender's Did and Message. - AnswerOffer(MessagePayload, ConnectNodeSend), - - /// Instructs the swarm to accept an answer inside payload by given - /// sender's Did and Message. - AcceptAnswer(Did, ConnectNodeReport), - - /// Tell swarm to forward the payload to destination by given - /// Payload and optional next hop. - ForwardPayload(MessagePayload, Option), - - /// Instructs the swarm to send a direct message to a peer. - SendDirectMessage(Message, Did), - - /// Instructs the swarm to send a message to a peer via the dht network. - SendMessage(Message, Did), - - /// Instructs the swarm to send a message as a response to the received message. - SendReportMessage(MessagePayload, Message), - - /// Instructs the swarm to send a message to a peer via the dht network with a specific next hop. - ResetDestination(MessagePayload, Did), - - /// Instructs the swarm to store vnode. - StorageStore(VirtualNode), - /// Notify a node - Notify(Did), -} - /// MessageHandler will manage resources. #[derive(Clone)] pub struct MessageHandler { transport: Arc, + dht: Arc, + swarm_callback: SharedSwarmCallback, } /// Generic trait for handle message ,inspired by Actor-Model. @@ -87,49 +50,120 @@ pub struct MessageHandler { #[cfg_attr(not(feature = "wasm"), async_trait)] pub trait HandleMsg { /// Message handler. - async fn handle(&self, ctx: &MessagePayload, msg: &T) -> Result>; + async fn handle(&self, ctx: &MessagePayload, msg: &T) -> Result<()>; } impl MessageHandler { - /// Create a new MessageHandler Instance. - pub fn new(transport: Arc) -> Self { - Self { transport } + /// Create a new MessageHandler instance. + pub fn new(transport: Arc, swarm_callback: SharedSwarmCallback) -> Self { + let dht = transport.dht.clone(); + Self { + transport, + dht, + swarm_callback, + } + } + + fn inner_callback(&self) -> InnerSwarmCallback { + InnerSwarmCallback::new(self.transport.clone(), self.swarm_callback.clone()) + } + + pub(crate) async fn join_dht(&self, peer: Did) -> Result<()> { + if cfg!(feature = "experimental") { + let conn = self + .transport + .get_connection(peer) + .ok_or(Error::SwarmMissDidInTable(peer))?; + let dht_ev = self.dht.join_then_sync(conn).await?; + self.handle_dht_events(&dht_ev).await + } else { + let dht_ev = self.dht.join(peer)?; + self.handle_dht_events(&dht_ev).await + } + } + + pub(crate) async fn leave_dht(&self, peer: Did) -> Result<()> { + if self + .transport + .get_and_check_connection(peer) + .await + .is_none() + { + self.dht.remove(peer)? + }; + Ok(()) } - /// Handle builtin message. #[cfg_attr(feature = "wasm", async_recursion(?Send))] #[cfg_attr(not(feature = "wasm"), async_recursion)] - pub async fn handle_message( - &self, - payload: &MessagePayload, - ) -> Result> { - let message: Message = payload.transaction.data()?; - - tracing::debug!( - "START HANDLE MESSAGE: {} {}", - &payload.transaction.tx_id, - &message - ); - - let events = match &message { - Message::ConnectNodeSend(ref msg) => self.handle(payload, msg).await, - Message::ConnectNodeReport(ref msg) => self.handle(payload, msg).await, - Message::FindSuccessorSend(ref msg) => self.handle(payload, msg).await, - Message::FindSuccessorReport(ref msg) => self.handle(payload, msg).await, - Message::NotifyPredecessorSend(ref msg) => self.handle(payload, msg).await, - Message::NotifyPredecessorReport(ref msg) => self.handle(payload, msg).await, - Message::SearchVNode(ref msg) => self.handle(payload, msg).await, - Message::FoundVNode(ref msg) => self.handle(payload, msg).await, - Message::SyncVNodeWithSuccessor(ref msg) => self.handle(payload, msg).await, - Message::OperateVNode(ref msg) => self.handle(payload, msg).await, - Message::CustomMessage(ref msg) => self.handle(payload, msg).await, - Message::QueryForTopoInfoSend(ref msg) => self.handle(payload, msg).await, - Message::QueryForTopoInfoReport(ref msg) => self.handle(payload, msg).await, - Message::Chunk(_) => Ok(vec![]), - }?; - - tracing::debug!("FINISH HANDLE MESSAGE {}", &payload.transaction.tx_id); - Ok(events) + pub(crate) async fn handle_dht_events(&self, act: &PeerRingAction) -> Result<()> { + match act { + PeerRingAction::None => Ok(()), + // Ask next hop to find successor for did, + // if there is only two nodes A, B, it may cause loop, for example + // A's successor is B, B ask A to find successor for B + // A may send message to it's successor, which is B + PeerRingAction::RemoteAction( + next, + PeerRingRemoteAction::FindSuccessorForConnect(did), + ) => { + if next != did { + self.transport + .send_direct_message( + Message::FindSuccessorSend(FindSuccessorSend { + did: *did, + strict: false, + then: FindSuccessorThen::Report( + FindSuccessorReportHandler::Connect, + ), + }), + *next, + ) + .await?; + Ok(()) + } else { + Ok(()) + } + } + // A new successor is set, request the new successor for it's successor list + PeerRingAction::RemoteAction(next, PeerRingRemoteAction::QueryForSuccessorList) => { + self.transport + .send_direct_message( + Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_sync(*next)), + *next, + ) + .await?; + Ok(()) + } + PeerRingAction::RemoteAction(did, PeerRingRemoteAction::TryConnect) => { + self.transport.connect(*did, self.inner_callback()).await?; + Ok(()) + } + PeerRingAction::RemoteAction(did, PeerRingRemoteAction::Notify(target_id)) => { + if did == target_id { + tracing::warn!("Did is equal to target_id, may implement wrong."); + return Ok(()); + } + let msg = + Message::NotifyPredecessorSend(NotifyPredecessorSend { did: self.dht.did }); + self.transport.send_message(msg, *target_id).await?; + Ok(()) + } + PeerRingAction::MultiActions(acts) => { + let jobs = acts + .iter() + .map(|act| async move { self.handle_dht_events(act).await }); + + for res in futures::future::join_all(jobs).await { + if res.is_err() { + tracing::error!("Failed on handle multi actions: {:#?}", res) + } + } + + Ok(()) + } + _ => unreachable!(), + } } } diff --git a/crates/core/src/message/handlers/stabilization.rs b/crates/core/src/message/handlers/stabilization.rs index 450e54f28..638bf2042 100644 --- a/crates/core/src/message/handlers/stabilization.rs +++ b/crates/core/src/message/handlers/stabilization.rs @@ -11,52 +11,51 @@ use crate::message::types::NotifyPredecessorSend; use crate::message::types::SyncVNodeWithSuccessor; use crate::message::HandleMsg; use crate::message::MessageHandler; -use crate::message::MessageHandlerEvent; use crate::message::MessagePayload; +use crate::message::PayloadSender; #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] impl HandleMsg for MessageHandler { - async fn handle( - &self, - ctx: &MessagePayload, - msg: &NotifyPredecessorSend, - ) -> Result> { + async fn handle(&self, ctx: &MessagePayload, msg: &NotifyPredecessorSend) -> Result<()> { let predecessor = self.dht.notify(msg.did)?; if predecessor != ctx.relay.origin_sender() { - return Ok(vec![MessageHandlerEvent::SendReportMessage( - ctx.clone(), - Message::NotifyPredecessorReport(NotifyPredecessorReport { did: predecessor }), - )]); + return self + .transport + .send_report_message( + ctx, + Message::NotifyPredecessorReport(NotifyPredecessorReport { did: predecessor }), + ) + .await; } - Ok(vec![]) + Ok(()) } } #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] impl HandleMsg for MessageHandler { - async fn handle( - &self, - _ctx: &MessagePayload, - msg: &NotifyPredecessorReport, - ) -> Result> { - let mut events = vec![MessageHandlerEvent::Connect(msg.did)]; + async fn handle(&self, _ctx: &MessagePayload, msg: &NotifyPredecessorReport) -> Result<()> { + self.transport + .connect(msg.did, self.inner_callback()) + .await?; if let Ok(PeerRingAction::RemoteAction( next, PeerRingRemoteAction::SyncVNodeWithSuccessor(data), )) = self.dht.sync_vnode_with_successor(msg.did).await { - events.push(MessageHandlerEvent::SendMessage( - Message::SyncVNodeWithSuccessor(SyncVNodeWithSuccessor { data }), - next, - )) + self.transport + .send_message( + Message::SyncVNodeWithSuccessor(SyncVNodeWithSuccessor { data }), + next, + ) + .await?; } - Ok(events) + Ok(()) } } diff --git a/crates/core/src/message/handlers/storage.rs b/crates/core/src/message/handlers/storage.rs index 022c0eaf7..7db866f32 100644 --- a/crates/core/src/message/handlers/storage.rs +++ b/crates/core/src/message/handlers/storage.rs @@ -1,4 +1,7 @@ #![warn(missing_docs)] + +use std::sync::Arc; + use async_recursion::async_recursion; use async_trait::async_trait; @@ -11,7 +14,6 @@ use crate::dht::PeerRingAction; use crate::dht::PeerRingRemoteAction; use crate::error::Error; use crate::error::Result; -use crate::handle_multi_actions; use crate::message::types::FoundVNode; use crate::message::types::Message; use crate::message::types::SearchVNode; @@ -19,11 +21,11 @@ use crate::message::types::SyncVNodeWithSuccessor; use crate::message::Encoded; use crate::message::HandleMsg; use crate::message::MessageHandler; -use crate::message::MessageHandlerEvent; use crate::message::MessagePayload; use crate::message::PayloadSender; use crate::prelude::vnode::VNodeOperation; use crate::swarm::Swarm; +use crate::transport::SwarmTransport; /// ChordStorageInterface should imply necessary method for DHT storage #[cfg_attr(feature = "wasm", async_trait(?Send))] @@ -52,11 +54,14 @@ pub trait ChordStorageInterfaceCacheChecker { /// Handle the storage fetch action of the peer ring. #[cfg_attr(feature = "wasm", async_recursion(?Send))] #[cfg_attr(not(feature = "wasm"), async_recursion)] -async fn handle_storage_fetch_act(swarm: &Swarm, act: PeerRingAction) -> Result<()> { +async fn handle_storage_fetch_act( + transport: Arc, + act: PeerRingAction, +) -> Result<()> { match act { PeerRingAction::None => (), PeerRingAction::SomeVNode(v) => { - swarm.dht.local_cache_put(v).await?; + transport.dht.local_cache_put(v).await?; } PeerRingAction::RemoteAction(next, dht_act) => { if let PeerRingRemoteAction::FindVNode(vid) = dht_act { @@ -65,14 +70,14 @@ async fn handle_storage_fetch_act(swarm: &Swarm, act: PeerRingAction) -> Result< vid, next ); - swarm + transport .send_message(Message::SearchVNode(SearchVNode { vid }), next) .await?; } } PeerRingAction::MultiActions(acts) => { for act in acts { - handle_storage_fetch_act(swarm, act).await?; + handle_storage_fetch_act(transport.clone(), act).await?; } } act => return Err(Error::PeerRingUnexpectedAction(act)), @@ -83,17 +88,20 @@ async fn handle_storage_fetch_act(swarm: &Swarm, act: PeerRingAction) -> Result< /// Handle the storage store operations of the peer ring. #[cfg_attr(feature = "wasm", async_recursion(?Send))] #[cfg_attr(not(feature = "wasm"), async_recursion)] -pub(super) async fn handle_storage_store_act(swarm: &Swarm, act: PeerRingAction) -> Result<()> { +pub(super) async fn handle_storage_store_act( + transport: Arc, + act: PeerRingAction, +) -> Result<()> { match act { PeerRingAction::None => (), PeerRingAction::RemoteAction(target, PeerRingRemoteAction::FindVNodeForOperate(op)) => { - swarm + transport .send_message(Message::OperateVNode(op), target) .await?; } PeerRingAction::MultiActions(acts) => { for act in acts { - handle_storage_store_act(swarm, act).await?; + handle_storage_store_act(transport.clone(), act).await?; } } act => return Err(Error::PeerRingUnexpectedAction(act)), @@ -104,28 +112,34 @@ pub(super) async fn handle_storage_store_act(swarm: &Swarm, act: PeerRingAction) /// Handle the storage store operations of the peer ring. #[cfg_attr(feature = "wasm", async_recursion(?Send))] #[cfg_attr(not(feature = "wasm"), async_recursion)] -pub(super) async fn handle_storage_search_act( +async fn handle_storage_search_act( + transport: Arc, ctx: &MessagePayload, act: PeerRingAction, -) -> Result> { +) -> Result<()> { match act { - PeerRingAction::None => Ok(vec![]), - PeerRingAction::SomeVNode(v) => Ok(vec![MessageHandlerEvent::SendReportMessage( - ctx.clone(), - Message::FoundVNode(FoundVNode { data: vec![v] }), - )]), - PeerRingAction::RemoteAction(next, _) => Ok(vec![MessageHandlerEvent::ResetDestination( - ctx.clone(), - next, - )]), + PeerRingAction::None => Ok(()), + PeerRingAction::SomeVNode(v) => { + transport + .send_report_message(ctx, Message::FoundVNode(FoundVNode { data: vec![v] })) + .await + } + PeerRingAction::RemoteAction(next, _) => transport.reset_destination(ctx, next).await, PeerRingAction::MultiActions(acts) => { - handle_multi_actions!( - acts, - |act| async move { handle_storage_search_act(ctx, act.clone()).await }, - "Failed on handle multi actions: {:#?}" - ) + let jobs = acts.iter().map(|act| { + let transport_clone = transport.clone(); + async move { handle_storage_operate_act(transport_clone, ctx, act).await } + }); + + for res in futures::future::join_all(jobs).await { + if res.is_err() { + tracing::error!("Failed on handle multi actions: {:#?}", res) + } + } + + Ok(()) } - act => Err(Error::PeerRingUnexpectedAction(act)), + act => Err(Error::PeerRingUnexpectedAction(act.clone())), } } @@ -133,21 +147,26 @@ pub(super) async fn handle_storage_search_act( #[cfg_attr(feature = "wasm", async_recursion(?Send))] #[cfg_attr(not(feature = "wasm"), async_recursion)] pub(super) async fn handle_storage_operate_act( + transport: Arc, ctx: &MessagePayload, act: &PeerRingAction, -) -> Result> { +) -> Result<()> { match act { - PeerRingAction::None => Ok(vec![]), - PeerRingAction::RemoteAction(next, _) => Ok(vec![MessageHandlerEvent::ResetDestination( - ctx.clone(), - *next, - )]), + PeerRingAction::None => Ok(()), + PeerRingAction::RemoteAction(next, _) => transport.reset_destination(ctx, *next).await, PeerRingAction::MultiActions(acts) => { - handle_multi_actions!( - acts, - |act| async move { handle_storage_operate_act(ctx, act).await }, - "Failed on handle multi actions: {:#?}" - ) + let jobs = acts.iter().map(|act| { + let transport_clone = transport.clone(); + async move { handle_storage_operate_act(transport_clone, ctx, act).await } + }); + + for res in futures::future::join_all(jobs).await { + if res.is_err() { + tracing::error!("Failed on handle multi actions: {:#?}", res) + } + } + + Ok(()) } act => Err(Error::PeerRingUnexpectedAction(act.clone())), } @@ -170,7 +189,7 @@ impl ChordStorageInterface for Swarm { async fn storage_fetch(&self, vid: Did) -> Result<()> { // If peer found that data is on it's localstore, copy it to the cache let act = >::vnode_lookup(&self.dht, vid).await?; - handle_storage_fetch_act(self, act).await?; + handle_storage_fetch_act(self.transport.clone(), act).await?; Ok(()) } @@ -178,7 +197,7 @@ impl ChordStorageInterface for Swarm { async fn storage_store(&self, vnode: VirtualNode) -> Result<()> { let op = VNodeOperation::Overwrite(vnode); let act = >::vnode_operate(&self.dht, op).await?; - handle_storage_store_act(self, act).await?; + handle_storage_store_act(self.transport.clone(), act).await?; Ok(()) } @@ -186,7 +205,7 @@ impl ChordStorageInterface for Swarm { let vnode: VirtualNode = (topic.to_string(), data).try_into()?; let op = VNodeOperation::Extend(vnode); let act = >::vnode_operate(&self.dht, op).await?; - handle_storage_store_act(self, act).await?; + handle_storage_store_act(self.transport.clone(), act).await?; Ok(()) } @@ -194,7 +213,7 @@ impl ChordStorageInterface for Swarm { let vnode: VirtualNode = (topic.to_string(), data).try_into()?; let op = VNodeOperation::Touch(vnode); let act = >::vnode_operate(&self.dht, op).await?; - handle_storage_store_act(self, act).await?; + handle_storage_store_act(self.transport.clone(), act).await?; Ok(()) } } @@ -204,14 +223,10 @@ impl ChordStorageInterface for Swarm { impl HandleMsg for MessageHandler { /// Search VNode via successor /// If a VNode is storead local, it will response immediately.(See Chordstorageinterface::storage_fetch) - async fn handle( - &self, - ctx: &MessagePayload, - msg: &SearchVNode, - ) -> Result> { + async fn handle(&self, ctx: &MessagePayload, msg: &SearchVNode) -> Result<()> { // For relay message, set redundant to 1 match >::vnode_lookup(&self.dht, msg.vid).await { - Ok(action) => handle_storage_search_act(ctx, action).await, + Ok(action) => handle_storage_search_act(self.transport.clone(), ctx, action).await, Err(e) => Err(e), } } @@ -220,33 +235,25 @@ impl HandleMsg for MessageHandler { #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] impl HandleMsg for MessageHandler { - async fn handle( - &self, - ctx: &MessagePayload, - msg: &FoundVNode, - ) -> Result> { + async fn handle(&self, ctx: &MessagePayload, msg: &FoundVNode) -> Result<()> { if self.dht.did != ctx.relay.destination { - return Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)]); + return self.transport.forward_payload(ctx, None).await; } for data in msg.data.iter().cloned() { self.dht.local_cache_put(data).await?; } - Ok(vec![]) + Ok(()) } } #[cfg_attr(feature = "wasm", async_trait(?Send))] #[cfg_attr(not(feature = "wasm"), async_trait)] impl HandleMsg for MessageHandler { - async fn handle( - &self, - ctx: &MessagePayload, - msg: &VNodeOperation, - ) -> Result> { + async fn handle(&self, ctx: &MessagePayload, msg: &VNodeOperation) -> Result<()> { // For relay message, set redundant to 1 let action = >::vnode_operate(&self.dht, msg.clone()).await?; - handle_storage_operate_act(ctx, &action).await + handle_storage_operate_act(self.transport.clone(), ctx, &action).await } } @@ -254,18 +261,15 @@ impl HandleMsg for MessageHandler { #[cfg_attr(not(feature = "wasm"), async_trait)] impl HandleMsg for MessageHandler { // received remote sync vnode request - async fn handle( - &self, - _ctx: &MessagePayload, - msg: &SyncVNodeWithSuccessor, - ) -> Result> { - let mut events = vec![]; + async fn handle(&self, _ctx: &MessagePayload, msg: &SyncVNodeWithSuccessor) -> Result<()> { for data in msg.data.iter().cloned() { // only simply store here // For relay message, set redundant to 1 - events.push(MessageHandlerEvent::StorageStore(data)); + let op = VNodeOperation::Overwrite(data); + let act = >::vnode_operate(&self.dht, op).await?; + handle_storage_store_act(self.transport.clone(), act).await?; } - Ok(events) + Ok(()) } } diff --git a/crates/core/src/message/handlers/subring.rs b/crates/core/src/message/handlers/subring.rs index 739f7f623..4f4566228 100644 --- a/crates/core/src/message/handlers/subring.rs +++ b/crates/core/src/message/handlers/subring.rs @@ -24,7 +24,7 @@ impl SubringInterface for Swarm { async fn subring_join(&self, name: &str) -> Result<()> { let op = VNodeOperation::JoinSubring(name.to_string(), self.dht.did); let act = >::vnode_operate(&self.dht, op).await?; - handle_storage_store_act(self, act).await?; + handle_storage_store_act(self.transport.clone(), act).await?; Ok(()) } } diff --git a/crates/core/src/message/mod.rs b/crates/core/src/message/mod.rs index 4440d15e6..7613fae6d 100644 --- a/crates/core/src/message/mod.rs +++ b/crates/core/src/message/mod.rs @@ -22,7 +22,6 @@ pub use handlers::storage::ChordStorageInterfaceCacheChecker; pub use handlers::subring::SubringInterface; pub use handlers::HandleMsg; pub use handlers::MessageHandler; -pub use handlers::MessageHandlerEvent; mod protocols; pub use protocols::MessageRelay; diff --git a/crates/core/src/swarm/builder.rs b/crates/core/src/swarm/builder.rs index e354a2418..f012cd90c 100644 --- a/crates/core/src/swarm/builder.rs +++ b/crates/core/src/swarm/builder.rs @@ -7,12 +7,11 @@ use std::sync::RwLock; use crate::dht::PeerRing; use crate::dht::VNodeStorage; -use crate::message::MessageHandler; use crate::session::SessionSk; use crate::swarm::callback::SharedSwarmCallback; use crate::swarm::callback::SwarmCallback; -use crate::swarm::MeasureImpl; use crate::swarm::Swarm; +use crate::transport::MeasureImpl; use crate::transport::SwarmTransport; struct DefaultCallback; @@ -86,24 +85,21 @@ impl SwarmBuilder { self.dht_storage, )); - let transport = Arc::new(SwarmTransport::new( - &self.ice_servers, - self.external_address, - dht.clone(), - self.session_sk, - )); - let callback = RwLock::new( self.callback .unwrap_or_else(|| Arc::new(DefaultCallback {})), ); - let message_handler = MessageHandler::new(transport.clone()); + let transport = Arc::new(SwarmTransport::new( + &self.ice_servers, + self.external_address, + self.session_sk, + dht.clone(), + self.measure, + )); Swarm { dht, - measure: self.measure, - message_handler, transport, callback, } diff --git a/crates/core/src/swarm/callback.rs b/crates/core/src/swarm/callback.rs index bb108de4f..271f2b219 100644 --- a/crates/core/src/swarm/callback.rs +++ b/crates/core/src/swarm/callback.rs @@ -10,8 +10,9 @@ use crate::chunk::ChunkList; use crate::chunk::ChunkManager; use crate::consts::TRANSPORT_MTU; use crate::dht::Did; -use crate::error::Error; +use crate::message::HandleMsg; use crate::message::Message; +use crate::message::MessageHandler; use crate::message::MessagePayload; use crate::message::MessageVerificationExt; use crate::transport::SwarmTransport; @@ -62,18 +63,19 @@ pub trait SwarmCallback { /// [InnerSwarmCallback] wraps [SharedSwarmCallback] with inner handling for a specific connection. pub struct InnerSwarmCallback { - did: Did, transport: Arc, + message_handler: MessageHandler, callback: SharedSwarmCallback, chunk_list: FuturesMutex>, } impl InnerSwarmCallback { - /// Create a new [InnerSwarmCallback] with the provided did, transport_event_sender and callback. - pub fn new(did: Did, transport: Arc, callback: SharedSwarmCallback) -> Self { + /// Create a new [InnerSwarmCallback] with the provided transport and callback. + pub fn new(transport: Arc, callback: SharedSwarmCallback) -> Self { + let message_handler = MessageHandler::new(transport.clone(), callback.clone()); Self { - did, transport, + message_handler, callback, chunk_list: Default::default(), } @@ -86,14 +88,44 @@ impl InnerSwarmCallback { ) -> Result<(), CallbackError> { let message: Message = payload.transaction.data()?; - if let Message::Chunk(msg) = message { - if let Some(data) = self.chunk_list.lock().await.handle(msg.clone()) { - return self.on_message(cid, &data).await; + match &message { + Message::ConnectNodeSend(ref msg) => self.message_handler.handle(payload, msg).await?, + Message::ConnectNodeReport(ref msg) => { + self.message_handler.handle(payload, msg).await? + } + Message::FindSuccessorSend(ref msg) => { + self.message_handler.handle(payload, msg).await? + } + Message::FindSuccessorReport(ref msg) => { + self.message_handler.handle(payload, msg).await? + } + Message::NotifyPredecessorSend(ref msg) => { + self.message_handler.handle(payload, msg).await? + } + Message::NotifyPredecessorReport(ref msg) => { + self.message_handler.handle(payload, msg).await? + } + Message::SearchVNode(ref msg) => self.message_handler.handle(payload, msg).await?, + Message::FoundVNode(ref msg) => self.message_handler.handle(payload, msg).await?, + Message::SyncVNodeWithSuccessor(ref msg) => { + self.message_handler.handle(payload, msg).await? + } + Message::OperateVNode(ref msg) => self.message_handler.handle(payload, msg).await?, + Message::CustomMessage(ref msg) => self.message_handler.handle(payload, msg).await?, + Message::QueryForTopoInfoSend(ref msg) => { + self.message_handler.handle(payload, msg).await? + } + Message::QueryForTopoInfoReport(ref msg) => { + self.message_handler.handle(payload, msg).await? + } + Message::Chunk(ref msg) => { + if let Some(data) = self.chunk_list.lock().await.handle(msg.clone()) { + return self.on_message(cid, &data).await; + } } - return Ok(()); }; - if payload.transaction.destination == self.did { + if payload.transaction.destination == self.transport.dht.did { self.callback.on_inbound(payload).await?; } @@ -126,24 +158,12 @@ impl TransportCallback for InnerSwarmCallback { match s { WebrtcConnectionState::Connected => { - if cfg!(feature = "experimental") { - let conn = self - .transport - .get_connection(did) - .ok_or(Error::SwarmMissDidInTable(did))?; - let dht_ev = self.transport.dht.join_then_sync(conn).await?; - crate::message::handlers::dht::handle_dht_events(&dht_ev, ctx).await?; - } else { - let dht_ev = self.transport.dht.join(*did)?; - crate::message::handlers::dht::handle_dht_events(&dht_ev, ctx).await?; - } + self.message_handler.join_dht(did).await?; } WebrtcConnectionState::Failed | WebrtcConnectionState::Disconnected | WebrtcConnectionState::Closed => { - if self.transport.get_and_check_connection(did).await.is_none() { - self.transport.dht.remove(did)? - }; + self.message_handler.leave_dht(did).await?; } _ => {} }; diff --git a/crates/core/src/swarm/impls.rs b/crates/core/src/swarm/impls.rs deleted file mode 100644 index fa51a6a28..000000000 --- a/crates/core/src/swarm/impls.rs +++ /dev/null @@ -1,246 +0,0 @@ -use async_trait::async_trait; - -use crate::dht::Did; -use crate::error::Error; -use crate::error::Result; -use crate::measure::MeasureCounter; -use crate::message::Message; -use crate::message::MessagePayload; -use crate::message::MessageVerificationExt; -use crate::message::PayloadSender; -use crate::swarm::Swarm; - -/// ConnectionHandshake defined how to connect two connections between two swarms. -#[cfg_attr(feature = "wasm", async_trait(?Send))] -#[cfg_attr(not(feature = "wasm"), async_trait)] -pub trait ConnectionHandshake { - /// Creaet new connection and its answer. This function will wrap the offer inside a payload - /// with verification. - async fn create_offer(&self, peer: Did) -> Result; - - /// Answer the offer of remote connection. This function will verify the answer payload and - /// will wrap the answer inside a payload with verification. - async fn answer_offer(&self, offer_payload: MessagePayload) -> Result; - - /// Accept the answer of remote connection. This function will verify the answer payload and - /// will return its did with the connection. - async fn accept_answer(&self, answer_payload: MessagePayload) -> Result<()>; -} - -/// A trait for managing connections. -#[cfg_attr(feature = "wasm", async_trait(?Send))] -#[cfg_attr(not(feature = "wasm"), async_trait)] -pub trait ConnectionManager { - /// Asynchronously disconnects the connection associated with the provided DID. - async fn disconnect(&self, did: Did) -> Result<()>; - - /// Asynchronously establishes a new connection and returns the connection associated with the provided DID. - async fn connect(&self, did: Did) -> Result<()>; - - /// Asynchronously establishes a new connection via a specified next hop DID and returns the connection associated with the provided DID. - async fn connect_via(&self, did: Did, next_hop: Did) -> Result<()>; -} - -/// A trait for judging whether a connection should be established with a given DID (Decentralized Identifier). -#[cfg_attr(feature = "wasm", async_trait(?Send))] -#[cfg_attr(not(feature = "wasm"), async_trait)] -pub trait Judegement { - /// Asynchronously checks if a connection should be established with the provided DID. - async fn should_connect(&self, did: Did) -> bool; - - /// Asynchronously records that a connection has been established with the provided DID. - async fn record_connect(&self, did: Did); - - /// Asynchronously records that a connection has been disconnected with the provided DID. - async fn record_disconnected(&self, did: Did); -} - -/// A trait that combines the `Judegement` and `ConnectionManager` traits. -#[cfg_attr(feature = "wasm", async_trait(?Send))] -#[cfg_attr(not(feature = "wasm"), async_trait)] -pub trait JudgeConnection: Judegement + ConnectionManager { - /// Asynchronously disconnects the connection associated with the provided DID after recording the disconnection. - async fn disconnect(&self, did: Did) -> Result<()> { - self.record_disconnected(did).await; - tracing::debug!("[JudegeConnection] Disconnected {:?}", &did); - ConnectionManager::disconnect(self, did).await - } - - /// Asynchronously establishes a new connection and returns the connection associated with the provided DID if `should_connect` returns true; otherwise, returns an error. - async fn connect(&self, did: Did) -> Result<()> { - if !self.should_connect(did).await { - return Err(Error::NodeBehaviourBad(did)); - } - tracing::debug!("[JudgeConnection] Try Connect {:?}", &did); - self.record_connect(did).await; - ConnectionManager::connect(self, did).await - } - - /// Asynchronously establishes a new connection via a specified next hop DID and returns the connection associated with the provided DID if `should_connect` returns true; otherwise, returns an error. - async fn connect_via(&self, did: Did, next_hop: Did) -> Result<()> { - if !self.should_connect(did).await { - return Err(Error::NodeBehaviourBad(did)); - } - tracing::debug!("[JudgeConnection] Try Connect {:?}", &did); - self.record_connect(did).await; - ConnectionManager::connect_via(self, did, next_hop).await - } -} - -impl Swarm { - /// Record a succeeded message sent - pub async fn record_sent(&self, did: Did) { - if let Some(measure) = &self.measure { - measure.incr(did, MeasureCounter::Sent).await; - } - } - - /// Record a failed message sent - pub async fn record_sent_failed(&self, did: Did) { - if let Some(measure) = &self.measure { - measure.incr(did, MeasureCounter::FailedToSend).await; - } - } - - /// Check that a Did is behaviour good - pub async fn behaviour_good(&self, did: Did) -> bool { - if let Some(measure) = &self.measure { - measure.good(did).await - } else { - true - } - } -} - -#[cfg_attr(feature = "wasm", async_trait(?Send))] -#[cfg_attr(not(feature = "wasm"), async_trait)] -impl ConnectionHandshake for Swarm { - async fn create_offer(&self, peer: Did) -> Result { - let offer_msg = self - .transport - .prepare_connection_offer(peer, self.callback()?) - .await?; - - // This payload has fake next_hop. - // The invoker should fix it before sending. - let payload = MessagePayload::new_send( - Message::ConnectNodeSend(offer_msg), - self.transport.session_sk(), - self.did(), - peer, - )?; - - Ok(payload) - } - - async fn answer_offer(&self, offer_payload: MessagePayload) -> Result { - if !offer_payload.verify() { - return Err(Error::VerifySignatureFailed); - } - - let Message::ConnectNodeSend(msg) = offer_payload.transaction.data()? else { - return Err(Error::InvalidMessage( - "Should be ConnectNodeSend".to_string(), - )); - }; - - let peer = offer_payload.relay.origin_sender(); - let answer_msg = self - .transport - .answer_remote_connection(peer, self.callback()?, &msg) - .await?; - - // This payload has fake next_hop. - // The invoker should fix it before sending. - let answer_payload = MessagePayload::new_send( - Message::ConnectNodeReport(answer_msg), - self.transport.session_sk(), - self.did(), - self.did(), - )?; - - Ok(answer_payload) - } - - async fn accept_answer(&self, answer_payload: MessagePayload) -> Result<()> { - if !answer_payload.verify() { - return Err(Error::VerifySignatureFailed); - } - - let Message::ConnectNodeReport(ref msg) = answer_payload.transaction.data()? else { - return Err(Error::InvalidMessage( - "Should be ConnectNodeReport".to_string(), - )); - }; - - let peer = answer_payload.relay.origin_sender(); - self.transport.accept_remote_connection(peer, msg).await - } -} - -#[cfg_attr(feature = "wasm", async_trait(?Send))] -#[cfg_attr(not(feature = "wasm"), async_trait)] -impl ConnectionManager for Swarm { - /// Disconnect a connection. There are three steps: - /// 1) remove from DHT; - /// 2) remove from Transport; - /// 3) close the connection; - async fn disconnect(&self, peer: Did) -> Result<()> { - self.transport.disconnect(peer).await - } - - /// Connect a given Did. If the did is already connected, return directly, - /// else try prepare offer and establish connection by dht. - /// This function may returns a pending connection or connected connection. - async fn connect(&self, peer: Did) -> Result<()> { - let offer_msg = self - .transport - .prepare_connection_offer(peer, self.callback()?) - .await?; - self.transport - .send_message(Message::ConnectNodeSend(offer_msg), peer) - .await?; - Ok(()) - } - - /// Similar to connect, but this function will try connect a Did by given hop. - async fn connect_via(&self, peer: Did, next_hop: Did) -> Result<()> { - let offer_msg = self - .transport - .prepare_connection_offer(peer, self.callback()?) - .await?; - - self.transport - .send_message_by_hop(Message::ConnectNodeSend(offer_msg), peer, next_hop) - .await?; - - Ok(()) - } -} - -#[cfg_attr(feature = "wasm", async_trait(?Send))] -#[cfg_attr(not(feature = "wasm"), async_trait)] -impl Judegement for Swarm { - /// Record a succeeded connected - async fn record_connect(&self, did: Did) { - tracing::info!("Record connect {:?}", &did); - if let Some(measure) = &self.measure { - tracing::info!("[Judgement] Record connect"); - measure.incr(did, MeasureCounter::Connect).await; - } - } - - /// Record a disconnected - async fn record_disconnected(&self, did: Did) { - tracing::info!("Record disconnected {:?}", &did); - if let Some(measure) = &self.measure { - tracing::info!("[Judgement] Record disconnected"); - measure.incr(did, MeasureCounter::Disconnected).await; - } - } - - /// Asynchronously checks if a connection should be established with the provided DID. - async fn should_connect(&self, did: Did) -> bool { - self.behaviour_good(did).await - } -} diff --git a/crates/core/src/swarm/mod.rs b/crates/core/src/swarm/mod.rs index e2c1fed1b..0cc5cbc93 100644 --- a/crates/core/src/swarm/mod.rs +++ b/crates/core/src/swarm/mod.rs @@ -4,48 +4,31 @@ mod builder; /// Callback interface for swarm pub mod callback; -/// Implementations of connection management traits for swarm -pub mod impls; use std::sync::Arc; use std::sync::RwLock; -use async_recursion::async_recursion; +use async_trait::async_trait; pub use builder::SwarmBuilder; -use rings_derive::JudgeConnection; +use self::callback::InnerSwarmCallback; use crate::dht::Did; use crate::dht::PeerRing; use crate::error::Error; use crate::error::Result; use crate::inspect::SwarmInspect; -use crate::measure::BehaviourJudgement; -use crate::message::types::NotifyPredecessorSend; -use crate::message::ChordStorageInterface; use crate::message::Message; -use crate::message::MessageHandler; -use crate::message::MessageHandlerEvent; +use crate::message::MessagePayload; +use crate::message::MessageVerificationExt; use crate::message::PayloadSender; -use crate::swarm::callback::InnerSwarmCallback; use crate::swarm::callback::SharedSwarmCallback; use crate::transport::SwarmTransport; -/// Type of Measure, see [crate::measure::Measure]. -#[cfg(not(feature = "wasm"))] -pub type MeasureImpl = Box; - -/// Type of Measure, see [crate::measure::Measure]. -#[cfg(feature = "wasm")] -pub type MeasureImpl = Box; - /// The transport and dht management. -#[derive(JudgeConnection)] pub struct Swarm { /// Reference of DHT. pub(crate) dht: Arc, /// Implementationof measurement. - pub(crate) measure: Option, - message_handler: MessageHandler, pub(crate) transport: Arc, callback: RwLock, } @@ -61,17 +44,18 @@ impl Swarm { self.dht.clone() } - fn callback(&self) -> Result { - let shared = self + fn callback(&self) -> Result { + Ok(self .callback .read() .map_err(|_| Error::CallbackSyncLockError)? - .clone(); + .clone()) + } + fn inner_callback(&self) -> Result { Ok(InnerSwarmCallback::new( - self.did(), self.transport.clone(), - shared, + self.callback()?, )) } @@ -87,132 +71,22 @@ impl Swarm { Ok(()) } - /// Event handler of Swarm. - pub async fn handle_message_handler_event( - &self, - event: &MessageHandlerEvent, - ) -> Result> { - tracing::debug!("Handle message handler event: {:?}", event); - match event { - MessageHandlerEvent::Connect(did) => { - let did = *did; - if did != self.did() { - self.connect(did).await?; - } - Ok(vec![]) - } - - // Notify did with self.id - MessageHandlerEvent::Notify(did) => { - let msg = - Message::NotifyPredecessorSend(NotifyPredecessorSend { did: self.dht.did }); - Ok(vec![MessageHandlerEvent::SendMessage(msg, *did)]) - } - - MessageHandlerEvent::ConnectVia(did, next) => { - let did = *did; - if did != self.did() { - self.connect_via(did, *next).await?; - } - Ok(vec![]) - } - - MessageHandlerEvent::AnswerOffer(relay, msg) => { - let answer = self - .transport - .answer_remote_connection(relay.relay.origin_sender(), self.callback()?, msg) - .await?; - - Ok(vec![MessageHandlerEvent::SendReportMessage( - relay.clone(), - Message::ConnectNodeReport(answer), - )]) - } - - MessageHandlerEvent::AcceptAnswer(origin_sender, msg) => { - self.transport - .accept_remote_connection(origin_sender.to_owned(), msg) - .await?; - Ok(vec![]) - } - - MessageHandlerEvent::ForwardPayload(payload, next_hop) => { - self.transport.forward_payload(payload, *next_hop).await?; - Ok(vec![]) - } - - MessageHandlerEvent::SendDirectMessage(msg, dest) => { - self.transport - .send_direct_message(msg.clone(), *dest) - .await?; - Ok(vec![]) - } - - MessageHandlerEvent::SendMessage(msg, dest) => { - self.transport.send_message(msg.clone(), *dest).await?; - Ok(vec![]) - } - - MessageHandlerEvent::SendReportMessage(payload, msg) => { - self.transport - .send_report_message(payload, msg.clone()) - .await?; - Ok(vec![]) - } - - MessageHandlerEvent::ResetDestination(payload, next_hop) => { - self.transport.reset_destination(payload, *next_hop).await?; - Ok(vec![]) - } - - MessageHandlerEvent::StorageStore(vnode) => { - >::storage_store(self, vnode.clone()).await?; - Ok(vec![]) - } - } - } - - /// Batch handle events - #[cfg_attr(feature = "wasm", async_recursion(?Send))] - #[cfg_attr(not(feature = "wasm"), async_recursion)] - pub async fn handle_message_handler_events( - &self, - events: &Vec, - ) -> Result<()> { - match events.as_slice() { - [] => Ok(()), - [x, xs @ ..] => { - let evs = self.handle_message_handler_event(x).await?; - self.handle_message_handler_events(&evs).await?; - self.handle_message_handler_events(&xs.to_vec()).await - } - } - } - /// Disconnect a connection. There are three steps: /// 1) remove from DHT; /// 2) remove from Transport; /// 3) close the connection; - pub async fn disconnect(&self, did: Did) -> Result<()> { - JudgeConnection::disconnect(self, did).await + pub async fn disconnect(&self, peer: Did) -> Result<()> { + self.transport.disconnect(peer).await } /// Connect a given Did. If the did is already connected, return directly, /// else try prepare offer and establish connection by dht. /// This function may returns a pending connection or connected connection. - pub async fn connect(&self, did: Did) -> Result<()> { - if did == self.did() { - return Err(Error::ShouldNotConnectSelf); - } - JudgeConnection::connect(self, did).await - } - - /// Similar to connect, but this function will try connect a Did by given hop. - pub async fn connect_via(&self, did: Did, next_hop: Did) -> Result<()> { - if did == self.did() { + pub async fn connect(&self, peer: Did) -> Result<()> { + if peer == self.did() { return Err(Error::ShouldNotConnectSelf); } - JudgeConnection::connect_via(self, did, next_hop).await + self.transport.connect(peer, self.inner_callback()?).await } /// Check the status of swarm @@ -220,3 +94,86 @@ impl Swarm { SwarmInspect::inspect(self).await } } + +/// ConnectionHandshake defined how to connect two connections between two swarms. +#[cfg_attr(feature = "wasm", async_trait(?Send))] +#[cfg_attr(not(feature = "wasm"), async_trait)] +pub trait ConnectionHandshake { + /// Creaet new connection and its answer. This function will wrap the offer inside a payload + /// with verification. + async fn create_offer(&self, peer: Did) -> Result; + + /// Answer the offer of remote connection. This function will verify the answer payload and + /// will wrap the answer inside a payload with verification. + async fn answer_offer(&self, offer_payload: MessagePayload) -> Result; + + /// Accept the answer of remote connection. This function will verify the answer payload and + /// will return its did with the connection. + async fn accept_answer(&self, answer_payload: MessagePayload) -> Result<()>; +} + +#[cfg_attr(feature = "wasm", async_trait(?Send))] +#[cfg_attr(not(feature = "wasm"), async_trait)] +impl ConnectionHandshake for Swarm { + async fn create_offer(&self, peer: Did) -> Result { + let offer_msg = self + .transport + .prepare_connection_offer(peer, self.inner_callback()?) + .await?; + + // This payload has fake next_hop. + // The invoker should fix it before sending. + let payload = MessagePayload::new_send( + Message::ConnectNodeSend(offer_msg), + self.transport.session_sk(), + self.did(), + peer, + )?; + + Ok(payload) + } + + async fn answer_offer(&self, offer_payload: MessagePayload) -> Result { + if !offer_payload.verify() { + return Err(Error::VerifySignatureFailed); + } + + let Message::ConnectNodeSend(msg) = offer_payload.transaction.data()? else { + return Err(Error::InvalidMessage( + "Should be ConnectNodeSend".to_string(), + )); + }; + + let peer = offer_payload.relay.origin_sender(); + let answer_msg = self + .transport + .answer_remote_connection(peer, self.inner_callback()?, &msg) + .await?; + + // This payload has fake next_hop. + // The invoker should fix it before sending. + let answer_payload = MessagePayload::new_send( + Message::ConnectNodeReport(answer_msg), + self.transport.session_sk(), + self.did(), + self.did(), + )?; + + Ok(answer_payload) + } + + async fn accept_answer(&self, answer_payload: MessagePayload) -> Result<()> { + if !answer_payload.verify() { + return Err(Error::VerifySignatureFailed); + } + + let Message::ConnectNodeReport(ref msg) = answer_payload.transaction.data()? else { + return Err(Error::InvalidMessage( + "Should be ConnectNodeReport".to_string(), + )); + }; + + let peer = answer_payload.relay.origin_sender(); + self.transport.accept_remote_connection(peer, msg).await + } +} diff --git a/crates/core/src/transport.rs b/crates/core/src/transport.rs index ba33cbdbc..c16c8886c 100644 --- a/crates/core/src/transport.rs +++ b/crates/core/src/transport.rs @@ -25,9 +25,11 @@ use crate::chunk::ChunkList; use crate::consts::TRANSPORT_MAX_SIZE; use crate::consts::TRANSPORT_MTU; use crate::dht::Did; +use crate::dht::LiveDid; use crate::dht::PeerRing; use crate::error::Error; use crate::error::Result; +use crate::measure::BehaviourJudgement; use crate::message::ConnectNodeReport; use crate::message::ConnectNodeSend; use crate::message::Message; @@ -36,12 +38,23 @@ use crate::message::PayloadSender; use crate::session::SessionSk; use crate::swarm::callback::InnerSwarmCallback; +/// Type of Measure, see [crate::measure::Measure]. +#[cfg(not(feature = "wasm"))] +pub type MeasureImpl = Box; + +/// Type of Measure, see [crate::measure::Measure]. +#[cfg(feature = "wasm")] +pub type MeasureImpl = Box; + pub struct SwarmTransport { transport: Transport, session_sk: SessionSk, - dht: Arc, + pub(crate) dht: Arc, + #[allow(dead_code)] + measure: Option, } +#[derive(Clone)] pub struct SwarmConnection { peer: Did, connection: ConnectionRef, @@ -51,13 +64,15 @@ impl SwarmTransport { pub fn new( ice_servers: &str, external_address: Option, - dht: Arc, session_sk: SessionSk, + dht: Arc, + measure: Option, ) -> Self { Self { transport: Transport::new(ice_servers, external_address), session_sk, dht, + measure, } } @@ -123,6 +138,15 @@ impl SwarmTransport { .map_err(|e| e.into()) } + /// Connect a given Did. If the did is already connected, return Err, + /// else try prepare offer and establish connection by dht. + pub async fn connect(&self, peer: Did, callback: InnerSwarmCallback) -> Result<()> { + let offer_msg = self.prepare_connection_offer(peer, callback).await?; + self.send_message(Message::ConnectNodeSend(offer_msg), peer) + .await?; + Ok(()) + } + /// Get connection by did and check if data channel is open. /// This method will return None if the connection is not found. /// This method will wait_for_data_channel_open. @@ -309,12 +333,20 @@ impl PayloadSender for SwarmTransport { payload.relay.next_hop, ); - if result.is_ok() { - self.record_sent(payload.relay.next_hop).await - } else { - self.record_sent_failed(payload.relay.next_hop).await - } + result + } +} + +#[cfg_attr(feature = "wasm", async_trait(?Send))] +#[cfg_attr(not(feature = "wasm"), async_trait)] +impl LiveDid for SwarmConnection { + async fn live(&self) -> bool { + self.webrtc_connection_state() == WebrtcConnectionState::Connected + } +} - result.map_err(|e| e.into()) +impl From for Did { + fn from(conn: SwarmConnection) -> Self { + conn.peer } }