Skip to content

Commit

Permalink
Separated transport crate
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Sep 1, 2023
1 parent 85163b8 commit eccf291
Show file tree
Hide file tree
Showing 52 changed files with 1,487 additions and 1,046 deletions.
558 changes: 363 additions & 195 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
[workspace]
members = ["core", "node", "rpc", "derive"]
members = ["core", "transport", "node", "rpc", "derive"]

resolver = "2"

[workspace.dependencies]
rings-core = { version = "0.2.7", path = "core", default-features = false }
rings-derive = { version = "0.2.7", path = "derive", default-features = false }
rings-rpc = { version = "0.2.7", path = "rpc", default-features = false }
wasm-bindgen = { version = "0.2.84" }
rings-core = { version = "0.3.0", path = "core", default-features = false }
rings-derive = { version = "0.3.0", path = "derive", default-features = false }
rings-rpc = { version = "0.3.0", path = "rpc", default-features = false }
rings-transport = { version = "0.3.0", path = "transport" }
wasm-bindgen = { version = "0.2.87" }
wasm-bindgen-macro-support = { version = "0.2.84" }
7 changes: 5 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rings-core"
version = "0.2.7"
version = "0.3.0"
edition = "2021"
authors = ["RND <[email protected]>"]
description = "Chord DHT implementation with ICE"
Expand All @@ -25,8 +25,9 @@ std = [
"uuid/v4",
"uuid/serde",
"rings-derive/default",
"rings-transport/native-webrtc",
]
dummy = ["std", "lazy_static", "tokio"]
dummy = ["std", "lazy_static", "tokio", "rings-transport/dummy"]
wasm = [
"web-sys",
"wasm-bindgen",
Expand All @@ -40,6 +41,7 @@ wasm = [
"uuid/v4",
"uuid/serde",
"rings-derive/wasm",
"rings-transport/web-sys-webrtc",
]
browser_chrome_test = ["wasm"]

Expand Down Expand Up @@ -69,6 +71,7 @@ num-bigint = "0.4.3"
rand = { version = "0.8.5", features = ["getrandom"] }
rand_core = { version = "0.6.3", features = ["getrandom"] }
rand_hc = "0.3.1"
rings-transport = { workspace = true }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = { version = "1.0.70" }
sha1 = "0.10.1"
Expand Down
45 changes: 45 additions & 0 deletions core/src/dht/did.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,49 @@ mod tests {
assert_eq!(affine_dids[2], did.rotate(180));
assert_eq!(affine_dids[3], did.rotate(270));
}

#[test]
fn test_dump_and_load() {
// The length must be 40.
assert!(Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab").is_err());
assert!(Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab00").is_err());

// Allow omit 0x prefix
assert_eq!(
Did::from_str("11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap(),
Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap(),
);

// from_str then to_string
let did = Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap();
// TODO: Should be "0x11e807fcc88dd319270493fb2e822e388fe36ab0"
assert_eq!(did.to_string(), "11e807fcc88dd319270493fb2e822e388fe36ab0");

// Serialize
let did = Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap();
assert_eq!(
serde_json::to_string(&did).unwrap(),
"\"0x11e807fcc88dd319270493fb2e822e388fe36ab0\""
);

// Deserialize
let did =
serde_json::from_str::<Did>("\"0x11e807fcc88dd319270493fb2e822e388fe36ab0\"").unwrap();
assert_eq!(
did,
Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap()
);

// Debug and Display
let did = Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap();
// TODO: Should be "0x11e807fcc88dd319270493fb2e822e388fe36ab0"
assert_eq!(
format!("{}", did),
"11e807fcc88dd319270493fb2e822e388fe36ab0"
);
assert_eq!(
format!("{:?}", did),
"Did(0x11e807fcc88dd319270493fb2e822e388fe36ab0)"
);
}
}
95 changes: 50 additions & 45 deletions core/src/dht/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use rings_transport::core::transport::SharedConnection;

use crate::dht::successor::SuccessorReader;
use crate::dht::types::CorrectChord;
Expand All @@ -20,8 +21,6 @@ use crate::message::NotifyPredecessorSend;
use crate::message::PayloadSender;
use crate::message::QueryForTopoInfoSend;
use crate::swarm::Swarm;
use crate::transports::manager::TransportManager;
use crate::types::ice_transport::IceTransportInterface;

/// A combination contains chord and swarm, use to run stabilize.
/// - swarm: transports communicate with each others.
Expand All @@ -42,12 +41,12 @@ pub trait TStabilize {
}

impl Stabilization {
/// Clean unavailable transports from swarm.
pub async fn clean_unavailable_transports(&self) -> Result<()> {
let transports = self.swarm.get_transports();
/// Clean unavailable connections in transport.
pub async fn clean_unavailable_connections(&self) -> Result<()> {
let conns = self.swarm.get_connections();

for (did, t) in transports.into_iter() {
if t.is_disconnected().await {
for (did, conn) in conns.into_iter() {
if conn.is_disconnected().await {
tracing::info!("STABILIZATION clean_unavailable_transports: {:?}", did);
self.swarm.disconnect(did).await?;
}
Expand Down Expand Up @@ -169,11 +168,14 @@ impl Stabilization {
tracing::error!("[stabilize] Failed on fix_finger {:?}", e);
}
tracing::debug!("STABILIZATION fix_fingers end");
tracing::debug!("STABILIZATION clean_unavailable_transports start");
if let Err(e) = self.clean_unavailable_transports().await {
tracing::error!("[stabilize] Failed on clean unavailable transports {:?}", e);
tracing::debug!("STABILIZATION clean_unavailable_connections start");
if let Err(e) = self.clean_unavailable_connections().await {
tracing::error!(
"[stabilize] Failed on clean unavailable connections {:?}",
e
);
}
tracing::debug!("STABILIZATION clean_unavailable_transports end");
tracing::debug!("STABILIZATION clean_unavailable_connections end");
#[cfg(feature = "experimental")]
{
tracing::debug!("STABILIZATION correct_stabilize start");
Expand Down Expand Up @@ -253,7 +255,6 @@ pub mod tests {

use super::*;
use crate::ecc::SecretKey;
use crate::prelude::RTCIceConnectionState;
use crate::tests::default::prepare_node;
use crate::tests::manually_establish_connection;

Expand All @@ -269,57 +270,61 @@ pub mod tests {
// Shouldn't listen to message handler here,
// otherwise it will automatically remove disconnected transport.

manually_establish_connection(&node1, &node2).await.unwrap();
manually_establish_connection(&node1, &node3).await.unwrap();
manually_establish_connection(&node1, &node2).await;
manually_establish_connection(&node1, &node3).await;

tokio::time::sleep(Duration::from_secs(5)).await;

assert_eq!(
node1
.get_transport(node2.did())
.unwrap()
.ice_connection_state()
.await
.unwrap(),
RTCIceConnectionState::Connected
format!(
"{:?}",
node1
.get_connection(node2.did())
.unwrap()
.ice_connection_state()
),
"connected"
);
assert_eq!(
node1
.get_transport(node3.did())
.unwrap()
.ice_connection_state()
.await
.unwrap(),
RTCIceConnectionState::Connected
format!(
"{:?}",
node1
.get_connection(node3.did())
.unwrap()
.ice_connection_state()
),
"connected"
);

node2.disconnect(node1.did()).await.unwrap();
node3.disconnect(node1.did()).await.unwrap();

tokio::time::sleep(Duration::from_secs(10)).await;
assert_eq!(
node1
.get_transport(node2.did())
.unwrap()
.ice_connection_state()
.await
.unwrap(),
RTCIceConnectionState::Disconnected
format!(
"{:?}",
node1
.get_connection(node2.did())
.unwrap()
.ice_connection_state()
),
"disconnected"
);
assert_eq!(
node1
.get_transport(node3.did())
.unwrap()
.ice_connection_state()
.await
.unwrap(),
RTCIceConnectionState::Disconnected
format!(
"{:?}",
node1
.get_connection(node3.did())
.unwrap()
.ice_connection_state()
),
"disconnected"
);

let stb = Stabilization::new(node1.clone(), 3);
stb.clean_unavailable_transports().await.unwrap();
stb.clean_unavailable_connections().await.unwrap();

assert!(node1.get_transport(node2.did()).is_none());
assert!(node1.get_transport(node3.did()).is_none());
assert!(node1.get_connection(node2.did()).is_none());
assert!(node1.get_connection(node3.did()).is_none());
}
}
7 changes: 5 additions & 2 deletions core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ pub enum Error {
#[error("RTC unsupported sdp type")]
RTCSdpTypeNotMatch,

#[error("Transport not Found")]
TransportNotFound,
#[error("Connection not Found")]
ConnectionNotFound,

#[error("Invalid Transport Id")]
InvalidTransportUuid,
Expand Down Expand Up @@ -353,6 +353,9 @@ pub enum Error {

#[error("Session is expired")]
SessionExpired,

#[error("Transport error: {0}")]
Transport(#[from] rings_transport::error::Error),
}

#[cfg(feature = "wasm")]
Expand Down
28 changes: 10 additions & 18 deletions core/src/inspect.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use rings_transport::core::transport::SharedConnection;
use serde::Deserialize;
use serde::Serialize;

Expand All @@ -9,23 +10,19 @@ use crate::storage::MemStorage;
use crate::storage::PersistenceStorage;
use crate::storage::PersistenceStorageReadAndWrite;
use crate::swarm::Swarm;
use crate::transports::manager::TransportManager;
use crate::types::ice_transport::IceTransportInterface;
use crate::utils::from_rtc_ice_connection_state;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmInspect {
pub transports: Vec<TransportInspect>,
pub connections: Vec<ConnectionInspect>,
pub dht: DHTInspect,
pub persistence_storage: StorageInspect,
pub cache_storage: StorageInspect,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportInspect {
pub struct ConnectionInspect {
pub did: String,
pub transport_id: String,
pub state: Option<String>,
pub state: String,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand All @@ -45,19 +42,14 @@ pub struct StorageInspect {
impl SwarmInspect {
pub async fn inspect(swarm: &Swarm) -> Self {
let dht = DHTInspect::inspect(&swarm.dht());
let transports = {
let transports = swarm.get_transports();
let connections = {
let connections = swarm.get_connections();

let states_async = transports.iter().map(|(_, t)| t.ice_connection_state());
let states = futures::future::join_all(states_async).await;

transports
connections
.iter()
.zip(states.iter())
.map(|((did, transport), state)| TransportInspect {
.map(|(did, c)| ConnectionInspect {
did: did.to_string(),
transport_id: transport.id.to_string(),
state: state.map(from_rtc_ice_connection_state),
state: format!("{:?}", c.ice_connection_state()),
})
.collect()
};
Expand All @@ -66,7 +58,7 @@ impl SwarmInspect {
let cache_storage = StorageInspect::inspect_mem_storage(&swarm.dht().cache);

Self {
transports,
connections,
dht,
persistence_storage,
cache_storage,
Expand Down
1 change: 0 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ pub mod storage;
pub mod swarm;
#[cfg(test)]
mod tests;
pub mod transports;
pub mod types;
pub mod utils;
pub use async_trait::async_trait;
Expand Down
Loading

0 comments on commit eccf291

Please sign in to comment.