Skip to content

Commit

Permalink
Upgrade stabilization api
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Mar 26, 2024
1 parent 7dee3b0 commit cb9e035
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 175 deletions.
3 changes: 1 addition & 2 deletions crates/core/src/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ pub use chord::TopoInfo;
pub use chord::VNodeStorage;
pub use did::Did;
pub use finger::FingerTable;
pub use stabilization::Stabilization;
pub use stabilization::TStabilize;
pub use stabilization::Stabilizer;
pub use successor::SuccessorReader;
pub use successor::SuccessorWriter;
pub use types::Chord;
Expand Down
170 changes: 68 additions & 102 deletions crates/core/src/dht/stabilization.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Stabilization wait to notify predecessors and update fingersTable.
//! Stabilization run daemons to maintain dht.
use std::sync::Arc;

use async_trait::async_trait;
use rings_transport::core::transport::WebrtcConnectionState;

use crate::dht::successor::SuccessorReader;
Expand All @@ -19,30 +19,56 @@ use crate::message::MessagePayload;
use crate::message::NotifyPredecessorSend;
use crate::message::PayloadSender;
use crate::message::QueryForTopoInfoSend;
use crate::swarm::Swarm;
use crate::transport::SwarmTransport;

/// A combination contains chord and swarm, use to run stabilize.
/// - swarm: transports communicate with each others.
/// - chord: fix local fingers table.
/// The stabilization runner.
#[derive(Clone)]
pub struct Stabilization {
chord: Arc<PeerRing>,
swarm: Arc<Swarm>,
timeout: u64,
pub struct Stabilizer {
transport: Arc<SwarmTransport>,
dht: Arc<PeerRing>,
}

/// A trait with `wait` method.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait TStabilize {
/// Wait and poll
async fn wait(self: Arc<Self>);
}
impl Stabilizer {
/// Create a new stabilization runner.
pub fn new(transport: Arc<SwarmTransport>) -> Self {
let dht = transport.dht.clone();
Self { transport, dht }
}

/// Run stabilization once.
pub async fn stabilize(&self) -> Result<()> {
tracing::debug!("STABILIZATION notify_predecessor start");
if let Err(e) = self.notify_predecessor().await {
tracing::error!("[stabilize] Failed on notify predecessor {:?}", e);
}
tracing::debug!("STABILIZATION notify_predecessor end");
tracing::debug!("STABILIZATION fix_fingers start");
if let Err(e) = self.fix_fingers().await {
tracing::error!("[stabilize] Failed on fix_finger {:?}", e);
}
tracing::debug!("STABILIZATION fix_fingers end");
tracing::debug!("STABILIZATION clean_unavailable_connections start");
if let Err(e) = self.clean_unavailable_connections().await {
tracing::error!(
"[stabilize] Failed on clean unavailable connections {:?}",
e
);
}
tracing::debug!("STABILIZATION clean_unavailable_connections end");
#[cfg(feature = "experimental")]
{
tracing::debug!("STABILIZATION correct_stabilize start");
if let Err(e) = self.correct_stabilize().await {
tracing::error!("[stabilize] Failed on call correct stabilize {:?}", e);
}
tracing::debug!("STABILIZATION correct_stabilize end");
}
Ok(())
}

impl Stabilization {
/// Clean unavailable connections in transport.
pub async fn clean_unavailable_connections(&self) -> Result<()> {
let conns = self.swarm.transport.get_connections();
let conns = self.transport.get_connections();

for (did, conn) in conns.into_iter() {
if matches!(
Expand All @@ -52,51 +78,31 @@ impl Stabilization {
| WebrtcConnectionState::Closed
) {
tracing::info!("STABILIZATION clean_unavailable_transports: {:?}", did);
self.swarm.disconnect(did).await?;
self.transport.disconnect(did).await?;
}
}

Ok(())
}
}

impl Stabilization {
/// Create a new instance of Stabilization
pub fn new(swarm: Arc<Swarm>, timeout: u64) -> Self {
Self {
chord: swarm.dht(),
swarm,
timeout,
}
}

/// Get timeout of waiting delays.
pub fn get_timeout(&self) -> u64 {
self.timeout
}
}

impl Stabilization {
/// Notify predecessor, this is a DHT operation.
pub async fn notify_predecessor(&self) -> Result<()> {
let (successor_min, successor_list) = {
let successor = self.chord.successors();
let successor = self.dht.successors();
(successor.min()?, successor.list()?)
};

let msg = Message::NotifyPredecessorSend(NotifyPredecessorSend {
did: self.chord.did,
});
if self.chord.did != successor_min {
let msg = Message::NotifyPredecessorSend(NotifyPredecessorSend { did: self.dht.did });
if self.dht.did != successor_min {
for s in successor_list {
tracing::debug!("STABILIZATION notify_predecessor: {:?}", s);
let payload = MessagePayload::new_send(
msg.clone(),
self.swarm.transport.session_sk(),
self.transport.session_sk(),
s,
self.swarm.did(),
self.dht.did,
)?;
self.swarm.transport.send_payload(payload).await?;
self.transport.send_payload(payload).await?;
}
Ok(())
} else {
Expand All @@ -106,7 +112,7 @@ impl Stabilization {

/// Fix fingers from finger table, this is a DHT operation.
async fn fix_fingers(&self) -> Result<()> {
match self.chord.fix_fingers() {
match self.dht.fix_fingers() {
Ok(action) => match action {
PeerRingAction::None => Ok(()),
PeerRingAction::RemoteAction(
Expand All @@ -121,11 +127,11 @@ impl Stabilization {
});
let payload = MessagePayload::new_send(
msg.clone(),
self.swarm.transport.session_sk(),
self.transport.session_sk(),
closest_predecessor,
closest_predecessor,
)?;
self.swarm.transport.send_payload(payload).await?;
self.transport.send_payload(payload).await?;
Ok(())
}
_ => {
Expand All @@ -139,18 +145,15 @@ impl Stabilization {
}
}
}
}

impl Stabilization {
/// Call stabilization from correct chord implementation
pub async fn correct_stabilize(&self) -> Result<()> {
if let PeerRingAction::RemoteAction(
next,
PeerRingRemoteAction::QueryForSuccessorListAndPred,
) = self.chord.pre_stabilize()?
) = self.dht.pre_stabilize()?
{
self.swarm
.transport
self.transport
.send_direct_message(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_stab(next)),
next,
Expand All @@ -161,58 +164,23 @@ impl Stabilization {
}
}

impl Stabilization {
/// Call stabilize periodly.
pub async fn stabilize(&self) -> Result<()> {
tracing::debug!("STABILIZATION notify_predecessor start");
if let Err(e) = self.notify_predecessor().await {
tracing::error!("[stabilize] Failed on notify predecessor {:?}", e);
}
tracing::debug!("STABILIZATION notify_predecessor end");
tracing::debug!("STABILIZATION fix_fingers start");
if let Err(e) = self.fix_fingers().await {
tracing::error!("[stabilize] Failed on fix_finger {:?}", e);
}
tracing::debug!("STABILIZATION fix_fingers end");
tracing::debug!("STABILIZATION clean_unavailable_connections start");
if let Err(e) = self.clean_unavailable_connections().await {
tracing::error!(
"[stabilize] Failed on clean unavailable connections {:?}",
e
);
}
tracing::debug!("STABILIZATION clean_unavailable_connections end");
#[cfg(feature = "experimental")]
{
tracing::debug!("STABILIZATION correct_stabilize start");
if let Err(e) = self.correct_stabilize().await {
tracing::error!("[stabilize] Failed on call correct stabilize {:?}", e);
}
tracing::debug!("STABILIZATION correct_stabilize end");
}
Ok(())
}
}

#[cfg(not(feature = "wasm"))]
mod stabilizer {
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use futures::future::FutureExt;
use futures::pin_mut;
use futures::select;
use futures_timer::Delay;

use super::Stabilization;
use super::TStabilize;
use super::*;

#[async_trait]
impl TStabilize for Stabilization {
async fn wait(self: Arc<Self>) {
impl Stabilizer {
/// Run stabilization in a loop.
pub async fn wait(self: Arc<Self>, interval: Duration) {
loop {
let timeout = Delay::new(Duration::from_secs(self.timeout)).fuse();
let timeout = Delay::new(interval).fuse();
pin_mut!(timeout);
select! {
_ = timeout => self
Expand All @@ -228,19 +196,17 @@ mod stabilizer {
#[cfg(feature = "wasm")]
mod stabilizer {
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use wasm_bindgen_futures::spawn_local;

use super::Stabilization;
use super::TStabilize;
use super::*;
use crate::poll;

#[async_trait(?Send)]
impl TStabilize for Stabilization {
async fn wait(self: Arc<Self>) {
impl Stabilizer {
/// Run stabilization in a loop.
pub async fn wait(self: Arc<Self>, interval: Duration) {
let caller = Arc::clone(&self);
let timeout = caller.timeout;
let func = move || {
let caller = caller.clone();
spawn_local(Box::pin(async move {
Expand All @@ -250,7 +216,7 @@ mod stabilizer {
.unwrap_or_else(|e| tracing::error!("failed to stabilize {:?}", e));
}))
};
poll!(func, (timeout * 1000).try_into().unwrap());
poll!(func, interval.as_millis().try_into().unwrap());
}
}
}
35 changes: 17 additions & 18 deletions crates/core/src/message/handlers/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ mod test {

use super::*;
use crate::dht::successor::SuccessorReader;
use crate::dht::Stabilization;
use crate::ecc::tests::gen_ordered_keys;
use crate::ecc::SecretKey;
use crate::swarm::Swarm;
Expand Down Expand Up @@ -158,9 +157,9 @@ mod test {
println!("|| now we start first stabilization ||");
println!("========================================");

run_stabilize_once(node1.swarm.clone()).await?;
run_stabilize_once(node2.swarm.clone()).await?;
run_stabilize_once(node3.swarm.clone()).await?;
run_stabilization_once(node1.swarm.clone()).await?;
run_stabilization_once(node2.swarm.clone()).await?;
run_stabilization_once(node3.swarm.clone()).await?;

wait_for_msgs(&node1, &node2, &node3).await;
assert_no_more_msg(&node1, &node2, &node3).await;
Expand Down Expand Up @@ -192,9 +191,9 @@ mod test {
println!("|| now we start second stabilization ||");
println!("=========================================");

run_stabilize_once(node1.swarm.clone()).await?;
run_stabilize_once(node2.swarm.clone()).await?;
run_stabilize_once(node3.swarm.clone()).await?;
run_stabilization_once(node1.swarm.clone()).await?;
run_stabilization_once(node2.swarm.clone()).await?;
run_stabilization_once(node3.swarm.clone()).await?;

wait_for_msgs(&node1, &node2, &node3).await;
assert_no_more_msg(&node1, &node2, &node3).await;
Expand Down Expand Up @@ -264,9 +263,9 @@ mod test {
println!("|| now we start first stabilization ||");
println!("========================================");

run_stabilize_once(node1.swarm.clone()).await?;
run_stabilize_once(node2.swarm.clone()).await?;
run_stabilize_once(node3.swarm.clone()).await?;
run_stabilization_once(node1.swarm.clone()).await?;
run_stabilization_once(node2.swarm.clone()).await?;
run_stabilization_once(node3.swarm.clone()).await?;

wait_for_msgs(&node1, &node2, &node3).await;
assert_no_more_msg(&node1, &node2, &node3).await;
Expand All @@ -292,9 +291,9 @@ mod test {
println!("|| now we start second stabilization ||");
println!("=========================================");

run_stabilize_once(node1.swarm.clone()).await?;
run_stabilize_once(node2.swarm.clone()).await?;
run_stabilize_once(node3.swarm.clone()).await?;
run_stabilization_once(node1.swarm.clone()).await?;
run_stabilization_once(node2.swarm.clone()).await?;
run_stabilization_once(node3.swarm.clone()).await?;

wait_for_msgs(&node1, &node2, &node3).await;
assert_no_more_msg(&node1, &node2, &node3).await;
Expand Down Expand Up @@ -326,9 +325,9 @@ mod test {
println!("|| now we start third stabilization ||");
println!("=========================================");

run_stabilize_once(node1.swarm.clone()).await?;
run_stabilize_once(node2.swarm.clone()).await?;
run_stabilize_once(node3.swarm.clone()).await?;
run_stabilization_once(node1.swarm.clone()).await?;
run_stabilization_once(node2.swarm.clone()).await?;
run_stabilization_once(node3.swarm.clone()).await?;

wait_for_msgs(&node1, &node2, &node3).await;
assert_no_more_msg(&node1, &node2, &node3).await;
Expand Down Expand Up @@ -359,8 +358,8 @@ mod test {
Ok(())
}

async fn run_stabilize_once(swarm: Arc<Swarm>) -> Result<()> {
let stab = Stabilization::new(swarm, 5);
async fn run_stabilization_once(swarm: Arc<Swarm>) -> Result<()> {
let stab = swarm.stabilizer();
stab.notify_predecessor().await
}
}
6 changes: 6 additions & 0 deletions crates/core/src/swarm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub use builder::SwarmBuilder;
use self::callback::InnerSwarmCallback;
use crate::dht::Did;
use crate::dht::PeerRing;
use crate::dht::Stabilizer;
use crate::error::Error;
use crate::error::Result;
use crate::inspect::SwarmInspect;
Expand Down Expand Up @@ -72,6 +73,11 @@ impl Swarm {
Ok(())
}

/// Create [Stabilizer] for swarm.
pub fn stabilizer(&self) -> Stabilizer {
Stabilizer::new(self.transport.clone())
}

/// Disconnect a connection. There are three steps:
/// 1) remove from DHT;
/// 2) remove from Transport;
Expand Down
Loading

0 comments on commit cb9e035

Please sign in to comment.