Skip to content

Commit

Permalink
Use MessageHandler in callback
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Mar 10, 2024
1 parent ba3b50a commit 2bd025e
Show file tree
Hide file tree
Showing 15 changed files with 481 additions and 823 deletions.
21 changes: 11 additions & 10 deletions crates/core/src/dht/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::dht::PeerRing;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::error::Result;
use crate::message::handlers::MessageHandlerEvent;
use crate::message::FindSuccessorReportHandler;
use crate::message::FindSuccessorSend;
use crate::message::FindSuccessorThen;
Expand Down Expand Up @@ -93,11 +92,11 @@ impl Stabilization {
tracing::debug!("STABILIZATION notify_predecessor: {:?}", s);
let payload = MessagePayload::new_send(
msg.clone(),
self.swarm.session_sk(),
self.swarm.transport.session_sk(),
s,
self.swarm.did(),
)?;
self.swarm.send_payload(payload).await?;
self.swarm.transport.send_payload(payload).await?;
}
Ok(())
} else {
Expand All @@ -122,11 +121,11 @@ impl Stabilization {
});
let payload = MessagePayload::new_send(
msg.clone(),
self.swarm.session_sk(),
self.swarm.transport.session_sk(),
closest_predecessor,
closest_predecessor,
)?;
self.swarm.send_payload(payload).await?;
self.swarm.transport.send_payload(payload).await?;
Ok(())
}
_ => {
Expand All @@ -150,11 +149,13 @@ impl Stabilization {
PeerRingRemoteAction::QueryForSuccessorListAndPred,
) = self.chord.pre_stabilize()?
{
let evs = vec![MessageHandlerEvent::SendDirectMessage(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_stab(next)),
next,
)];
return self.swarm.handle_message_handler_events(&evs).await;
self.swarm
.transport
.send_direct_message(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_stab(next)),
next,
)
.await?;
}
Ok(())
}
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/inspect.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use rings_transport::core::transport::ConnectionInterface;
use serde::Deserialize;
use serde::Serialize;

Expand Down
134 changes: 58 additions & 76 deletions crates/core/src/message/handlers/connection.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;

use super::dht;
use crate::dht::types::Chord;
use crate::dht::types::CorrectChord;
use crate::dht::PeerRingAction;
use crate::dht::TopoInfo;
use crate::error::Error;
Expand All @@ -17,126 +18,102 @@ use crate::message::FindSuccessorReportHandler;
use crate::message::FindSuccessorThen;
use crate::message::HandleMsg;
use crate::message::MessageHandler;
use crate::message::MessageHandlerEvent;
use crate::message::MessagePayload;
use crate::message::PayloadSender;

/// QueryForTopoInfoSend is direct message
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<QueryForTopoInfoSend> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload,
msg: &QueryForTopoInfoSend,
) -> Result<Vec<MessageHandlerEvent>> {
let info: TopoInfo = TopoInfo::try_from(self.dht.deref())?;
async fn handle(&self, ctx: &MessagePayload, msg: &QueryForTopoInfoSend) -> Result<()> {
let info: TopoInfo = TopoInfo::try_from(self.dht.as_ref())?;
if msg.did == self.dht.did {
Ok(vec![MessageHandlerEvent::SendReportMessage(
ctx.clone(),
Message::QueryForTopoInfoReport(msg.resp(info)),
)])
} else {
Ok(vec![])
self.transport
.send_report_message(ctx, Message::QueryForTopoInfoReport(msg.resp(info)))
.await?
}
Ok(())
}
}

/// Try join received node into DHT after received from TopoInfo.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<QueryForTopoInfoReport> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload,
msg: &QueryForTopoInfoReport,
) -> Result<Vec<MessageHandlerEvent>> {
async fn handle(&self, _ctx: &MessagePayload, msg: &QueryForTopoInfoReport) -> Result<()> {
match msg.then {
<QueryForTopoInfoReport as Then>::Then::SyncSuccessor => Ok(msg
.info
.successors
.iter()
.map(|did| MessageHandlerEvent::JoinDHT(ctx.clone(), *did))
.collect()),
<QueryForTopoInfoReport as Then>::Then::SyncSuccessor => {
for peer in msg.info.successors.iter() {
self.join_dht(*peer).await?;
}
}
<QueryForTopoInfoReport as Then>::Then::Stabilization => {
let ev = self.dht.stabilize(msg.info.clone())?;
dht::handle_dht_events(&ev, ctx).await
self.handle_dht_events(&ev).await?;
}
}
Ok(())
}
}

#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<ConnectNodeSend> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload,
msg: &ConnectNodeSend,
) -> Result<Vec<MessageHandlerEvent>> {
async fn handle(&self, ctx: &MessagePayload, msg: &ConnectNodeSend) -> Result<()> {
if self.dht.did != ctx.relay.destination {
Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)])
self.transport.forward_payload(ctx, None).await
} else {
Ok(vec![MessageHandlerEvent::AnswerOffer(
ctx.clone(),
msg.clone(),
)])
let answer = self
.transport
.answer_remote_connection(ctx.relay.origin_sender(), self.inner_callback(), msg)
.await?;
self.transport
.send_report_message(ctx, Message::ConnectNodeReport(answer))
.await
}
}
}

#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<ConnectNodeReport> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload,
msg: &ConnectNodeReport,
) -> Result<Vec<MessageHandlerEvent>> {
async fn handle(&self, ctx: &MessagePayload, msg: &ConnectNodeReport) -> Result<()> {
if self.dht.did != ctx.relay.destination {
Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)])
self.transport.forward_payload(ctx, None).await
} else {
Ok(vec![MessageHandlerEvent::AcceptAnswer(
ctx.relay.origin_sender(),
msg.clone(),
)])
self.transport
.accept_remote_connection(ctx.relay.origin_sender(), msg)
.await
}
}
}

#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<FindSuccessorSend> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload,
msg: &FindSuccessorSend,
) -> Result<Vec<MessageHandlerEvent>> {
async fn handle(&self, ctx: &MessagePayload, msg: &FindSuccessorSend) -> Result<()> {
match self.dht.find_successor(msg.did)? {
PeerRingAction::Some(did) => {
if !msg.strict || self.dht.did == msg.did {
match &msg.then {
FindSuccessorThen::Report(handler) => {
Ok(vec![MessageHandlerEvent::SendReportMessage(
ctx.clone(),
Message::FindSuccessorReport(FindSuccessorReport {
did,
handler: handler.clone(),
}),
)])
self.transport
.send_report_message(
ctx,
Message::FindSuccessorReport(FindSuccessorReport {
did,
handler: handler.clone(),
}),
)
.await
}
}
} else {
Ok(vec![MessageHandlerEvent::ForwardPayload(
ctx.clone(),
Some(did),
)])
self.transport.forward_payload(ctx, Some(did)).await
}
}
PeerRingAction::RemoteAction(next, _) => {
Ok(vec![MessageHandlerEvent::ResetDestination(
ctx.clone(),
next,
)])
self.transport.reset_destination(ctx, next).await
}
act => Err(Error::PeerRingUnexpectedAction(act)),
}
Expand All @@ -146,22 +123,27 @@ impl HandleMsg<FindSuccessorSend> for MessageHandler {
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<FindSuccessorReport> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload,
msg: &FindSuccessorReport,
) -> Result<Vec<MessageHandlerEvent>> {
async fn handle(&self, ctx: &MessagePayload, msg: &FindSuccessorReport) -> Result<()> {
if self.dht.did != ctx.relay.destination {
return Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)]);
return self.transport.forward_payload(ctx, None).await;
}

match &msg.handler {
FindSuccessorReportHandler::FixFingerTable => {
Ok(vec![MessageHandlerEvent::Connect(msg.did)])
FindSuccessorReportHandler::FixFingerTable | FindSuccessorReportHandler::Connect => {
if msg.did != self.dht.did {
let offer_msg = self
.transport
.prepare_connection_offer(msg.did, self.inner_callback())
.await?;
self.transport
.send_message(Message::ConnectNodeSend(offer_msg), msg.did)
.await?;
}
}
FindSuccessorReportHandler::Connect => Ok(vec![MessageHandlerEvent::Connect(msg.did)]),
_ => Ok(vec![]),
_ => {}
}

Ok(())
}
}

Expand Down
13 changes: 4 additions & 9 deletions crates/core/src/message/handlers/custom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,16 @@ use crate::error::Result;
use crate::message::types::CustomMessage;
use crate::message::HandleMsg;
use crate::message::MessageHandler;
use crate::message::MessageHandlerEvent;
use crate::message::MessagePayload;
use crate::message::PayloadSender;

#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl HandleMsg<CustomMessage> for MessageHandler {
async fn handle(
&self,
ctx: &MessagePayload,
_: &CustomMessage,
) -> Result<Vec<MessageHandlerEvent>> {
async fn handle(&self, ctx: &MessagePayload, _: &CustomMessage) -> Result<()> {
if self.dht.did != ctx.relay.destination {
Ok(vec![MessageHandlerEvent::ForwardPayload(ctx.clone(), None)])
} else {
Ok(vec![])
self.transport.forward_payload(ctx, None).await?;
}
Ok(())
}
}
Loading

0 comments on commit 2bd025e

Please sign in to comment.