Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] feature: add SwarmCallback to replace message CallbackFn and Validator #481

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions core/src/backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
#![warn(missing_docs)]

use std::str::FromStr;
use std::sync::Arc;

use rings_transport::core::callback::BoxedTransportCallback;
use rings_transport::core::transport::BoxedTransport;
use rings_transport::core::transport::ConnectionInterface;
use rings_transport::error::Error as TransportError;

use crate::dht::Did;
use crate::dht::PeerRing;
use crate::error::Error;
use crate::error::Result;
use crate::types::Connection;
use crate::types::ConnectionOwner;

pub struct RingsBackend {
pub(crate) dht: Arc<PeerRing>,
transport: BoxedTransport<ConnectionOwner, TransportError>,
}

impl RingsBackend {
pub fn new(
dht: Arc<PeerRing>,
transport: BoxedTransport<ConnectionOwner, TransportError>,
) -> Self {
Self { dht, transport }
}

/// Create new connection.
pub async fn new_connection(
&self,
did: Did,
transport_callback: Arc<BoxedTransportCallback>,
) -> Result<Connection> {
let cid = did.to_string();
self.transport
.new_connection(&cid, transport_callback)
.await
.map_err(Error::Transport)?;
self.transport.connection(&cid).map_err(|e| e.into())
}

/// Disconnect a connection. There are three steps:
/// 1) remove from DHT;
/// 2) remove from Transport;
/// 3) close the connection;
pub async fn disconnect(&self, did: Did) -> Result<()> {
tracing::info!("[disconnect] removing from DHT {:?}", did);
self.dht.remove(did)?;
self.transport
.close_connection(&did.to_string())
.await
.map_err(|e| e.into())
}

/// Get connection by did and check if it is connected.
pub async fn get_and_check_connection(&self, did: Did) -> Option<Connection> {
let cid = did.to_string();

let Ok(c) = self.transport.connection(&cid) else {
return None;
};

if c.is_connected().await {
return Some(c);
}

tracing::debug!(
"[get_and_check_connection] connection {did} is not connected, will be dropped"
);

if let Err(e) = self.disconnect(did).await {
tracing::error!("Failed on close connection {did}: {e:?}");
};

None
}

/// Get connection by did.
pub fn connection(&self, did: Did) -> Option<Connection> {
self.transport.connection(&did.to_string()).ok()
}

/// Get all connections in transport.
pub fn connections(&self) -> Vec<(Did, Connection)> {
self.transport
.connections()
.into_iter()
.filter_map(|(k, v)| Did::from_str(&k).ok().map(|did| (did, v)))
.collect()
}

/// Get dids of all connections in transport.
pub fn connection_ids(&self) -> Vec<Did> {
self.transport
.connection_ids()
.into_iter()
.filter_map(|k| Did::from_str(&k).ok())
.collect()
}
}
36 changes: 14 additions & 22 deletions core/src/dht/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use rings_transport::core::transport::ConnectionInterface;
use crate::dht::successor::SuccessorReader;
use crate::dht::types::CorrectChord;
use crate::dht::Chord;
use crate::dht::PeerRing;
use crate::dht::PeerRingAction;
use crate::dht::PeerRingRemoteAction;
use crate::error::Result;
Expand All @@ -22,12 +21,9 @@ use crate::message::PayloadSender;
use crate::message::QueryForTopoInfoSend;
use crate::swarm::Swarm;

/// A combination contains chord and swarm, use to run stabilize.
/// - swarm: transports communicate with each others.
/// - chord: fix local fingers table.
/// Used to run stabilize.
#[derive(Clone)]
pub struct Stabilization {
chord: Arc<PeerRing>,
swarm: Arc<Swarm>,
timeout: usize,
}
Expand All @@ -43,7 +39,7 @@ pub trait TStabilize {
impl Stabilization {
/// Clean unavailable connections in transport.
pub async fn clean_unavailable_connections(&self) -> Result<()> {
let conns = self.swarm.get_connections();
let conns = self.swarm.backend.connections();

for (did, conn) in conns.into_iter() {
if conn.is_disconnected().await {
Expand All @@ -59,11 +55,7 @@ impl Stabilization {
impl Stabilization {
/// Create a new instance of Stabilization
pub fn new(swarm: Arc<Swarm>, timeout: usize) -> Self {
Self {
chord: swarm.dht(),
swarm,
timeout,
}
Self { swarm, timeout }
}

/// Get timeout of waiting delays.
Expand All @@ -76,14 +68,14 @@ impl Stabilization {
/// Notify predecessor, this is a DHT operation.
pub async fn notify_predecessor(&self) -> Result<()> {
let (successor_min, successor_list) = {
let successor = self.chord.successors();
let successor = self.swarm.backend.dht.successors();
(successor.min()?, successor.list()?)
};

let msg = Message::NotifyPredecessorSend(NotifyPredecessorSend {
did: self.chord.did,
did: self.swarm.did(),
});
if self.chord.did != successor_min {
if self.swarm.did() != successor_min {
for s in successor_list {
tracing::debug!("STABILIZATION notify_predecessor: {:?}", s);
let payload = MessagePayload::new_send(
Expand All @@ -102,7 +94,7 @@ impl Stabilization {

/// Fix fingers from finger table, this is a DHT operation.
async fn fix_fingers(&self) -> Result<()> {
match self.chord.fix_fingers() {
match self.swarm.backend.dht.fix_fingers() {
Ok(action) => match action {
PeerRingAction::None => Ok(()),
PeerRingAction::RemoteAction(
Expand Down Expand Up @@ -143,7 +135,7 @@ impl Stabilization {
if let PeerRingAction::RemoteAction(
next,
PeerRingRemoteAction::QueryForSuccessorListAndPred,
) = self.chord.pre_stabilize()?
) = self.swarm.backend.dht.pre_stabilize()?
{
let evs = vec![MessageHandlerEvent::SendDirectMessage(
Message::QueryForTopoInfoSend(QueryForTopoInfoSend::new_for_stab(next)),
Expand Down Expand Up @@ -279,14 +271,14 @@ pub mod tests {

assert_eq!(
node1
.get_connection(node2.did())
.backend.connection(node2.did())
.unwrap()
.webrtc_connection_state(),
WebrtcConnectionState::Connected,
);
assert_eq!(
node1
.get_connection(node3.did())
.backend.connection(node3.did())
.unwrap()
.webrtc_connection_state(),
WebrtcConnectionState::Connected,
Expand All @@ -298,14 +290,14 @@ pub mod tests {
tokio::time::sleep(Duration::from_secs(10)).await;
assert_eq!(
node1
.get_connection(node2.did())
.backend.connection(node2.did())
.unwrap()
.webrtc_connection_state(),
WebrtcConnectionState::Disconnected,
);
assert_eq!(
node1
.get_connection(node3.did())
.backend.connection(node3.did())
.unwrap()
.webrtc_connection_state(),
WebrtcConnectionState::Disconnected,
Expand All @@ -314,7 +306,7 @@ pub mod tests {
let stb = Stabilization::new(node1.clone(), 3);
stb.clean_unavailable_connections().await.unwrap();

assert!(node1.get_connection(node2.did()).is_none());
assert!(node1.get_connection(node3.did()).is_none());
assert!(node1.backend.connection(node2.did()).is_none());
assert!(node1.backend.connection(node3.did()).is_none());
}
}
2 changes: 1 addition & 1 deletion core/src/inspect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl SwarmInspect {
pub async fn inspect(swarm: &Swarm) -> Self {
let dht = DHTInspect::inspect(&swarm.dht());
let connections = {
let connections = swarm.get_connections();
let connections = swarm.backend.connections();

connections
.iter()
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ pub mod types;
pub mod utils;
pub use async_trait::async_trait;
pub use futures;
pub mod backend;
pub mod chunk;
pub mod consts;
pub mod inspect;
Expand Down
16 changes: 8 additions & 8 deletions core/src/message/handlers/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ pub mod tests {
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node3.did()
));
// dht3 won't set did3 as successor
assert!(!node3.dht.successors().list()?.contains(&node3.did()));
assert!(!node3.dht().successors().list()?.contains(&node3.did()));

// 2->3 FindSuccessorReport
let ev_3 = node3.listen_once().await.unwrap().0;
Expand All @@ -620,7 +620,7 @@ pub mod tests {
Message::FindSuccessorReport(FindSuccessorReport{did, handler: FindSuccessorReportHandler::Connect}) if did == node1.did()
));
// dht1 won't set did1 as successor
assert!(!node1.dht.successors().list()?.contains(&node1.did()));
assert!(!node1.dht().successors().list()?.contains(&node1.did()));

assert_no_more_msg(&node1, &node2, &node3).await;

Expand Down Expand Up @@ -721,10 +721,10 @@ pub mod tests {
node3: &Swarm,
) -> Result<()> {
// check node1 and node3 is not connected to each other
assert!(node1.get_connection(node3.did()).is_none());
assert!(node1.backend.connection(node3.did()).is_none());

// node1's successor should be node2 now
assert_eq!(node1.dht.successors().max()?, node2.did());
assert_eq!(node1.dht().successors().max()?, node2.did());

node1.connect(node3.did()).await.unwrap();

Expand Down Expand Up @@ -791,12 +791,12 @@ pub mod tests {
println!(
"Check transport of {:?}: {:?} for addresses {:?}",
swarm.did(),
swarm.get_connection_ids(),
swarm.backend.connection_ids(),
addresses
);
assert_eq!(swarm.get_connections().len(), addresses.len());
assert_eq!(swarm.backend.connections().len(), addresses.len());
for addr in addresses {
assert!(swarm.get_connection(addr).is_some());
assert!(swarm.backend.connection(addr).is_some());
}
}

Expand Down Expand Up @@ -947,7 +947,7 @@ pub mod tests {
for _ in 1..10 {
println!("wait 3 seconds for node2's transport 2to1 closing");
sleep(Duration::from_secs(3)).await;
if let Some(t) = node2.get_connection(node1.did()) {
if let Some(t) = node2.backend.connection(node1.did()) {
if t.is_disconnected().await {
println!("transport 2to1 is disconnected!!!!");
break;
Expand Down
Loading
Loading