Skip to content

Commit

Permalink
Hide the SwarmTransport in the Swarm of rings-core
Browse files Browse the repository at this point in the history
  • Loading branch information
Ma233 committed Mar 28, 2024
1 parent cb9e035 commit 604bd7d
Show file tree
Hide file tree
Showing 24 changed files with 130 additions and 236 deletions.
2 changes: 1 addition & 1 deletion crates/core/src/dht/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::message::MessagePayload;
use crate::message::NotifyPredecessorSend;
use crate::message::PayloadSender;
use crate::message::QueryForTopoInfoSend;
use crate::transport::SwarmTransport;
use crate::swarm::transport::SwarmTransport;

/// The stabilization runner.
#[derive(Clone)]
Expand Down
16 changes: 3 additions & 13 deletions crates/core/src/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::swarm::Swarm;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmInspect {
pub connections: Vec<ConnectionInspect>,
pub peers: Vec<ConnectionInspect>,
pub dht: DHTInspect,
pub persistence_storage: StorageInspect,
pub cache_storage: StorageInspect,
Expand Down Expand Up @@ -38,22 +38,12 @@ pub struct StorageInspect {
impl SwarmInspect {
pub async fn inspect(swarm: &Swarm) -> Self {
let dht = DHTInspect::inspect(&swarm.dht());
let connections = {
let connections = swarm.transport.get_connections();

connections
.iter()
.map(|(did, c)| ConnectionInspect {
did: did.to_string(),
state: format!("{:?}", c.webrtc_connection_state()),
})
.collect()
};
let peers = swarm.peers();
let persistence_storage = StorageInspect::inspect_kv_storage(&swarm.dht().storage).await;
let cache_storage = StorageInspect::inspect_kv_storage(&swarm.dht().cache).await;

Self {
connections,
peers,
dht,
persistence_storage,
cache_storage,
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,3 @@ pub mod chunk;
pub mod consts;
pub mod inspect;
pub mod measure;
pub mod transport;
8 changes: 8 additions & 0 deletions crates/core/src/measure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@ use async_trait::async_trait;

use crate::dht::Did;

/// Type of Measure, see [Measure].
#[cfg(not(feature = "wasm"))]
pub type MeasureImpl = Box<dyn BehaviourJudgement + Send + Sync>;

/// Type of Measure, see [crate::measure::Measure].
#[cfg(feature = "wasm")]
pub type MeasureImpl = Box<dyn BehaviourJudgement>;

/// The tag of counters in measure.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MeasureCounter {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/message/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::message::NotifyPredecessorSend;
use crate::message::PayloadSender;
use crate::swarm::callback::InnerSwarmCallback;
use crate::swarm::callback::SharedSwarmCallback;
use crate::transport::SwarmTransport;
use crate::swarm::transport::SwarmTransport;

/// Operator and Handler for Connection
pub mod connection;
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/message/handlers/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use crate::message::MessageHandler;
use crate::message::MessagePayload;
use crate::message::PayloadSender;
use crate::prelude::vnode::VNodeOperation;
use crate::swarm::transport::SwarmTransport;
use crate::swarm::Swarm;
use crate::transport::SwarmTransport;

/// ChordStorageInterface should imply necessary method for DHT storage
#[cfg_attr(feature = "wasm", async_trait(?Send))]
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/swarm/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use std::sync::RwLock;

use crate::dht::PeerRing;
use crate::dht::VNodeStorage;
use crate::measure::MeasureImpl;
use crate::session::SessionSk;
use crate::swarm::callback::SharedSwarmCallback;
use crate::swarm::callback::SwarmCallback;
use crate::swarm::transport::SwarmTransport;
use crate::swarm::Swarm;
use crate::transport::MeasureImpl;
use crate::transport::SwarmTransport;

struct DefaultCallback;
impl SwarmCallback for DefaultCallback {}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/swarm/callback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::message::Message;
use crate::message::MessageHandler;
use crate::message::MessagePayload;
use crate::message::MessageVerificationExt;
use crate::transport::SwarmTransport;
use crate::swarm::transport::SwarmTransport;

type CallbackError = Box<dyn std::error::Error>;

Expand Down
51 changes: 26 additions & 25 deletions crates/core/src/swarm/mod.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
#![warn(missing_docs)]
//! Tranposrt management

//! This mod is the main entrance of swarm.

mod builder;
/// Callback interface for swarm
pub mod callback;
pub(crate) mod transport;

use std::sync::Arc;
use std::sync::RwLock;

use async_trait::async_trait;
pub use builder::SwarmBuilder;

use self::callback::InnerSwarmCallback;
Expand All @@ -17,20 +18,21 @@ use crate::dht::PeerRing;
use crate::dht::Stabilizer;
use crate::error::Error;
use crate::error::Result;
use crate::inspect::ConnectionInspect;
use crate::inspect::SwarmInspect;
use crate::message::Message;
use crate::message::MessagePayload;
use crate::message::MessageVerificationExt;
use crate::message::PayloadSender;
use crate::swarm::callback::SharedSwarmCallback;
use crate::transport::SwarmTransport;
use crate::swarm::transport::SwarmTransport;

/// The transport and dht management.
pub struct Swarm {
/// Reference of DHT.
pub(crate) dht: Arc<PeerRing>,
/// Swarm tansport.
pub transport: Arc<SwarmTransport>,
pub(crate) transport: Arc<SwarmTransport>,
callback: RwLock<SharedSwarmCallback>,
}

Expand Down Expand Up @@ -101,33 +103,28 @@ impl Swarm {
self.transport.send_message(msg, destination).await
}

/// List peers and their connection status.
pub fn peers(&self) -> Vec<ConnectionInspect> {
self.transport
.get_connections()
.iter()
.map(|(did, c)| ConnectionInspect {
did: did.to_string(),
state: format!("{:?}", c.webrtc_connection_state()),
})
.collect()
}

/// Check the status of swarm
pub async fn inspect(&self) -> SwarmInspect {
SwarmInspect::inspect(self).await
}
}

/// ConnectionHandshake defined how to connect two connections between two swarms.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait ConnectionHandshake {
impl Swarm {
/// Creaet new connection and its answer. This function will wrap the offer inside a payload
/// with verification.
async fn create_offer(&self, peer: Did) -> Result<MessagePayload>;

/// Answer the offer of remote connection. This function will verify the answer payload and
/// will wrap the answer inside a payload with verification.
async fn answer_offer(&self, offer_payload: MessagePayload) -> Result<MessagePayload>;

/// Accept the answer of remote connection. This function will verify the answer payload and
/// will return its did with the connection.
async fn accept_answer(&self, answer_payload: MessagePayload) -> Result<()>;
}

#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl ConnectionHandshake for Swarm {
async fn create_offer(&self, peer: Did) -> Result<MessagePayload> {
pub async fn create_offer(&self, peer: Did) -> Result<MessagePayload> {
let offer_msg = self
.transport
.prepare_connection_offer(peer, self.inner_callback()?)
Expand All @@ -145,7 +142,9 @@ impl ConnectionHandshake for Swarm {
Ok(payload)
}

async fn answer_offer(&self, offer_payload: MessagePayload) -> Result<MessagePayload> {
/// Answer the offer of remote connection. This function will verify the answer payload and
/// will wrap the answer inside a payload with verification.
pub async fn answer_offer(&self, offer_payload: MessagePayload) -> Result<MessagePayload> {
if !offer_payload.verify() {
return Err(Error::VerifySignatureFailed);
}
Expand Down Expand Up @@ -174,7 +173,9 @@ impl ConnectionHandshake for Swarm {
Ok(answer_payload)
}

async fn accept_answer(&self, answer_payload: MessagePayload) -> Result<()> {
/// Accept the answer of remote connection. This function will verify the answer payload and
/// will return its did with the connection.
pub async fn accept_answer(&self, answer_payload: MessagePayload) -> Result<()> {
if !answer_payload.verify() {
return Err(Error::VerifySignatureFailed);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use crate::dht::LiveDid;
use crate::dht::PeerRing;
use crate::error::Error;
use crate::error::Result;
use crate::measure::BehaviourJudgement;
use crate::measure::MeasureImpl;
use crate::message::ConnectNodeReport;
use crate::message::ConnectNodeSend;
use crate::message::Message;
Expand All @@ -38,14 +38,6 @@ use crate::message::PayloadSender;
use crate::session::SessionSk;
use crate::swarm::callback::InnerSwarmCallback;

/// Type of Measure, see [crate::measure::Measure].
#[cfg(not(feature = "wasm"))]
pub type MeasureImpl = Box<dyn BehaviourJudgement + Send + Sync>;

/// Type of Measure, see [crate::measure::Measure].
#[cfg(feature = "wasm")]
pub type MeasureImpl = Box<dyn BehaviourJudgement>;

pub struct SwarmTransport {
transport: Transport,
session_sk: SessionSk,
Expand Down
7 changes: 2 additions & 5 deletions crates/core/src/tests/default/test_stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,12 @@ async fn test_stabilization_final_dht() -> Result<()> {
for swarm in swarms.iter() {
println!(
"Connected peers: {:?}",
SwarmInspect::inspect(swarm).await.connections
SwarmInspect::inspect(swarm).await.peers
);
current_dhts.push(DHTInspect::inspect(&swarm.dht()));
}

for (i, (cur, exp)) in std::iter::zip(current_dhts, expected_dhts)
.enumerate()
.skip(2)
{
for (i, (cur, exp)) in std::iter::zip(current_dhts, expected_dhts).enumerate() {
println!("Check node{}", i);
pretty_assertions::assert_eq!(cur, exp);
}
Expand Down
1 change: 0 additions & 1 deletion crates/core/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::swarm::ConnectionHandshake;
use crate::swarm::Swarm;

#[cfg(feature = "wasm")]
Expand Down
2 changes: 0 additions & 2 deletions crates/node/src/native/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ impl Client {
})
.await
.map_err(|e| anyhow::anyhow!("{}", e))?
.peer
.ok_or_else(|| anyhow::anyhow!("peer did not return"))?
.did;

ClientOutput::ok(format!("Remote did: {}", peer_did), peer_did)
Expand Down
35 changes: 18 additions & 17 deletions crates/node/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ use std::time::Duration;

use rings_core::dht::Did;
use rings_core::dht::VNodeStorage;
use rings_core::measure::MeasureImpl;
use rings_core::message::Encoded;
use rings_core::message::Encoder;
use rings_core::message::Message;
use rings_core::prelude::uuid;
use rings_core::storage::MemStorage;
use rings_core::swarm::Swarm;
use rings_core::swarm::SwarmBuilder;
use rings_core::transport::MeasureImpl;
use rings_rpc::protos::rings_node::*;
use serde::Deserialize;
use serde::Serialize;
Expand Down Expand Up @@ -374,8 +374,6 @@ impl Processor {
mod test {
use futures::lock::Mutex;
use rings_core::swarm::callback::SwarmCallback;
use rings_core::swarm::ConnectionHandshake;
use rings_transport::core::transport::WebrtcConnectionState;

use super::*;
use crate::prelude::*;
Expand All @@ -386,9 +384,9 @@ mod test {
let peer_did = SecretKey::random().address().into();
let processor = prepare_processor().await;
processor.swarm.create_offer(peer_did).await.unwrap();
let conn_dids = processor.swarm.transport.get_connection_ids();
let conn_dids = processor.swarm.peers();
assert_eq!(conn_dids.len(), 1);
assert_eq!(conn_dids.first().unwrap(), &peer_did);
assert_eq!(conn_dids.first().unwrap().did, peer_did.to_string());
}

struct SwarmCallbackInstance {
Expand Down Expand Up @@ -437,11 +435,12 @@ mod test {
let offer = p1.swarm.create_offer(p2.did()).await.unwrap();
assert_eq!(
p1.swarm
.transport
.get_connection(p2.did())
.peers()
.into_iter()
.find(|peer| peer.did == p2.did().to_string())
.unwrap()
.webrtc_connection_state(),
WebrtcConnectionState::New,
.state,
"New"
);

let answer = p2.swarm.answer_offer(offer).await.unwrap();
Expand All @@ -452,20 +451,22 @@ mod test {

assert_eq!(
p1.swarm
.transport
.get_connection(p2.did())
.peers()
.into_iter()
.find(|peer| peer.did == p2.did().to_string())
.unwrap()
.webrtc_connection_state(),
WebrtcConnectionState::Connected,
.state,
"Connected",
"p1 connection not connected"
);
assert_eq!(
p2.swarm
.transport
.get_connection(p1.did())
.peers()
.into_iter()
.find(|peer| peer.did == p1.did().to_string())
.unwrap()
.webrtc_connection_state(),
WebrtcConnectionState::Connected,
.state,
"Connected",
"p2 connection not connected"
);

Expand Down
Loading

0 comments on commit 604bd7d

Please sign in to comment.