Skip to content

Commit

Permalink
event_graph: ban channels who send > 200 msgs / 1 min
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfi committed Jan 28, 2025
1 parent ab8685d commit 0410ccd
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,9 @@ pub enum Error {
#[error("DAG sync failed")]
DagSyncFailed,

#[error("Malicious flood detected")]
MaliciousFlood,

// =========
// Catch-all
// =========
Expand Down
38 changes: 35 additions & 3 deletions src/event_graph/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
*/

use std::{
collections::{BTreeMap, HashSet},
collections::{BTreeMap, HashSet, VecDeque},
sync::{
atomic::{AtomicUsize, Ordering::SeqCst},
Arc,
Arc, Mutex as SyncMutex,
},
};

Expand All @@ -29,12 +29,17 @@ use log::{debug, error, trace, warn};
use smol::Executor;

use super::{Event, EventGraphPtr, NULL_ID};
use crate::{impl_p2p_message, net::*, Error, Result};
use crate::{impl_p2p_message, net::*, util::time::NanoTimestamp, Error, Result};

/// Malicious behaviour threshold. If the threshold is reached, we will
/// drop the peer from our P2P connection.
const MALICIOUS_THRESHOLD: usize = 5;

/// Global limit of messages per window
const WINDOW_MAXSIZE: usize = 200;
/// Rolling length of the window
const WINDOW_EXPIRY_TIME: NanoTimestamp = NanoTimestamp(60);

/// P2P protocol implementation for the Event Graph.
pub struct ProtocolEventGraph {
/// Pointer to the connected peer
Expand All @@ -55,6 +60,8 @@ pub struct ProtocolEventGraph {
malicious_count: AtomicUsize,
/// P2P jobs manager pointer
jobsman: ProtocolJobsManagerPtr,
/// Rolling window of event timestamps on this channel
bantimes: SyncMutex<VecDeque<NanoTimestamp>>,
}

/// A P2P message representing publishing an event on the network
Expand Down Expand Up @@ -122,6 +129,7 @@ impl ProtocolEventGraph {
_tip_rep_sub,
malicious_count: AtomicUsize::new(0),
jobsman: ProtocolJobsManager::new("ProtocolEventGraph", channel.clone()),
bantimes: SyncMutex::new(VecDeque::new()),
}))
}

Expand Down Expand Up @@ -178,6 +186,30 @@ impl ProtocolEventGraph {
continue
}

// There's a new unique event.
// Apply ban logic to stop network floods.
let is_malicious = {
let mut bantimes = self.bantimes.lock().unwrap();

// Clean out expired timestamps from the window.
while let Some(ts) = bantimes.front() {
if ts.elapsed().unwrap() < WINDOW_EXPIRY_TIME {
break
}
let _ = bantimes.pop_front();
}

// Add new timestamp
bantimes.push_back(NanoTimestamp::current_time());

bantimes.len() > WINDOW_MAXSIZE
};
if is_malicious {
self.channel.ban().await;
// This error is actually unused. We could return Ok here too.
return Err(Error::MaliciousFlood)
}

// We received an event. Check if we already have it in our DAG.
// Check event is not older that current genesis event timestamp.
// Also check if we have the event's parents. In the case we do
Expand Down
20 changes: 18 additions & 2 deletions src/util/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl Timestamp {

/// Add `self` to a given timestamp
/// Errors on integer overflow.
pub fn checked_add(&self, ts: Timestamp) -> Result<Self> {
pub fn checked_add(&self, ts: Self) -> Result<Self> {
if let Some(result) = self.inner().checked_add(ts.inner()) {
Ok(Self(result))
} else {
Expand All @@ -78,7 +78,7 @@ impl Timestamp {

/// Subtract `self` with a given timestamp
/// Errors on integer underflow.
pub fn checked_sub(&self, ts: Timestamp) -> Result<Self> {
pub fn checked_sub(&self, ts: Self) -> Result<Self> {
if let Some(result) = self.inner().checked_sub(ts.inner()) {
Ok(Self(result))
} else {
Expand Down Expand Up @@ -108,9 +108,25 @@ impl fmt::Display for Timestamp {
pub struct NanoTimestamp(pub u128);

impl NanoTimestamp {
pub fn inner(&self) -> u128 {
self.0
}

pub fn current_time() -> Self {
Self(UNIX_EPOCH.elapsed().unwrap().as_nanos())
}

pub fn elapsed(&self) -> Result<Self> {
Self::current_time().checked_sub(*self)
}

pub fn checked_sub(&self, ts: Self) -> Result<Self> {
if let Some(result) = self.inner().checked_sub(ts.inner()) {
Ok(Self(result))
} else {
Err(Error::SubtractionUnderflow)
}
}
}
impl fmt::Display for NanoTimestamp {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
Expand Down

0 comments on commit 0410ccd

Please sign in to comment.