Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: separated transport crate #466

Merged
merged 10 commits into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
589 changes: 384 additions & 205 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
[workspace]
members = ["core", "node", "rpc", "derive"]
members = ["core", "transport", "node", "rpc", "derive"]

resolver = "2"

[workspace.dependencies]
rings-core = { version = "0.2.7", path = "core", default-features = false }
rings-derive = { version = "0.2.7", path = "derive", default-features = false }
rings-rpc = { version = "0.2.7", path = "rpc", default-features = false }
wasm-bindgen = { version = "0.2.84" }
rings-core = { version = "0.3.0", path = "core", default-features = false }
rings-derive = { version = "0.3.0", path = "derive", default-features = false }
rings-rpc = { version = "0.3.0", path = "rpc", default-features = false }
rings-transport = { version = "0.3.0", path = "transport" }
wasm-bindgen = { version = "0.2.87" }
wasm-bindgen-macro-support = { version = "0.2.84" }
7 changes: 5 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rings-core"
version = "0.2.7"
version = "0.3.0"
edition = "2021"
authors = ["RND <[email protected]>"]
description = "Chord DHT implementation with ICE"
Expand All @@ -25,8 +25,9 @@ std = [
"uuid/v4",
"uuid/serde",
"rings-derive/default",
"rings-transport/native-webrtc",
]
dummy = ["std", "lazy_static", "tokio"]
dummy = ["std", "lazy_static", "tokio", "rings-transport/dummy"]
wasm = [
"web-sys",
"wasm-bindgen",
Expand All @@ -40,6 +41,7 @@ wasm = [
"uuid/v4",
"uuid/serde",
"rings-derive/wasm",
"rings-transport/web-sys-webrtc",
]
browser_chrome_test = ["wasm"]

Expand Down Expand Up @@ -69,6 +71,7 @@ num-bigint = "0.4.3"
rand = { version = "0.8.5", features = ["getrandom"] }
rand_core = { version = "0.6.3", features = ["getrandom"] }
rand_hc = "0.3.1"
rings-transport = { workspace = true }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = { version = "1.0.70" }
sha1 = "0.10.1"
Expand Down
45 changes: 45 additions & 0 deletions core/src/dht/did.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,49 @@ mod tests {
assert_eq!(affine_dids[2], did.rotate(180));
assert_eq!(affine_dids[3], did.rotate(270));
}

#[test]
fn test_dump_and_load() {
// The length must be 40.
assert!(Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab").is_err());
assert!(Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab00").is_err());

// Allow omit 0x prefix
assert_eq!(
Did::from_str("11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap(),
Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap(),
);

// from_str then to_string
let did = Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap();
// TODO: Should be "0x11e807fcc88dd319270493fb2e822e388fe36ab0"
assert_eq!(did.to_string(), "11e807fcc88dd319270493fb2e822e388fe36ab0");

// Serialize
let did = Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap();
assert_eq!(
serde_json::to_string(&did).unwrap(),
"\"0x11e807fcc88dd319270493fb2e822e388fe36ab0\""
);

// Deserialize
let did =
serde_json::from_str::<Did>("\"0x11e807fcc88dd319270493fb2e822e388fe36ab0\"").unwrap();
assert_eq!(
did,
Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap()
);

// Debug and Display
let did = Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap();
// TODO: Should be "0x11e807fcc88dd319270493fb2e822e388fe36ab0"
assert_eq!(
format!("{}", did),
"11e807fcc88dd319270493fb2e822e388fe36ab0"
);
assert_eq!(
format!("{:?}", did),
"Did(0x11e807fcc88dd319270493fb2e822e388fe36ab0)"
);
}
}
71 changes: 33 additions & 38 deletions core/src/dht/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::sync::Arc;

use async_trait::async_trait;
use rings_transport::core::transport::SharedConnection;

use crate::dht::successor::SuccessorReader;
use crate::dht::types::CorrectChord;
Expand All @@ -20,8 +21,6 @@ use crate::message::NotifyPredecessorSend;
use crate::message::PayloadSender;
use crate::message::QueryForTopoInfoSend;
use crate::swarm::Swarm;
use crate::transports::manager::TransportManager;
use crate::types::ice_transport::IceTransportInterface;

/// A combination contains chord and swarm, use to run stabilize.
/// - swarm: transports communicate with each others.
Expand All @@ -42,12 +41,12 @@ pub trait TStabilize {
}

impl Stabilization {
/// Clean unavailable transports from swarm.
pub async fn clean_unavailable_transports(&self) -> Result<()> {
let transports = self.swarm.get_transports();
/// Clean unavailable connections in transport.
pub async fn clean_unavailable_connections(&self) -> Result<()> {
let conns = self.swarm.get_connections();

for (did, t) in transports.into_iter() {
if t.is_disconnected().await {
for (did, conn) in conns.into_iter() {
if conn.is_disconnected().await {
tracing::info!("STABILIZATION clean_unavailable_transports: {:?}", did);
self.swarm.disconnect(did).await?;
}
Expand Down Expand Up @@ -169,11 +168,14 @@ impl Stabilization {
tracing::error!("[stabilize] Failed on fix_finger {:?}", e);
}
tracing::debug!("STABILIZATION fix_fingers end");
tracing::debug!("STABILIZATION clean_unavailable_transports start");
if let Err(e) = self.clean_unavailable_transports().await {
tracing::error!("[stabilize] Failed on clean unavailable transports {:?}", e);
tracing::debug!("STABILIZATION clean_unavailable_connections start");
if let Err(e) = self.clean_unavailable_connections().await {
tracing::error!(
"[stabilize] Failed on clean unavailable connections {:?}",
e
);
}
tracing::debug!("STABILIZATION clean_unavailable_transports end");
tracing::debug!("STABILIZATION clean_unavailable_connections end");
#[cfg(feature = "experimental")]
{
tracing::debug!("STABILIZATION correct_stabilize start");
Expand Down Expand Up @@ -251,14 +253,15 @@ mod stabilizer {
pub mod tests {
use std::time::Duration;

use rings_transport::core::transport::WebrtcConnectionState;

use super::*;
use crate::ecc::SecretKey;
use crate::prelude::RTCIceConnectionState;
use crate::tests::default::prepare_node;
use crate::tests::manually_establish_connection;

#[tokio::test]
async fn test_clean_unavailable_transports() {
async fn test_clean_unavailable_connections() {
let key1 = SecretKey::random();
let key2 = SecretKey::random();
let key3 = SecretKey::random();
Expand All @@ -269,28 +272,24 @@ pub mod tests {
// Shouldn't listen to message handler here,
// otherwise it will automatically remove disconnected transport.

manually_establish_connection(&node1, &node2).await.unwrap();
manually_establish_connection(&node1, &node3).await.unwrap();
manually_establish_connection(&node1, &node2).await;
manually_establish_connection(&node1, &node3).await;

tokio::time::sleep(Duration::from_secs(5)).await;

assert_eq!(
node1
.get_transport(node2.did())
.get_connection(node2.did())
.unwrap()
.ice_connection_state()
.await
.unwrap(),
RTCIceConnectionState::Connected
.webrtc_connection_state(),
WebrtcConnectionState::Connected,
);
assert_eq!(
node1
.get_transport(node3.did())
.get_connection(node3.did())
.unwrap()
.ice_connection_state()
.await
.unwrap(),
RTCIceConnectionState::Connected
.webrtc_connection_state(),
WebrtcConnectionState::Connected,
);

node2.disconnect(node1.did()).await.unwrap();
Expand All @@ -299,27 +298,23 @@ pub mod tests {
tokio::time::sleep(Duration::from_secs(10)).await;
assert_eq!(
node1
.get_transport(node2.did())
.get_connection(node2.did())
.unwrap()
.ice_connection_state()
.await
.unwrap(),
RTCIceConnectionState::Disconnected
.webrtc_connection_state(),
WebrtcConnectionState::Disconnected,
);
assert_eq!(
node1
.get_transport(node3.did())
.get_connection(node3.did())
.unwrap()
.ice_connection_state()
.await
.unwrap(),
RTCIceConnectionState::Disconnected
.webrtc_connection_state(),
WebrtcConnectionState::Disconnected,
);

let stb = Stabilization::new(node1.clone(), 3);
stb.clean_unavailable_transports().await.unwrap();
stb.clean_unavailable_connections().await.unwrap();

assert!(node1.get_transport(node2.did()).is_none());
assert!(node1.get_transport(node3.did()).is_none());
assert!(node1.get_connection(node2.did()).is_none());
assert!(node1.get_connection(node3.did()).is_none());
}
}
7 changes: 5 additions & 2 deletions core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,8 @@ pub enum Error {
#[error("RTC unsupported sdp type")]
RTCSdpTypeNotMatch,

#[error("Transport not Found")]
TransportNotFound,
#[error("Connection not Found")]
ConnectionNotFound,

#[error("Invalid Transport Id")]
InvalidTransportUuid,
Expand Down Expand Up @@ -353,6 +353,9 @@ pub enum Error {

#[error("Session is expired")]
SessionExpired,

#[error("Transport error: {0}")]
Transport(#[from] rings_transport::error::Error),
}

#[cfg(feature = "wasm")]
Expand Down
28 changes: 10 additions & 18 deletions core/src/inspect.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use rings_transport::core::transport::SharedConnection;
use serde::Deserialize;
use serde::Serialize;

Expand All @@ -9,23 +10,19 @@ use crate::storage::MemStorage;
use crate::storage::PersistenceStorage;
use crate::storage::PersistenceStorageReadAndWrite;
use crate::swarm::Swarm;
use crate::transports::manager::TransportManager;
use crate::types::ice_transport::IceTransportInterface;
use crate::utils::from_rtc_ice_connection_state;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmInspect {
pub transports: Vec<TransportInspect>,
pub connections: Vec<ConnectionInspect>,
pub dht: DHTInspect,
pub persistence_storage: StorageInspect,
pub cache_storage: StorageInspect,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TransportInspect {
pub struct ConnectionInspect {
pub did: String,
pub transport_id: String,
pub state: Option<String>,
pub state: String,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
Expand All @@ -45,19 +42,14 @@ pub struct StorageInspect {
impl SwarmInspect {
pub async fn inspect(swarm: &Swarm) -> Self {
let dht = DHTInspect::inspect(&swarm.dht());
let transports = {
let transports = swarm.get_transports();
let connections = {
let connections = swarm.get_connections();

let states_async = transports.iter().map(|(_, t)| t.ice_connection_state());
let states = futures::future::join_all(states_async).await;

transports
connections
.iter()
.zip(states.iter())
.map(|((did, transport), state)| TransportInspect {
.map(|(did, c)| ConnectionInspect {
did: did.to_string(),
transport_id: transport.id.to_string(),
state: state.map(from_rtc_ice_connection_state),
state: format!("{:?}", c.ice_connection_state()),
})
.collect()
};
Expand All @@ -66,7 +58,7 @@ impl SwarmInspect {
let cache_storage = StorageInspect::inspect_mem_storage(&swarm.dht().cache);

Self {
transports,
connections,
dht,
persistence_storage,
cache_storage,
Expand Down
1 change: 0 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ pub mod storage;
pub mod swarm;
#[cfg(test)]
mod tests;
pub mod transports;
pub mod types;
pub mod utils;
pub use async_trait::async_trait;
Expand Down
Loading