Skip to content

Commit

Permalink
Merge branch 'master' into feat_rename_session_manager_to_delegated_sk
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanKung authored Jul 29, 2023
2 parents 8fdd108 + c81b09f commit 5efa2da
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 29 deletions.
2 changes: 1 addition & 1 deletion core/src/message/handlers/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ impl HandleMsg<FindSuccessorReport> for MessageHandler {

match &msg.handler {
FindSuccessorReportHandler::FixFingerTable => {
Ok(vec![MessageHandlerEvent::JoinDHT(ctx.clone(), msg.did)])
Ok(vec![MessageHandlerEvent::Connect(msg.did)])
}
FindSuccessorReportHandler::Connect => Ok(vec![MessageHandlerEvent::Connect(msg.did)]),
_ => Ok(vec![]),
Expand Down
28 changes: 14 additions & 14 deletions core/src/message/handlers/dht.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
//! This module provides helper function for handle DHT related Actions

use async_recursion::async_recursion;
use futures::future::join_all;

use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
Expand Down Expand Up @@ -38,19 +37,20 @@ use crate::message::MessagePayload;
#[macro_export]
macro_rules! handle_multi_actions {
($actions:expr, $handler_func:expr, $error_msg:expr) => {{
let ret: Vec<MessageHandlerEvent> = join_all($actions.iter().map($handler_func))
.await
.iter()
.map(|x| {
if x.is_err() {
tracing::error!($error_msg, x)
};
x
})
.filter_map(|x| x.as_ref().ok())
.flat_map(|xs| xs.iter())
.cloned()
.collect();
let ret: Vec<MessageHandlerEvent> =
futures::future::join_all($actions.iter().map($handler_func))
.await
.iter()
.map(|x| {
if x.is_err() {
tracing::error!($error_msg, x)
};
x
})
.filter_map(|x| x.as_ref().ok())
.flat_map(|xs| xs.iter())
.cloned()
.collect();
Ok(ret)
}};
}
Expand Down
41 changes: 28 additions & 13 deletions core/src/message/handlers/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::error::Error;
use crate::error::Result;
use crate::handle_multi_actions;
use crate::message::types::FoundVNode;
use crate::message::types::Message;
use crate::message::types::SearchVNode;
Expand Down Expand Up @@ -100,6 +101,30 @@ pub(super) async fn handle_storage_store_act(swarm: &Swarm, act: PeerRingAction)
Ok(())
}

/// Handle the storage store operations of the peer ring.
#[cfg_attr(feature = "wasm", async_recursion(?Send))]
#[cfg_attr(not(feature = "wasm"), async_recursion)]
pub(super) async fn handle_storage_operate_act(
ctx: &MessagePayload<Message>,
act: &PeerRingAction,
) -> Result<Vec<MessageHandlerEvent>> {
match act {
PeerRingAction::None => Ok(vec![]),
PeerRingAction::RemoteAction(next, _) => Ok(vec![MessageHandlerEvent::ResetDestination(
ctx.clone(),
*next,
)]),
PeerRingAction::MultiActions(acts) => {
handle_multi_actions!(
acts,
|act| async move { handle_storage_operate_act(ctx, act).await },
"Failed on handle multi actions: {:#?}"
)
}
act => Err(Error::PeerRingUnexpectedAction(act.clone())),
}
}

#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl ChordStorageInterfaceCacheChecker for Swarm {
Expand Down Expand Up @@ -204,19 +229,9 @@ impl HandleMsg<VNodeOperation> for MessageHandler {
msg: &VNodeOperation,
) -> Result<Vec<MessageHandlerEvent>> {
// For relay message, set redundant to 1
match <PeerRing as ChordStorage<_, 1>>::vnode_operate(&self.dht, msg.clone()).await {
Ok(action) => match action {
PeerRingAction::None => Ok(vec![]),
PeerRingAction::RemoteAction(next, _) => {
Ok(vec![MessageHandlerEvent::ResetDestination(
ctx.clone(),
next,
)])
}
act => Err(Error::PeerRingUnexpectedAction(act)),
},
Err(e) => Err(e),
}
let action =
<PeerRing as ChordStorage<_, 1>>::vnode_operate(&self.dht, msg.clone()).await?;
handle_storage_operate_act(ctx, &action).await
}
}

Expand Down
3 changes: 2 additions & 1 deletion core/src/swarm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl Swarm {
};

if let Some(t) = self.get_transport(did) {
tracing::info!("[Swarm::ConnectClosed] removing transport {:?}", uuid);
if t.id == uuid && self.remove_transport(did).is_some() {
tracing::info!("[Swarm::ConnectClosed] transport {:?} closed", uuid);
let payload = MessagePayload::new_send(
Expand Down Expand Up @@ -364,7 +365,7 @@ impl Swarm {
/// 2) remove from transport pool;
/// 3) close the transport connection;
pub async fn disconnect(&self, did: Did) -> Result<()> {
tracing::info!("disconnect {:?}", did);
tracing::info!("[disconnect] removing from DHT {:?}", did);
self.dht.remove(did)?;
if let Some((_address, trans)) = self.remove_transport(did) {
trans.close().await?
Expand Down

0 comments on commit 5efa2da

Please sign in to comment.