Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: limit connection by measurement #459

Merged
merged 7 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ tracing = "0.1.37"
url = { version = "2", features = ["serde"] }

futures = { version = "0.3.21", default-features = false, optional = true }
rings-derive = { workspace = true, optional = true }
rings-derive = { workspace = true, optional = true, features = ["core_crate"] }
uuid = { version = "0.8.2", optional = true }
web3 = { version = "0.18.0", default-features = false, optional = true }

Expand Down
2 changes: 1 addition & 1 deletion core/src/dht/finger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl FingerTable {
return;
}
if did == self.did {
tracing::info!("set finger table with self did, ignore it");
tracing::trace!("set finger table with self did, ignore it");
return;
}
self.finger[index] = Some(did);
Expand Down
4 changes: 2 additions & 2 deletions core/src/dht/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Stabilization {
});
if self.chord.did != successor_min {
for s in successor_list {
tracing::info!("STABILIZATION notify_predecessor: {:?}", s);
tracing::debug!("STABILIZATION notify_predecessor: {:?}", s);
let payload = MessagePayload::new_send(
msg.clone(),
self.swarm.session_manager(),
Expand All @@ -110,7 +110,7 @@ impl Stabilization {
closest_predecessor,
PeerRingRemoteAction::FindSuccessorForFix(finger_did),
) => {
tracing::info!("STABILIZATION fix_fingers: {:?}", finger_did);
tracing::debug!("STABILIZATION fix_fingers: {:?}", finger_did);
let msg = Message::FindSuccessorSend(FindSuccessorSend {
did: finger_did,
then: FindSuccessorThen::Report(FindSuccessorReportHandler::FixFingerTable),
Expand Down
7 changes: 5 additions & 2 deletions core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,11 @@ pub enum Error {
#[error("Cannot seek did in swarm table, {0}")]
SwarmMissDidInTable(crate::dht::Did),

#[error("Cannot gather local candidate")]
FailedOnGatherLocalCandidate,
#[error("Cannot gather local candidate, {0}")]
FailedOnGatherLocalCandidate(String),

#[error("Node behaviour bad")]
NodeBehaviourBad(crate::dht::Did),

#[error("Cannot get transport from did: {0}")]
SwarmMissTransport(crate::dht::Did),
Expand Down
1 change: 0 additions & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
//! ```shell
//! cargo build -p rings-core --target=wasm32-unknown-unknown --features wasm --no-default-features
//! ```

pub mod channels;
pub mod dht;
pub mod ecc;
Expand Down
60 changes: 60 additions & 0 deletions core/src/measure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ pub enum MeasureCounter {
Received,
/// The number of failed to receive messages.
FailedToReceive,
/// The number of connected.
Connect,
/// The number of disconnect.
Disconnected,
}

/// `Measure` is used to assess the reliability of peers by counting their behaviour.
Expand All @@ -29,3 +33,59 @@ pub trait Measure {
/// `get_count` returns the counter of the given peer.
async fn get_count(&self, did: Did, counter: MeasureCounter) -> u64;
}

/// `BehaviourJudgement` trait defines a method `good` for assessing whether a node behaves well.
/// Any structure implementing this trait should provide a way to measure the "goodness" of a node.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait BehaviourJudgement: Measure {
/// This asynchronous method should return a boolean indicating whether the node identified by `did` is behaving well.
async fn good(&self, did: Did) -> bool;
}

/// `ConnectBehaviour` trait offers a default implementation for the `good` method, providing a judgement
/// based on a node's behavior in establishing connections.
/// The "goodness" of a node is measured by comparing the connection and disconnection counts against a given threshold.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait ConnectBehaviour<const THRESHOLD: i16>: Measure {
/// This asynchronous method returns a boolean indicating whether the node identified by `did` has a satisfactory connection behavior.
async fn good(&self, did: Did) -> bool {
let conn = self.get_count(did, MeasureCounter::Connect).await;
let disconn = self.get_count(did, MeasureCounter::Disconnected).await;
tracing::debug!(
"[ConnectBehaviour] in Threadhold: {:}, connect: {:}, disconn: {:}, delta: {:}",
THRESHOLD,
conn,
disconn,
conn - disconn
);
((conn - disconn) as i16) < THRESHOLD
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To ensure adaptability across different orders of magnitude, it may be more effective to calculate using the admission base and ratio rather than simply taking the difference.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, this algorithm is just a very basically impl for current situation, we needs ratio based, time weighted based and so on. And futher more, we allow users to compose those impls together.

}
}

/// `MessageSendBehaviour` trait provides a default implementation for the `good` method, judging a node's
/// behavior based on its message sending capabilities.
/// The "goodness" of a node is measured by comparing the sent and failed-to-send counts against a given threshold.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait MessageSendBehaviour<const THRESHOLD: i16>: Measure {
/// This asynchronous method returns a boolean indicating whether the node identified by `did` has a satisfactory message sending behavior.
async fn good(&self, did: Did) -> bool {
let failed = self.get_count(did, MeasureCounter::FailedToSend).await;
(failed as i16) < THRESHOLD
}
}

/// `MessageRecvBehaviour` trait provides a default implementation for the `good` method, assessing a node's
/// behavior based on its message receiving capabilities.
/// The "goodness" of a node is measured by comparing the received and failed-to-receive counts against a given threshold.
#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait MessageRecvBehaviour<const THRESHOLD: i16>: Measure {
/// This asynchronous method returns a boolean indicating whether the node identified by `did` has a satisfactory message receiving behavior.
async fn good(&self, did: Did) -> bool {
let failed = self.get_count(did, MeasureCounter::FailedToReceive).await;
(failed as i16) < THRESHOLD
}
}
4 changes: 2 additions & 2 deletions core/src/storage/persistence/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl PersistenceStorageOperation for KvStorage {
#[async_trait]
impl<K, V, I> PersistenceStorageReadAndWrite<K, V> for I
where
K: ToString + FromStr + std::marker::Sync + Send,
K: ToString + FromStr + std::marker::Sync + Send + std::fmt::Debug,
V: DeserializeOwned + serde::Serialize + std::marker::Sync + Send,
I: PersistenceStorageOperation + std::marker::Sync + KvStorageBasic,
{
Expand All @@ -152,7 +152,7 @@ where
async fn put(&self, key: &K, value: &V) -> Result<()> {
self.prune().await?;
let data = bincode::serialize(value).map_err(Error::BincodeSerialize)?;
println!("insert v: {:?}", data);
tracing::debug!("Try inserting key: {:?}", key);
self.get_db()
.insert(key.to_string().as_bytes(), data)
.map_err(Error::SledError)?;
Expand Down
Loading