Skip to content

Commit

Permalink
refactor(network): blacklist duration depends on reputation modifier …
Browse files Browse the repository at this point in the history
…(malicious / unstable) (#587)

* refactor(network): blacklist duration depends on reputation modifier (malicious / unstable)

* refactor(network): fix CR comments
  • Loading branch information
AlonLStarkWare authored Sep 8, 2024
1 parent 10605d3 commit d7fd3d7
Show file tree
Hide file tree
Showing 7 changed files with 126 additions and 165 deletions.
5 changes: 3 additions & 2 deletions crates/papyrus_network/src/network_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
Some((topic_hash, message)) = self.messages_to_broadcast_receivers.next() => {
self.broadcast_message(message, topic_hash);
}
Some(Some(peer_id)) = self.reported_peer_receivers.next() => self.swarm.report_peer(peer_id),
Some(peer_id) = self.reported_peers_receiver.next() => self.swarm.report_peer(peer_id),
Some(Some(peer_id)) = self.reported_peer_receivers.next() => self.swarm.report_peer_as_malicious(peer_id),
Some(peer_id) = self.reported_peers_receiver.next() => self.swarm.report_peer_as_malicious(peer_id),
Some(broadcasted_message_manager) = self.continue_propagation_receiver.next() => {
self.swarm.continue_propagation(broadcasted_message_manager);
}
Expand Down Expand Up @@ -721,6 +721,7 @@ pub struct ClientResponsesManager<Response: TryFrom<Bytes>> {
}

impl<Response: TryFrom<Bytes>> ClientResponsesManager<Response> {
/// Use this function to report peer as malicious
pub fn report_peer(self) {
warn!("Reporting peer");
if let Err(e) = self.report_sender.send(()) {
Expand Down
7 changes: 4 additions & 3 deletions crates/papyrus_network/src/network_manager/swarm_trait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub trait SwarmTrait: Stream<Item = Event> + Unpin {

fn broadcast_message(&mut self, message: Bytes, topic_hash: TopicHash);

fn report_peer(&mut self, peer_id: PeerId);
fn report_peer_as_malicious(&mut self, peer_id: PeerId);

fn add_new_supported_inbound_protocol(&mut self, protocol_name: StreamProtocol);

Expand Down Expand Up @@ -125,8 +125,9 @@ impl SwarmTrait for Swarm<mixed_behaviour::MixedBehaviour> {
}
}

fn report_peer(&mut self, peer_id: PeerId) {
let _ = self.behaviour_mut().peer_manager.report_peer(peer_id, ReputationModifier::Bad {});
fn report_peer_as_malicious(&mut self, peer_id: PeerId) {
let _ =
self.behaviour_mut().peer_manager.report_peer(peer_id, ReputationModifier::Malicious);
}

fn add_new_supported_inbound_protocol(&mut self, protocol: StreamProtocol) {
Expand Down
2 changes: 1 addition & 1 deletion crates/papyrus_network/src/network_manager/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl SwarmTrait for MockSwarm {
}
}

fn report_peer(&mut self, peer_id: PeerId) {
fn report_peer_as_malicious(&mut self, peer_id: PeerId) {
for sender in &self.reported_peer_senders {
sender.unbounded_send(peer_id).unwrap();
}
Expand Down
2 changes: 1 addition & 1 deletion crates/papyrus_network/src/peer_manager/behaviour_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ where
);
return;
}
let res = self.report_peer(peer_id, super::ReputationModifier::Bad);
let res = self.report_peer(peer_id, super::ReputationModifier::Unstable);
if res.is_err() {
error!("Dial failure of an unknown peer. peer id: {}", peer_id)
}
Expand Down
24 changes: 16 additions & 8 deletions crates/papyrus_network/src/peer_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ pub(crate) mod peer;
mod test;

#[cfg_attr(test, derive(Debug, PartialEq))]
#[derive(Clone, Copy)]
pub enum ReputationModifier {
// TODO: Implement this enum
Bad,
Malicious,
Unstable,
}

pub struct PeerManager<P: PeerTrait + 'static> {
Expand All @@ -41,7 +42,8 @@ pub struct PeerManager<P: PeerTrait + 'static> {

#[derive(Clone)]
pub struct PeerManagerConfig {
blacklist_timeout: Duration,
malicious_timeout: Duration,
unstable_timeout: Duration,
}

#[derive(thiserror::Error, Debug)]
Expand All @@ -58,7 +60,8 @@ impl Default for PeerManagerConfig {
fn default() -> Self {
Self {
// 1 year.
blacklist_timeout: Duration::from_secs(3600 * 24 * 365),
malicious_timeout: Duration::from_secs(3600 * 24 * 365),
unstable_timeout: Duration::from_secs(1),
}
}
}
Expand All @@ -82,9 +85,8 @@ where
}
}

fn add_peer(&mut self, mut peer: P) {
fn add_peer(&mut self, peer: P) {
info!("Peer Manager found new peer {:?}", peer.peer_id());
peer.set_timeout_duration(self.config.blacklist_timeout);
self.peers.insert(peer.peer_id(), peer);
// The new peer is unblocked so we don't need to wait for unblocked peer.
self.sleep_waiting_for_unblocked_peer = None;
Expand Down Expand Up @@ -184,7 +186,10 @@ where
self.pending_events
.push(ToSwarm::GenerateEvent(ToOtherBehaviourEvent::PeerBlacklisted { peer_id }));
if let Some(peer) = self.peers.get_mut(&peer_id) {
peer.update_reputation(reason);
peer.update_reputation(match reason {
ReputationModifier::Malicious => self.config.malicious_timeout,
ReputationModifier::Unstable => self.config.unstable_timeout,
});
Ok(())
} else {
Err(PeerManagerError::NoSuchPeer(peer_id))
Expand All @@ -198,7 +203,10 @@ where
) -> Result<(), PeerManagerError> {
if let Some(peer_id) = self.session_to_peer_map.get(&outbound_session_id) {
if let Some(peer) = self.peers.get_mut(peer_id) {
peer.update_reputation(reason);
peer.update_reputation(match reason {
ReputationModifier::Malicious => self.config.malicious_timeout,
ReputationModifier::Unstable => self.config.unstable_timeout,
});
Ok(())
} else {
Err(PeerManagerError::NoSuchPeer(*peer_id))
Expand Down
70 changes: 28 additions & 42 deletions crates/papyrus_network/src/peer_manager/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,17 @@ use std::time::{Duration, Instant};

use libp2p::swarm::ConnectionId;
use libp2p::{Multiaddr, PeerId};
#[cfg(test)]
use mockall::automock;
use tracing::{error, info};

use super::ReputationModifier;
use tracing::info;

#[cfg_attr(test, automock)]
pub trait PeerTrait {
fn new(peer_id: PeerId, multiaddr: Multiaddr) -> Self;

fn update_reputation(&mut self, reason: ReputationModifier);
fn update_reputation(&mut self, timeout_duration: Duration);

fn peer_id(&self) -> PeerId;

fn multiaddr(&self) -> Multiaddr;

fn set_timeout_duration(&mut self, duration: Duration);

fn is_blocked(&self) -> bool;

/// Returns Instant::now if not blocked.
Expand All @@ -36,36 +29,22 @@ pub trait PeerTrait {
pub struct Peer {
peer_id: PeerId,
multiaddr: Multiaddr,
timed_out_until: Option<Instant>,
timeout_duration: Option<Duration>,
timed_out_until: Instant,
connection_ids: Vec<ConnectionId>,
}

impl PeerTrait for Peer {
fn new(peer_id: PeerId, multiaddr: Multiaddr) -> Self {
Self {
peer_id,
multiaddr,
timeout_duration: None,
timed_out_until: None,
connection_ids: Vec::new(),
}
Self { peer_id, multiaddr, timed_out_until: get_instant_now(), connection_ids: Vec::new() }
}

fn update_reputation(&mut self, _reason: ReputationModifier) {
if let Some(timeout_duration) = self.timeout_duration {
info!(
"Peer {:?} misbehaved. Blacklisting it for {:.3} seconds.",
self.peer_id,
timeout_duration.as_secs_f64(),
);
self.timed_out_until = Some(Instant::now() + timeout_duration);
} else {
error!(
"Peer {:?} misbehaved but its timeout duration wasn't set. Not doing anything.",
self.peer_id
);
}
fn update_reputation(&mut self, timeout_duration: Duration) {
self.timed_out_until = get_instant_now() + timeout_duration;
info!(
"Peer {:?} misbehaved. Blacklisting it for {:.3} seconds.",
self.peer_id,
timeout_duration.as_secs_f64(),
);
}

fn peer_id(&self) -> PeerId {
Expand All @@ -76,20 +55,16 @@ impl PeerTrait for Peer {
self.multiaddr.clone()
}

fn set_timeout_duration(&mut self, duration: Duration) {
self.timeout_duration = Some(duration);
}

fn is_blocked(&self) -> bool {
if let Some(timed_out_until) = self.timed_out_until {
timed_out_until > Instant::now()
} else {
false
}
self.timed_out_until > get_instant_now()
}

fn blocked_until(&self) -> Instant {
self.timed_out_until.unwrap_or_else(Instant::now)
if self.timed_out_until > get_instant_now() {
self.timed_out_until
} else {
get_instant_now()
}
}

fn connection_ids(&self) -> &Vec<ConnectionId> {
Expand All @@ -104,3 +79,14 @@ impl PeerTrait for Peer {
self.connection_ids.retain(|&id| id != connection_id);
}
}

#[cfg(not(test))]
fn get_instant_now() -> Instant {
Instant::now()
}

// In tests we simulate time passing using tokio, so we need to use tokio's Instant instead of std.
#[cfg(test)]
fn get_instant_now() -> Instant {
tokio::time::Instant::now().into_std()
}
Loading

0 comments on commit d7fd3d7

Please sign in to comment.