Skip to content

Commit

Permalink
apply to stab
Browse files Browse the repository at this point in the history
  • Loading branch information
RyanKung committed Jul 11, 2023
1 parent 93ccca8 commit 4b835ff
Showing 1 changed file with 49 additions and 14 deletions.
63 changes: 49 additions & 14 deletions core/src/dht/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ use std::sync::Arc;
use async_trait::async_trait;

use crate::dht::successor::SuccessorReader;
use crate::dht::types::CorrectChord;
use crate::dht::Chord;
use crate::dht::PeerRing;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::error::Result;
use crate::message::handlers::MessageHandlerEvent;
use crate::message::FindSuccessorReportHandler;
use crate::message::FindSuccessorSend;
use crate::message::FindSuccessorThen;
use crate::message::Message;
use crate::message::MessagePayload;
use crate::message::NotifyPredecessorSend;
use crate::message::PayloadSender;
use crate::message::QueryForTopoInfoSend;
use crate::swarm::Swarm;
use crate::transports::manager::TransportManager;
use crate::types::ice_transport::IceTransportInterface;
Expand All @@ -39,20 +42,6 @@ pub trait TStabilize {
}

impl Stabilization {
/// Create a new instance of Stabilization
pub fn new(swarm: Arc<Swarm>, timeout: usize) -> Self {
Self {
chord: swarm.dht(),
swarm,
timeout,
}
}

/// Get timeout of waiting delays.
pub fn get_timeout(&self) -> usize {
self.timeout
}

/// Clean unavailable transports from swarm.
pub async fn clean_unavailable_transports(&self) -> Result<()> {
let transports = self.swarm.get_transports();
Expand All @@ -66,7 +55,25 @@ impl Stabilization {

Ok(())
}
}

impl Stabilization {
/// Create a new instance of Stabilization
pub fn new(swarm: Arc<Swarm>, timeout: usize) -> Self {
Self {
chord: swarm.dht(),
swarm,
timeout,
}
}

/// Get timeout of waiting delays.
pub fn get_timeout(&self) -> usize {
self.timeout
}
}

impl Stabilization {
/// Notify predecessor, this is a DHT operation.
pub async fn notify_predecessor(&self) -> Result<()> {
let (successor_min, successor_list) = {
Expand Down Expand Up @@ -129,7 +136,27 @@ impl Stabilization {
}
}
}
}

impl Stabilization {
/// Call stabilization from correct chord implementation
pub async fn correct_stabilize(&self) -> Result<()> {
if let PeerRingAction::RemoteAction(
next,
PeerRingRemoteAction::QueryForSuccessorListAndPred,
) = self.chord.pre_stabilize()?
{
let evs = vec![MessageHandlerEvent::SendDirectMessage(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_stab(next)),
next,
)];
return self.swarm.handle_message_handler_events(&evs).await;
}
Ok(())
}
}

impl Stabilization {
/// Call stabilize periodly.
pub async fn stabilize(&self) -> Result<()> {
tracing::debug!("STABILIZATION notify_predecessor start");
Expand All @@ -147,6 +174,14 @@ impl Stabilization {
tracing::error!("[stabilize] Failed on clean unavailable transports {:?}", e);
}
tracing::debug!("STABILIZATION clean_unavailable_transports end");
#[cfg(feature = "experimental")]
{
tracing::debug!("STABILIZATION correct_stabilize start");
if let Err(e) = self.correct_stabilize() {
tracing::error!("[stabilize] Failed on call correct stabilize {:?}", e);
}
tracing::debug!("STABILIZATION correct_stabilize end");
}
Ok(())
}
}
Expand Down

0 comments on commit 4b835ff

Please sign in to comment.