Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Correct Chord :: Message handler and Testing #405

Closed
wants to merge 16 commits into from
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ categories = ["network-programming", "cryptography", "wasm"]

[features]
default = ["std"]
experimental = []
redundant = []
std = [
"webrtc",
Expand Down
181 changes: 117 additions & 64 deletions core/src/dht/chord.rs

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion core/src/dht/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,23 @@ mod chord;
/// Finger table for Rings
pub mod finger;
pub mod successor;
pub use successor::SuccessorReader;
pub use successor::SuccessorWriter;
mod types;
pub use chord::PeerRing;
pub use chord::PeerRingAction;
pub use chord::RemoteAction as PeerRingRemoteAction;
pub use chord::TopoInfo;
pub use finger::FingerTable;
pub use types::Chord;
pub use types::ChordStorage;
pub use types::CorrectChord;
pub use types::LiveDid;
mod stabilization;
pub use stabilization::Stabilization;
pub use stabilization::TStabilize;
pub use stabilization::TStabilizeExecute;
pub use stabilization::TStabilizeWait;

/// Implement Subring with VNode
pub mod subring;
/// VNode is a special node that only has virtual address
Expand Down
30 changes: 22 additions & 8 deletions core/src/dht/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::dht::Chord;
use crate::dht::PeerRing;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::dht::SuccessorReader;
use crate::err::Result;
use crate::message::FindSuccessorReportHandler;
use crate::message::FindSuccessorSend;
Expand All @@ -29,10 +30,17 @@ pub struct Stabilization {
timeout: usize,
}

/// A trait with `stabilize` method.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait TStabilizeExecute {
async fn stabilize(&self) -> Result<()>;
}

/// A trait with `wait` method.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait TStabilize {
pub trait TStabilizeWait: TStabilizeExecute {
async fn wait(self: Arc<Self>);
}

Expand Down Expand Up @@ -64,8 +72,8 @@ impl Stabilization {

pub async fn notify_predecessor(&self) -> Result<()> {
let (successor_min, successor_list) = {
let successor = self.chord.lock_successor()?;
(successor.min(), successor.list())
let successor = self.chord.successors();
(successor.min()?, successor.list()?)
};

let msg = Message::NotifyPredecessorSend(NotifyPredecessorSend {
Expand Down Expand Up @@ -122,8 +130,12 @@ impl Stabilization {
}
}
}
}

pub async fn stabilize(&self) -> Result<()> {
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl TStabilizeExecute for Stabilization {
async fn stabilize(&self) -> Result<()> {
tracing::debug!("STABILIZATION notify_predecessor start");
if let Err(e) = self.notify_predecessor().await {
tracing::error!("[stabilize] Failed on notify predecessor {:?}", e);
Expand Down Expand Up @@ -155,10 +167,11 @@ mod stabilizer {
use futures_timer::Delay;

use super::Stabilization;
use super::TStabilize;
use super::TStabilizeExecute;
use super::TStabilizeWait;

#[async_trait]
impl TStabilize for Stabilization {
impl TStabilizeWait for Stabilization {
async fn wait(self: Arc<Self>) {
loop {
let timeout = Delay::new(Duration::from_secs(self.timeout as u64)).fuse();
Expand All @@ -182,11 +195,12 @@ mod stabilizer {
use wasm_bindgen_futures::spawn_local;

use super::Stabilization;
use super::TStabilize;
use super::TStabilizeExecute;
use super::TStabilizeWait;
use crate::poll;

#[async_trait(?Send)]
impl TStabilize for Stabilization {
impl TStabilizeWait for Stabilization {
async fn wait(self: Arc<Self>) {
let caller = Arc::clone(&self);
let func = move || {
Expand Down
196 changes: 148 additions & 48 deletions core/src/dht/successor.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
//! Successor for PeerRing
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::RwLockReadGuard;

use crate::dht::did::BiasId;
use crate::dht::did::SortRing;
use crate::dht::Did;
use crate::err::Error;
use crate::err::Result;

/// A sequence of successors for a node on the ring.
/// It's necessary to have multiple successors to prevent a single point of failure.
Expand All @@ -13,63 +20,155 @@ pub struct SuccessorSeq {
/// Max successor num
max: u8,
/// Successors
successors: Vec<Did>,
successors: Arc<RwLock<Vec<Did>>>,
}

pub trait SuccessorReader {
fn is_empty(&self) -> Result<bool>;
fn is_full(&self) -> Result<bool>;
fn get(&self, index: usize) -> Result<Did>;
fn len(&self) -> Result<usize>;
fn min(&self) -> Result<Did>;
fn max(&self) -> Result<Did>;
fn list(&self) -> Result<Vec<Did>>;
fn contains(&self, did: &Did) -> Result<bool>;
fn update_dry(&self, did: &[Did]) -> Result<Vec<Did>>;
}

pub trait SuccessorWriter {
fn update(&self, successor: Did) -> Result<Option<Did>>;
fn extend(&self, succ_list: &[Did]) -> Result<Vec<Did>>;
fn remove(&self, did: Did) -> Result<()>;
}

impl SuccessorSeq {
pub fn new(did: Did, max: u8) -> Self {
Self {
did,
max,
successors: vec![],
successors: Arc::new(RwLock::new(vec![])),
}
}

pub fn is_empty(&self) -> bool {
self.successors.is_empty()
pub fn successors(&self) -> Result<RwLockReadGuard<Vec<Did>>> {
self.successors
.read()
.map_err(|_| Error::FailedToReadSuccessors)
}

/// Calculate bias of the Did on the ring.
pub fn bias(&self, did: Did) -> BiasId {
BiasId::new(self.did, did)
}

/// Verify a given Did is fit successors protocol.
pub fn should_insert(&self, did: Did) -> Result<bool> {
if (self.contains(&did)?) || (did == self.did) {
return Ok(false);
}

if self.bias(did) >= self.bias(self.max()?) && self.is_full()? {
return Ok(false);
}
Ok(true)
}
}

impl SuccessorReader for SuccessorSeq {
fn contains(&self, did: &Did) -> Result<bool> {
let succs = self.successors()?;
Ok(succs.contains(did))
}

pub fn is_full(&self) -> bool {
self.successors.len() as u8 >= self.max
fn is_empty(&self) -> Result<bool> {
let succs = self.successors()?;
Ok(succs.is_empty())
}

pub fn min(&self) -> Did {
if self.is_empty() {
self.did
fn is_full(&self) -> Result<bool> {
let succs = self.successors()?;
Ok(succs.len() as u8 >= self.max)
}

fn get(&self, index: usize) -> Result<Did> {
let succs = self.successors()?;
Ok(succs[index])
}

fn len(&self) -> Result<usize> {
let succs = self.successors()?;
Ok(succs.len())
}

fn min(&self) -> Result<Did> {
if self.is_empty()? {
Ok(self.did)
} else {
self.successors[0]
Ok(self.get(0)?)
}
}

pub fn max(&self) -> Did {
if self.is_empty() {
self.did
fn max(&self) -> Result<Did> {
if self.is_empty()? {
Ok(self.did)
} else {
self.successors[self.successors.len() - 1]
self.get(self.len()? - 1)
}
}

pub fn update(&mut self, successor: Did) {
if self.successors.contains(&successor) || successor == self.did {
return;
fn list(&self) -> Result<Vec<Did>> {
let succs = self.successors()?;
Ok(succs.clone())
}

fn update_dry(&self, dids: &[Did]) -> Result<Vec<Did>> {
let mut ret = vec![];
for did in dids {
if self.should_insert(*did)? {
ret.push(*did)
}
}
self.successors.push(successor);
self.successors.sort(self.did);
self.successors.truncate(self.max.into());
Ok(ret)
}
}

pub fn extend(&mut self, succ_list: &Vec<Did>) {
for s in succ_list {
self.update(*s)
impl SuccessorWriter for SuccessorSeq {
fn update(&self, successor: Did) -> Result<Option<Did>> {
if !(self.should_insert(successor)?) {
return Ok(None);
}
let mut succs = self
.successors
.write()
.map_err(|_| Error::FailedToWriteSuccessors)?;

succs.push(successor);
succs.sort(self.did);
succs.truncate(self.max.into());
if succs.contains(&successor) {
Ok(Some(successor))
} else {
Ok(None)
}
}

pub fn list(&self) -> Vec<Did> {
self.successors.clone()
fn extend(&self, succ_list: &[Did]) -> Result<Vec<Did>> {
let mut ret = vec![];
for s in succ_list {
if let Some(r) = self.update(*s)? {
ret.push(r);
}
}
Ok(ret)
}

pub fn remove(&mut self, did: Did) {
self.successors.retain(|&v| v != did);
fn remove(&self, did: Did) -> Result<()> {
let mut succs = self
.successors
.write()
.map_err(|_| Error::FailedToWriteSuccessors)?;
succs.retain(|&v| v != did);
Ok(())
}
}

Expand All @@ -82,38 +181,39 @@ mod tests {
fn test_successor_update() {
let dids = gen_ordered_dids(6);

let mut succ = SuccessorSeq::new(dids[0], 3);
assert!(succ.is_empty());
let succ = SuccessorSeq::new(dids[0], 3);
assert!(succ.is_empty().unwrap());

succ.update(dids[2]);
assert_eq!(succ.list(), dids[2..3]);
succ.update(dids[2]).unwrap();
assert_eq!(succ.list().unwrap(), dids[2..3]);

succ.update(dids[3]);
assert_eq!(succ.list(), dids[2..4]);
succ.update(dids[3]).unwrap();
assert_eq!(succ.list().unwrap(), dids[2..4]);

succ.update(dids[4]);
assert_eq!(succ.list(), dids[2..5]);
succ.update(dids[4]).unwrap();
assert_eq!(succ.list().unwrap(), dids[2..5]);

succ.update(dids[5]);
assert_eq!(succ.list(), dids[2..5]);
succ.update(dids[5]).unwrap();
assert_eq!(succ.list().unwrap(), dids[2..5]);

succ.update(dids[1]);
assert_eq!(succ.list(), dids[1..4]);
succ.update(dids[1]).unwrap();
assert_eq!(succ.list().unwrap(), dids[1..4]);
}

#[test]
fn test_successor_remove() {
fn test_successor_remove() -> Result<()> {
let dids = gen_ordered_dids(4);

let mut succ = SuccessorSeq::new(dids[0], 3);
assert!(succ.is_empty());
let succ = SuccessorSeq::new(dids[0], 3);
assert!(succ.is_empty()?);

succ.update(dids[1]);
succ.update(dids[2]);
succ.update(dids[3]);
assert_eq!(succ.list(), dids[1..4]);
succ.update(dids[1])?.unwrap();
succ.update(dids[2])?.unwrap();
succ.update(dids[3])?.unwrap();
assert_eq!(succ.list()?, dids[1..4]);

succ.remove(dids[2]);
assert_eq!(succ.list(), vec![dids[1], dids[3]]);
succ.remove(dids[2])?;
assert_eq!(succ.list()?, vec![dids[1], dids[3]]);
Ok(())
}
}
Loading