Skip to content

Commit

Permalink
some refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanKung committed Jul 6, 2023
1 parent 3e0683c commit 0a3fbfd
Show file tree
Hide file tree
Showing 16 changed files with 216 additions and 356 deletions.
1 change: 0 additions & 1 deletion core/src/dht/did.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
//! This is because what we actually require is the scalar multiplication of affine multiaction.
//! Did is represented as a wrapper of H160 (\[u8; 20\]). Since there is no `Eq` trait available for algebraic Rings, we have introduced the [BiasId]
//! struct to implement [Eq] and [PartialEq].
//!

use std::cmp::PartialEq;
use std::ops::Add;
Expand Down
5 changes: 2 additions & 3 deletions core/src/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,17 @@ pub mod finger;
pub mod successor;
pub use successor::SuccessorReader;
pub use successor::SuccessorWriter;
mod types;
pub mod types;
pub use chord::PeerRing;
pub use chord::PeerRingAction;
pub use chord::RemoteAction as PeerRingRemoteAction;
pub use finger::FingerTable;

pub use types::Chord;
pub use types::ChordStorage;
pub use types::ChordStorageCache;
pub use types::ChordStorageSync;
pub use types::LiveDid;
pub use types::CorrectChord;
pub use types::LiveDid;
mod stabilization;
pub use stabilization::Stabilization;
pub use stabilization::TStabilize;
Expand Down
174 changes: 91 additions & 83 deletions core/src/message/handlers/connection.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
use async_trait::async_trait;
use std::ops::Deref;

use async_recursion::async_recursion;
use async_trait::async_trait;
use futures::future::join_all;

use crate::dht::successor::SuccessorWriter;
use crate::dht::Chord;
use crate::dht::ChordStorageSync;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::dht::TopoInfo;
use crate::error::Error;
use crate::error::Result;
use crate::message::types::ConnectNodeReport;
Expand All @@ -14,6 +18,8 @@ use crate::message::types::FindSuccessorReport;
use crate::message::types::FindSuccessorSend;
use crate::message::types::JoinDHT;
use crate::message::types::Message;
use crate::message::types::QueryForTopoInfoReport;
use crate::message::types::QueryForTopoInfoSend;
use crate::message::types::SyncVNodeWithSuccessor;
use crate::message::FindSuccessorReportHandler;
use crate::message::FindSuccessorThen;
Expand All @@ -22,66 +28,54 @@ use crate::message::LeaveDHT;
use crate::message::MessageHandler;
use crate::message::MessageHandlerEvent;
use crate::message::MessagePayload;
use crate::message::types::QueryForTopoInfoReport;
use crate::message::types::QueryForTopoInfoSend;
use crate::dht::TopoInfo;
use std::ops::Deref;
use futures::future::join_all;


/// Handler of join dht event from DHT.
#[cfg_attr(feature = "wasm", async_recursion(?Send))]
#[cfg_attr(not(feature = "wasm"), async_recursion)]
async fn handle_join_dht(
handler: &MessageHandler,
act: PeerRingAction,
ctx: &MessagePayload<Message>,
) -> Result<Vec<MessageHandlerEvent>> {
pub async fn handle_join_dht(act: PeerRingAction) -> Result<Vec<MessageHandlerEvent>> {
match act {
PeerRingAction::None => Ok(vec![]),
// Ask next fo find successor for did,
// if there is only two nodes A, B, it may cause loop, for example
// A's successor is B, B ask A to find successor for B
// A may send message to it's successor, which is B
PeerRingAction::RemoteAction(next, PeerRingRemoteAction::FindSuccessorForConnect(did)) => {
// if there is only two nodes A, B, it may cause recursion
// A.successor == B
// B.successor == A
// A.find_successor(B)
if next != ctx.addr {
Ok(vec![
MessageHandlerEvent::SendDirectMessage(
Message::FindSuccessorSend(FindSuccessorSend {
did,
strict: false,
then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect),
}),
next,
)
])
} else {
Ok(vec![])
}
},
if next != did {
Ok(vec![MessageHandlerEvent::SendDirectMessage(
Message::FindSuccessorSend(FindSuccessorSend {
did,
strict: false,
then: FindSuccessorThen::Report(FindSuccessorReportHandler::Connect),
}),
next,
)])
} else {
Ok(vec![])
}
}
PeerRingAction::RemoteAction(next, PeerRingRemoteAction::QueryForSuccessorList) => {
Ok(vec![
MessageHandlerEvent::SendDirectMessage(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend { did: next.clone() }),
next.clone()
)
])
Ok(vec![MessageHandlerEvent::SendDirectMessage(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend { did: next }),
next,
)])
}
PeerRingAction::MultiActions(acts) => {
let ret: Vec<MessageHandlerEvent> = join_all(acts.iter().map(|act| async {
handle_join_dht(handler, act.clone(), ctx).await
})).await.iter().filter(|x| x.is_ok())
.map(|x| x.as_ref().unwrap())
.flat_map(|xs| xs.iter())
.cloned()
.collect();
Ok(ret)
let ret: Vec<MessageHandlerEvent> = join_all(
acts.iter()
.map(|act| async { handle_join_dht(act.clone()).await }),
)
.await
.iter()
.filter_map(|x| x.as_ref().ok())
.flat_map(|xs| xs.iter())
.cloned()
.collect();
Ok(ret)
}
_ => unreachable!(),
}

}


/// When handling update successor, it may cause two situtation, and it may cause multiple situtation.
/// 1. DHT put a connected successor into successor list, and ask successor_list of it.
/// 2. DHT wana set a new successor into successor list, but it's not connected, thus it request to connect first.
Expand All @@ -91,30 +85,33 @@ pub async fn handle_update_successor(
handler: &MessageHandler,
act: &PeerRingAction,
ctx: &MessagePayload<Message>,
) -> Result<Vec<MessageHandlerEvent>> {
) -> Result<Vec<MessageHandlerEvent>> {
match act {
PeerRingAction::None => Ok(vec![]),
PeerRingAction::RemoteAction(next, PeerRingRemoteAction::QueryForSuccessorList) => {
Ok(vec![
MessageHandlerEvent::SendDirectMessage(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend { did: next.clone() }),
next.clone()
)
])
Ok(vec![MessageHandlerEvent::SendDirectMessage(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend { did: *next }),
*next,
)])
}
PeerRingAction::MultiActions(acts) => {
let ret: Vec<MessageHandlerEvent> = join_all(
acts.iter()
.map(|act| async { handle_update_successor(handler, act, ctx).await })
).await.iter().filter(|x| x.is_ok())
.map(|x| x.as_ref().unwrap())
.flat_map(|xs| xs.iter())
.cloned()
.collect();
let ret: Vec<MessageHandlerEvent> = join_all(
acts.iter()
.map(|act| async { handle_update_successor(handler, act, ctx).await }),
)
.await
.iter()
.filter_map(|x| x.as_ref().ok())
.flat_map(|xs| xs.iter())
.cloned()
.collect();
Ok(ret)
}
PeerRingAction::RemoteAction(did, PeerRingRemoteAction::TryConnect) => {
Ok(vec![MessageHandlerEvent::ConnectVia(did.clone(), ctx.relay.sender())])
Ok(vec![MessageHandlerEvent::ConnectVia(
*did,
ctx.relay.origin_sender(),
)])
}
_ => unreachable!(),
}
Expand All @@ -128,16 +125,16 @@ impl HandleMsg<QueryForTopoInfoSend> for MessageHandler {
&self,
ctx: &MessagePayload<Message>,
msg: &QueryForTopoInfoSend,
) -> Result<Vec<MessageHandlerEvent>> {
) -> Result<Vec<MessageHandlerEvent>> {
let info: TopoInfo = TopoInfo::try_from(self.dht.deref())?;
if msg.did == self.dht.did {
Ok(vec![MessageHandlerEvent::SendReportMessage(
ctx.clone(),
Message::QueryForTopoInfoReport(QueryForTopoInfoReport { info }),
)])
} else {
Ok(vec![MessageHandlerEvent::SendReportMessage(
ctx.clone(),
Message::QueryForTopoInfoReport(QueryForTopoInfoReport { info }),
)])
} else {
Ok(vec![])
}
}
}
}

Expand All @@ -149,19 +146,17 @@ impl HandleMsg<QueryForTopoInfoReport> for MessageHandler {
&self,
_ctx: &MessagePayload<Message>,
msg: &QueryForTopoInfoReport,
) -> Result<Vec<MessageHandlerEvent>> {
) -> Result<Vec<MessageHandlerEvent>> {
let evs: Vec<MessageHandlerEvent> = msg
.info
.successors
.iter()
.map(|did| MessageHandlerEvent::JoinDHT(did.clone()))
.map(|did| MessageHandlerEvent::JoinDHT(*did))
.collect();
Ok(evs)
Ok(evs)
}
}



#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<LeaveDHT> for MessageHandler {
Expand All @@ -179,14 +174,15 @@ impl HandleMsg<LeaveDHT> for MessageHandler {
impl HandleMsg<JoinDHT> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload<Message>,
_ctx: &MessagePayload<Message>,
msg: &JoinDHT,
) -> Result<Vec<MessageHandlerEvent>> {
// here is two situation.
// finger table just have no other node(beside next), it will be a `create` op
// otherwise, it will be a `send` op
let act = self.dht.join(msg.did)?;
handle_join_dht(&self, act, ctx).await
Ok(vec![MessageHandlerEvent::JoinDHT(msg.did)])
// let act = self.dht.join(msg.did)?;
// handle_join_dht(&self, act, ctx).await
}
}

Expand All @@ -201,7 +197,10 @@ impl HandleMsg<ConnectNodeSend> for MessageHandler {
if self.dht.did != ctx.relay.destination {
Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)])
} else {
Ok(vec![MessageHandlerEvent::AnswerOffer(ctx.clone(), msg.clone())])
Ok(vec![MessageHandlerEvent::AnswerOffer(
ctx.clone(),
msg.clone(),
)])
}
}
}
Expand All @@ -217,7 +216,10 @@ impl HandleMsg<ConnectNodeReport> for MessageHandler {
if self.dht.did != ctx.relay.destination {
Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)])
} else {
Ok(vec![MessageHandlerEvent::AcceptAnswer(ctx.relay.sender(), msg.clone())])
Ok(vec![MessageHandlerEvent::AcceptAnswer(
ctx.relay.origin_sender(),
msg.clone(),
)])
}
}
}
Expand All @@ -236,7 +238,7 @@ impl HandleMsg<FindSuccessorSend> for MessageHandler {
match &msg.then {
FindSuccessorThen::Report(handler) => {
Ok(vec![MessageHandlerEvent::SendReportMessage(
ctx.clone(),
ctx.clone(),
Message::FindSuccessorReport(FindSuccessorReport {
did,
handler: handler.clone(),
Expand All @@ -245,11 +247,17 @@ impl HandleMsg<FindSuccessorSend> for MessageHandler {
}
}
} else {
Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), Some(did))])
Ok(vec![MessageHandlerEvent::ForwardPayload(
ctx.clone(),
Some(did),
)])
}
}
PeerRingAction::RemoteAction(next, _) => {
Ok(vec![MessageHandlerEvent::ResetDestination(ctx.clone(), next)])
Ok(vec![MessageHandlerEvent::ResetDestination(
ctx.clone(),
next,
)])
}
act => Err(Error::PeerRingUnexpectedAction(act)),
}
Expand Down
1 change: 1 addition & 0 deletions core/src/message/handlers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![warn(missing_docs)]
//! This module implemented message handler of rings network.
/// Message Flow:
/// +---------+ +--------------------------------+
/// | Message | -> | MessageHandler.handler_payload |
Expand Down
4 changes: 2 additions & 2 deletions core/src/message/handlers/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ impl HandleMsg<NotifyPredecessorSend> for MessageHandler {
self.dht.notify(msg.did)?;

if let Some(did) = predecessor {
if did != ctx.relay.sender() {
if did != ctx.relay.origin_sender() {
return Ok(vec![MessageHandlerEvent::SendReportMessage(
ctx.clone(),
ctx.clone(),
Message::NotifyPredecessorReport(NotifyPredecessorReport { did }),
)]);
}
Expand Down
12 changes: 9 additions & 3 deletions core/src/message/handlers/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,11 +161,14 @@ impl HandleMsg<SearchVNode> for MessageHandler {
Ok(action) => match action {
PeerRingAction::None => Ok(vec![]),
PeerRingAction::SomeVNode(v) => Ok(vec![MessageHandlerEvent::SendReportMessage(
ctx.clone(),
ctx.clone(),
Message::FoundVNode(FoundVNode { data: vec![v] }),
)]),
PeerRingAction::RemoteAction(next, _) => {
Ok(vec![MessageHandlerEvent::ResetDestination(ctx.clone(), next)])
Ok(vec![MessageHandlerEvent::ResetDestination(
ctx.clone(),
next,
)])
}
act => Err(Error::PeerRingUnexpectedAction(act)),
},
Expand Down Expand Up @@ -205,7 +208,10 @@ impl HandleMsg<VNodeOperation> for MessageHandler {
Ok(action) => match action {
PeerRingAction::None => Ok(vec![]),
PeerRingAction::RemoteAction(next, _) => {
Ok(vec![MessageHandlerEvent::ResetDestination(ctx.clone(), next)])
Ok(vec![MessageHandlerEvent::ResetDestination(
ctx.clone(),
next,
)])
}
act => Err(Error::PeerRingUnexpectedAction(act)),
},
Expand Down
2 changes: 1 addition & 1 deletion core/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub use payload::PayloadSender;
mod types;
pub use types::*;

mod handlers;
pub mod handlers;
pub use handlers::storage::ChordStorageInterface;
pub use handlers::storage::ChordStorageInterfaceCacheChecker;
pub use handlers::subring::SubringInterface;
Expand Down
Loading

0 comments on commit 0a3fbfd

Please sign in to comment.