Skip to content

Commit

Permalink
feat: implementation of correct chord (#448)
Browse files Browse the repository at this point in the history
[Feat]: New feature flag: experimental, mark an implementation as experimental, which means that:1) It may not have been thoroughly tested. 2)The corresponding API may be deprecated or adjusted in the future.
[Doc]: Added document for modules.
[Refactor]: Made MessageHandlerEvent context-agnostic.
[Refactor]: Convergent implementation of logging module for wasm and defaut.
Convergent implementation of WASM and NATIVE #453
[Refactor]: Rename relay.sender() to relay.origin_sender()
[Refactor]: Reimplemented node::error::Error::code with #[repr(u32)]
[Fixed] datachannel for wasm and default
[Feat] Implemented messagehandler for correct chord related message.
Correctness of DHT #388 [BUG] DHT Join may not work properly #387
  • Loading branch information
RyanKung authored Jul 14, 2023
1 parent ee5ab25 commit d1b9d1e
Show file tree
Hide file tree
Showing 34 changed files with 1,315 additions and 656 deletions.
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
fmt:
cargo +nightly fmt -p rings-core
cargo +nightly fmt -p rings-node
cargo +nightly fmt --all
# require taplo_cli, which can be install with
# cargo install taplo-cli
taplo format
# require typos_cli, which can be install with
# cargo install typos-cli
typos --write-changes

clippy-fix:
cargo clippy --fix --allow-dirty --no-deps
Expand Down
4 changes: 4 additions & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ keywords = ["Chord", "DHT", "Web3", "P2P", "WASM"]
categories = ["network-programming", "cryptography", "wasm"]

[features]
# Feature "experimental" is used to mark an implementation as experimental, which means that:
# It may not have been thoroughly tested.
# The corresponding API may be deprecated or adjusted in the future.
experimental = []
default = ["std"]
std = [
"webrtc",
Expand Down
2 changes: 1 addition & 1 deletion core/src/channels/default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ where T: std::fmt::Debug
tracing::debug!("channel received message: {:?}", v);
Ok(Some(v))
}
Err(_) => Err(Error::ChannelRecvMessageFailed),
Err(e) => Err(Error::ChannelRecvMessageFailed(e.to_string())),
}
}
}
4 changes: 3 additions & 1 deletion core/src/channels/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ impl<T: Send> Channel<T> for CbChannel<T> {
async fn recv(receiver: &Self::Receiver) -> Result<Option<T>> {
let mut receiver = receiver.lock().await;
match receiver.try_next() {
Err(_) => Err(Error::ChannelRecvMessageFailed),
// when there are no messages available, but channel is not yet closed
Err(_) => Ok(None),
// when message is fetched
Ok(Some(x)) => Ok(Some(x)),
// when channel is closed and no messages left in the queue
Ok(None) => Ok(None),
Expand Down
5 changes: 4 additions & 1 deletion core/src/dht/chord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ pub struct PeerRing {
pub cache: Arc<MemStorage<Did, VirtualNode>>,
}

/// Type alias is just for making the code easy to read.
type Target = Did;

/// `PeerRing` use this to describe the result of [Chord] algorithm. Sometimes it's a
/// direct result, sometimes it's an action that is continued externally.
#[derive(Clone, Debug, PartialEq)]
Expand All @@ -64,7 +67,7 @@ pub enum PeerRingAction {
/// Found some node.
Some(Did),
/// Trigger a remote action.
RemoteAction(Did, RemoteAction),
RemoteAction(Target, RemoteAction),
/// Trigger multiple remote actions.
MultiActions(Vec<PeerRingAction>),
}
Expand Down
59 changes: 53 additions & 6 deletions core/src/dht/did.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,38 @@
use std::cmp::Eq;
#![warn(missing_docs)]

//! This module defines distributed idendity for rings network.
//! The Did of rings network is also a abstract Ring structure of abstract algebra.
//! The Did is implemented with H160, which is a 160 bits, which can present as:
//!
//! ## Algebraic Did
//! In abstract algebra, a "ring" is an algebraic structure consisting of a non-empty set
//! and two binary operations (commonly referred to as addition and multiplication).
//! The definition of a ring can be stated as follows:
//!
//! * Addition is closed: For any two elements a and b in R, their sum a + b belongs to R.
//!
//! * Addition is commutative: For any two elements a and b in R, a + b = b + a.
//!
//! * Addition is associative: For any three elements a, b, and c in R, (a + b) + c = a + (b + c).
//!
//! * Existence of an additive identity: There exists an element 0 in R such that for any element a in R, a + 0 = 0 + a = a.
//!
//! * Existence of additive inverses: For every element a in R, there exists an element -a such that a + (-a) = (-a) + a = 0.
//!
//! * Multiplication is closed: For any two elements a and b in R, their product a · b belongs to R.
//!
//! * Multiplication satisfies the distributive law:
//! For any three elements a, b, and c in R, a · (b + c) = a · b + a · c and (a + b) · c = a · c + b · c.
//!
//! ## Concrete Did
//!
//! In our implementation, we have essentially implemented a cyclic Ring structure.
//! As a result, we can utilize rotation operations as a substitute for multiplication.
//! Within this module, we have implemented the additive operation for Did, as well as the rotate operation in place of multiaction.
//! This is because what we actually require is the scalar multiplication of affine multiaction.
//! Did is represented as a wrapper of H160 (\[u8; 20\]). Since there is no `Eq` trait available for algebraic Rings, we have introduced the [BiasId]
//! struct to implement [Eq] and [PartialEq].

use std::cmp::PartialEq;
use std::ops::Add;
use std::ops::Deref;
Expand Down Expand Up @@ -26,18 +60,25 @@ impl std::fmt::Display for Did {
}
}

// Bias Did is a special Did which set origin Did's idendity to bias
/// Bias Did is a special Did which set origin Did's idendity to bias
/// The underlying concept of BiasId is that while we cannot determine the order between two Dids, such as `did_a` and `did_b`,
/// we can establish a reference Did, referred to as `did_x`, and compare which one is closer to it. Hence, we introduced BiasId,
/// where a bias value is applied. Essentially, it considers the midpoint `x` as the zero point within the Ring algebraic structure for observation.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Serialize, Deserialize, Hash)]
pub struct BiasId {
/// the zero point for determine order of Did.
bias: Did,
/// did data without bias.
did: Did,
}

/// The `Rotate` trait represents a affine transformation for values
/// in a finite ring. It defines a method `rotate` which allows applying
/// the transformation to the implementing type.
pub trait Rotate<Rhs = u16> {
/// output type of rotate operation
type Output;
/// rotate a Did with given angle
fn rotate(&self, angle: Rhs) -> Self::Output;
}

Expand All @@ -50,17 +91,20 @@ impl Rotate<u16> for Did {
}

impl BiasId {
/// Wrap a Did into BiasDid with given bias.
pub fn new(bias: Did, did: Did) -> BiasId {
BiasId {
bias,
did: did - bias,
}
}

/// Get wrapped biased value from did
pub fn to_did(self) -> Did {
self.did + self.bias
}

/// Get unwrap value from a BiasDid
pub fn pos(&self) -> Did {
self.did
}
Expand Down Expand Up @@ -123,26 +167,29 @@ impl TryFrom<HashStr> for Did {
}

impl Did {
// Test x <- (a, b)
/// Test x <- (a, b)
pub fn in_range(&self, base_id: Self, a: Self, b: Self) -> bool {
// Test x > a && b > x
*self - base_id > a - base_id && b - base_id > *self - base_id
}

// Transform Did to BiasDid
/// Transform Did to BiasDid
pub fn bias(&self, did: Self) -> BiasId {
BiasId::new(did, *self)
}

// Rotate Transport did to a list of affined did
// affine x, n = [x + rotate(360/n)]
/// Rotate Transport did to a list of affined did
/// affine x, n = [x + rotate(360/n)]
pub fn rotate_affine(&self, scalar: u16) -> Vec<Did> {
let angle = 360 / scalar;
(0..scalar).map(|i| (*self).rotate(i * angle)).collect()
}
}

/// Ordering with a did reference
/// This trait defines necessary method for sorting based on did.
pub trait SortRing {
/// Sort a impl SortRing with given did
fn sort(&mut self, did: Did);
}

Expand Down
5 changes: 4 additions & 1 deletion core/src/dht/mod.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
#![warn(missing_docs)]
//! Implementation of Ring's DHT
//!
//! which is based on CHORD, ref: <https://pdos.csail.mit.edu/papers/ton:chord/paper-ton.pdf>
//! With high probability, the number of nodes that must be contacted to find a successor in an N-node network is O(log N).
pub mod did;
pub use did::Did;
mod chord;
pub use chord::TopoInfo;
/// Finger table for Rings
pub mod finger;
pub mod successor;
pub use successor::SuccessorReader;
pub use successor::SuccessorWriter;
mod types;
pub mod types;
pub use chord::PeerRing;
pub use chord::PeerRingAction;
pub use chord::RemoteAction as PeerRingRemoteAction;
Expand All @@ -19,6 +21,7 @@ pub use types::Chord;
pub use types::ChordStorage;
pub use types::ChordStorageCache;
pub use types::ChordStorageSync;
pub use types::CorrectChord;
pub use types::LiveDid;
mod stabilization;
pub use stabilization::Stabilization;
Expand Down
66 changes: 54 additions & 12 deletions core/src/dht/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ use std::sync::Arc;
use async_trait::async_trait;

use crate::dht::successor::SuccessorReader;
use crate::dht::types::CorrectChord;
use crate::dht::Chord;
use crate::dht::PeerRing;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::error::Result;
use crate::message::handlers::MessageHandlerEvent;
use crate::message::FindSuccessorReportHandler;
use crate::message::FindSuccessorSend;
use crate::message::FindSuccessorThen;
use crate::message::Message;
use crate::message::MessagePayload;
use crate::message::NotifyPredecessorSend;
use crate::message::PayloadSender;
use crate::message::QueryForTopoInfoSend;
use crate::swarm::Swarm;
use crate::transports::manager::TransportManager;
use crate::types::ice_transport::IceTransportInterface;
Expand All @@ -34,22 +37,12 @@ pub struct Stabilization {
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait TStabilize {
/// Wait and poll
async fn wait(self: Arc<Self>);
}

impl Stabilization {
pub fn new(swarm: Arc<Swarm>, timeout: usize) -> Self {
Self {
chord: swarm.dht(),
swarm,
timeout,
}
}

pub fn get_timeout(&self) -> usize {
self.timeout
}

/// Clean unavailable transports from swarm.
pub async fn clean_unavailable_transports(&self) -> Result<()> {
let transports = self.swarm.get_transports();

Expand All @@ -62,7 +55,26 @@ impl Stabilization {

Ok(())
}
}

impl Stabilization {
/// Create a new instance of Stabilization
pub fn new(swarm: Arc<Swarm>, timeout: usize) -> Self {
Self {
chord: swarm.dht(),
swarm,
timeout,
}
}

/// Get timeout of waiting delays.
pub fn get_timeout(&self) -> usize {
self.timeout
}
}

impl Stabilization {
/// Notify predecessor, this is a DHT operation.
pub async fn notify_predecessor(&self) -> Result<()> {
let (successor_min, successor_list) = {
let successor = self.chord.successors();
Expand All @@ -89,6 +101,7 @@ impl Stabilization {
}
}

/// Fix fingers from finger table, this is a DHT operation.
async fn fix_fingers(&self) -> Result<()> {
match self.chord.fix_fingers() {
Ok(action) => match action {
Expand Down Expand Up @@ -123,7 +136,28 @@ impl Stabilization {
}
}
}
}

impl Stabilization {
/// Call stabilization from correct chord implementation
pub async fn correct_stabilize(&self) -> Result<()> {
if let PeerRingAction::RemoteAction(
next,
PeerRingRemoteAction::QueryForSuccessorListAndPred,
) = self.chord.pre_stabilize()?
{
let evs = vec![MessageHandlerEvent::SendDirectMessage(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_stab(next)),
next,
)];
return self.swarm.handle_message_handler_events(&evs).await;
}
Ok(())
}
}

impl Stabilization {
/// Call stabilize periodly.
pub async fn stabilize(&self) -> Result<()> {
tracing::debug!("STABILIZATION notify_predecessor start");
if let Err(e) = self.notify_predecessor().await {
Expand All @@ -140,6 +174,14 @@ impl Stabilization {
tracing::error!("[stabilize] Failed on clean unavailable transports {:?}", e);
}
tracing::debug!("STABILIZATION clean_unavailable_transports end");
#[cfg(feature = "experimental")]
{
tracing::debug!("STABILIZATION correct_stabilize start");
if let Err(e) = self.correct_stabilize() {
tracing::error!("[stabilize] Failed on call correct stabilize {:?}", e);
}
tracing::debug!("STABILIZATION correct_stabilize end");
}
Ok(())
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ pub enum Error {
#[error("Send message through channel failed")]
ChannelSendMessageFailed,

#[error("Recv message through channel failed")]
ChannelRecvMessageFailed,
#[error("Recv message through channel failed {0}")]
ChannelRecvMessageFailed(String),

#[error("Invalid PeerRingAction")]
PeerRingInvalidAction,
Expand Down
Loading

0 comments on commit d1b9d1e

Please sign in to comment.