Skip to content

Commit

Permalink
Separated transport crate
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Aug 30, 2023
1 parent 85163b8 commit fbece07
Show file tree
Hide file tree
Showing 37 changed files with 1,191 additions and 785 deletions.
558 changes: 363 additions & 195 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 6 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
[workspace]
members = ["core", "node", "rpc", "derive"]
members = ["core", "transport", "node", "rpc", "derive"]

resolver = "2"

[workspace.dependencies]
rings-core = { version = "0.2.7", path = "core", default-features = false }
rings-derive = { version = "0.2.7", path = "derive", default-features = false }
rings-rpc = { version = "0.2.7", path = "rpc", default-features = false }
wasm-bindgen = { version = "0.2.84" }
rings-core = { version = "0.3.0", path = "core", default-features = false }
rings-derive = { version = "0.3.0", path = "derive", default-features = false }
rings-rpc = { version = "0.3.0", path = "rpc", default-features = false }
rings-transport = { version = "0.3.0", path = "transport" }
wasm-bindgen = { version = "0.2.87" }
wasm-bindgen-macro-support = { version = "0.2.84" }
7 changes: 5 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rings-core"
version = "0.2.7"
version = "0.3.0"
edition = "2021"
authors = ["RND <[email protected]>"]
description = "Chord DHT implementation with ICE"
Expand All @@ -25,8 +25,9 @@ std = [
"uuid/v4",
"uuid/serde",
"rings-derive/default",
"rings-transport/native-webrtc",
]
dummy = ["std", "lazy_static", "tokio"]
dummy = ["std", "lazy_static", "tokio", "rings-transport/dummy"]
wasm = [
"web-sys",
"wasm-bindgen",
Expand All @@ -40,6 +41,7 @@ wasm = [
"uuid/v4",
"uuid/serde",
"rings-derive/wasm",
"rings-transport/web-sys-webrtc",
]
browser_chrome_test = ["wasm"]

Expand Down Expand Up @@ -69,6 +71,7 @@ num-bigint = "0.4.3"
rand = { version = "0.8.5", features = ["getrandom"] }
rand_core = { version = "0.6.3", features = ["getrandom"] }
rand_hc = "0.3.1"
rings-transport = { workspace = true }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = { version = "1.0.70" }
sha1 = "0.10.1"
Expand Down
42 changes: 42 additions & 0 deletions core/src/dht/did.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,46 @@ mod tests {
assert_eq!(affine_dids[2], did.rotate(180));
assert_eq!(affine_dids[3], did.rotate(270));
}

#[test]
fn test_dump_and_load() {
// Must starts with 0x
assert_eq!(
Did::from_str("11E807fcc88dD319270493fB2e822e388Fe36ab0"),
Err(Error::BadCHexInCache)
);

// from_str then to_string
let did = Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap();
assert_eq!(
did.to_string(),
"0x11e807fcc88dd319270493fb2e822e388fe36ab0"
);

// Serialize
let did = Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap();
assert_eq!(
serde_json::to_string(&did).unwrap(),
"\"0x11e807fcc88dd319270493fb2e822e388fe36ab0\""
);

// Deserialize
let did =
serde_json::from_str::<Did>("\"0x11e807fcc88dd319270493fb2e822e388fe36ab0\"").unwrap();
assert_eq!(
did,
Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap()
);

// Debug and Display
let did = Did::from_str("0x11E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap();
assert_eq!(
format!("{}", did),
"Did(0x11e807fcc88dd319270493fb2e822e388fe36ab0)"
);
assert_eq!(
format!("{:?}", did),
"Did(0x11e807fcc88dd319270493fb2e822e388fe36ab0)"
);
}
}
2 changes: 0 additions & 2 deletions core/src/dht/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ use crate::message::NotifyPredecessorSend;
use crate::message::PayloadSender;
use crate::message::QueryForTopoInfoSend;
use crate::swarm::Swarm;
use crate::transports::manager::TransportManager;
use crate::types::ice_transport::IceTransportInterface;

/// A combination contains chord and swarm, use to run stabilize.
/// - swarm: transports communicate with each others.
Expand Down
3 changes: 3 additions & 0 deletions core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ pub enum Error {

#[error("Session is expired")]
SessionExpired,

#[error("Transport error: {0}")]
Transport(#[from] rings_transport::error::Error),
}

#[cfg(feature = "wasm")]
Expand Down
4 changes: 1 addition & 3 deletions core/src/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@ use crate::storage::MemStorage;
use crate::storage::PersistenceStorage;
use crate::storage::PersistenceStorageReadAndWrite;
use crate::swarm::Swarm;
use crate::transports::manager::TransportManager;
use crate::types::ice_transport::IceTransportInterface;
use crate::utils::from_rtc_ice_connection_state;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmInspect {
Expand Down Expand Up @@ -57,7 +55,7 @@ impl SwarmInspect {
.map(|((did, transport), state)| TransportInspect {
did: did.to_string(),
transport_id: transport.id.to_string(),
state: state.map(from_rtc_ice_connection_state),
state: state.map(|x| format!("{x:?}")),
})
.collect()
};
Expand Down
1 change: 0 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ pub mod storage;
pub mod swarm;
#[cfg(test)]
mod tests;
pub mod transports;
pub mod types;
pub mod utils;
pub use async_trait::async_trait;
Expand Down
2 changes: 1 addition & 1 deletion core/src/message/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ pub enum MessageHandlerEvent {

/// Instructs the swarm to accept an answer inside payload by given
/// sender's Did and Message.
AcceptAnswer(NextHop, ConnectNodeReport),
AcceptAnswer(Did, ConnectNodeReport),

/// Tell swarm to forward the payload to destination by given
/// Payload and optional next hop.
Expand Down
9 changes: 2 additions & 7 deletions core/src/message/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use crate::dht::vnode::VirtualNode;
use crate::dht::Did;
use crate::dht::TopoInfo;
use crate::error::Result;
use crate::types::ice_transport::HandshakeInfo;

/// The `Then` trait is used to associate a type with a "then" scenario.
pub trait Then {
Expand All @@ -22,19 +21,15 @@ pub trait Then {
/// MessageType use to ask for connection, send to remote with transport_uuid and handshake_info.
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize, Clone)]
pub struct ConnectNodeSend {
/// uuid of transport
pub transport_uuid: String,
/// sdp offer of webrtc
pub offer: HandshakeInfo,
pub sdp: String,
}

/// MessageType report to origin with own transport_uuid and handshake_info.
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize, Clone)]
pub struct ConnectNodeReport {
/// uuid of transport
pub transport_uuid: String,
/// sdp answer of webrtc
pub answer: HandshakeInfo,
pub sdp: String,
}

/// MessageType use to find successor in a chord ring.
Expand Down
23 changes: 0 additions & 23 deletions core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,33 +4,11 @@ pub use async_trait;
pub use base58;
pub use dashmap;
pub use futures;
#[cfg(feature = "wasm")]
pub use js_sys;
pub use libsecp256k1;
#[cfg(feature = "wasm")]
pub use rexie;
pub use url;
pub use uuid;
#[cfg(feature = "wasm")]
pub use wasm_bindgen;
#[cfg(feature = "wasm")]
pub use wasm_bindgen_futures;
pub use web3;
pub use web3::types::Address;
#[cfg(feature = "wasm")]
pub use web_sys;
#[cfg(feature = "wasm")]
pub use web_sys::RtcIceConnectionState as RTCIceConnectionState;
#[cfg(feature = "wasm")]
pub use web_sys::RtcSdpType as RTCSdpType;
#[cfg(not(feature = "wasm"))]
pub use webrtc;
#[cfg(not(feature = "wasm"))]
pub use webrtc::ice_transport::ice_connection_state::RTCIceConnectionState;
#[cfg(not(feature = "wasm"))]
pub use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
#[cfg(not(feature = "wasm"))]
pub use webrtc::peer_connection::sdp::sdp_type::RTCSdpType;

pub use crate::dht::vnode;
pub use crate::message;
Expand All @@ -40,4 +18,3 @@ pub use crate::message::MessageRelay;
pub use crate::message::SubringInterface;
pub use crate::storage::PersistenceStorage;
pub use crate::storage::PersistenceStorageReadAndWrite;
pub use crate::transports::Transport;
34 changes: 14 additions & 20 deletions core/src/swarm/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,26 @@
//! This module provider [SwarmBuilder] and it's interface for
//! [Swarm]

use std::str::FromStr;
use std::sync::Arc;
use std::sync::Mutex;

use rings_transport::core::callback::Callback;
use rings_transport::Transport;

use crate::channels::Channel;
use crate::dht::PeerRing;
use crate::message::CallbackFn;
use crate::message::MessageHandler;
use crate::message::ValidatorFn;
use crate::session::SessionSk;
use crate::storage::MemStorage;
use crate::storage::PersistenceStorage;
use crate::swarm::callback::SwarmCallback;
use crate::swarm::MeasureImpl;
use crate::swarm::Swarm;
use crate::types::channel::Channel as ChannelTrait;
use crate::types::ice_transport::IceServer;

/// Creates a SwarmBuilder to configure a Swarm.
pub struct SwarmBuilder {
ice_servers: Vec<IceServer>,
ice_servers: String,
external_address: Option<String>,
dht_succ_max: u8,
dht_storage: PersistenceStorage,
Expand All @@ -35,17 +35,8 @@ pub struct SwarmBuilder {
impl SwarmBuilder {
/// Creates new instance of [SwarmBuilder]
pub fn new(ice_servers: &str, dht_storage: PersistenceStorage, session_sk: SessionSk) -> Self {
let ice_servers = ice_servers
.split(';')
.collect::<Vec<&str>>()
.into_iter()
.map(|s| {
IceServer::from_str(s)
.unwrap_or_else(|_| panic!("Failed on parse ice server {:?}", s))
})
.collect::<Vec<IceServer>>();
SwarmBuilder {
ice_servers,
ice_servers: ice_servers.to_string(),
external_address: None,
dht_succ_max: 3,
dht_storage,
Expand Down Expand Up @@ -107,16 +98,19 @@ impl SwarmBuilder {
let message_handler =
MessageHandler::new(dht.clone(), self.message_callback, self.message_validator);

let transport_event_channel = Channel::new();

let transport = Transport::new(&self.ice_servers, self.external_address);
let callback = Arc::new(SwarmCallback::new(transport_event_channel.sender()).boxed());

Swarm {
pending_transports: Mutex::new(vec![]),
transports: MemStorage::new(),
transport_event_channel: Channel::new(),
ice_servers: self.ice_servers,
external_address: self.external_address,
transport_event_channel,
dht,
measure: self.measure,
session_sk: self.session_sk,
message_handler,
transport,
callback,
}
}
}
35 changes: 35 additions & 0 deletions core/src/swarm/callback.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use async_trait::async_trait;
use rings_transport::core::callback::Callback;

use crate::channels::Channel;
use crate::error::Error;
use crate::error::Result;
use crate::types::channel::Channel as ChannelTrait;
use crate::types::channel::TransportEvent;

type TransportEventSender = <Channel<TransportEvent> as ChannelTrait<TransportEvent>>::Sender;

pub struct SwarmCallback {
transport_event_sender: TransportEventSender,
}

impl SwarmCallback {
pub fn new(transport_event_sender: TransportEventSender) -> Self {
Self {
transport_event_sender,
}
}
}

#[async_trait]
impl Callback for SwarmCallback {
type Error = Error;

async fn on_message(&self, cid: &str, msg: &[u8]) -> Result<()> {
Channel::send(
&self.transport_event_sender,
TransportEvent::DataChannelMessage(msg.into()),
)
.await
}
}
Loading

0 comments on commit fbece07

Please sign in to comment.