Skip to content

Commit

Permalink
net: fix race condition provoked by channel.ban()
Browse files Browse the repository at this point in the history
If a node we are connected to as part of a refinery session sends
arbitrary messages, it could trigger a "ban", which would result in a
panic if the refinery was moving the peer to whitelist at the same time
we are sending the node to the blacklist.

Following this commit, we ignore messages without dispatchers if they
come from a refine session.
  • Loading branch information
darkfi committed Jan 28, 2025
1 parent 14e60de commit 9b3c4b3
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 16 deletions.
38 changes: 28 additions & 10 deletions src/net/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,18 +381,36 @@ impl Channel {
// Send result to our publishers
match self.message_subsystem.notify(&command, reader).await {
Ok(()) => {}
// If we're getting messages without dispatchers, it's spam.
Err(Error::MissingDispatcher) => {
warn!(
target: "net::channel::main_receive_loop()",
"MissingDispatcher for command={}, channel={:?}",
command, self
);
if let BanPolicy::Strict = self.p2p().settings().read().await.ban_policy {
self.ban(self.address()).await;
}
// If we're getting messages without dispatchers, it's spam.
// We therefore ban this channel if:
//
// 1) This channel is NOT part of a refine session.
//
// It's possible that nodes can send messages without
// dispatchers during the refinery process. If that happens
// we simply ignore it. Otherwise, it's spam.
//
// 2) BanPolicy is set to Strict.
//
// We only ban if the BanPolicy is set to Strict, which is
// the default setting for most nodes. The exception to
// this is a seed node like Lilith which has BanPolicy::Relaxed
// since it regularly forms connections with nodes sending
// messages it does not have dispatchers for.
if self.session.upgrade().unwrap().type_id() != SESSION_REFINE {
warn!(
target: "net::channel::main_receive_loop()",
"MissingDispatcher for command={}, channel={:?}",
command, self
);

return Err(Error::ChannelStopped)
if let BanPolicy::Strict = self.p2p().settings().read().await.ban_policy {
self.ban(self.address()).await;
}

return Err(Error::ChannelStopped)
}
}
Err(_) => unreachable!("You added a new error in notify()"),
}
Expand Down
7 changes: 1 addition & 6 deletions src/net/message_publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{any::Any, collections::HashMap, sync::Arc, time::Duration};

use async_trait::async_trait;
use futures::stream::{FuturesUnordered, StreamExt};
use log::{debug, error, warn};
use log::{debug, error};
use rand::{rngs::OsRng, Rng};
use smol::{io::AsyncReadExt, lock::Mutex};

Expand Down Expand Up @@ -290,11 +290,6 @@ impl MessageSubsystem {
reader: &mut smol::io::ReadHalf<Box<dyn PtStream + 'static>>,
) -> Result<()> {
let Some(dispatcher) = self.dispatchers.lock().await.get(command).cloned() else {
warn!(
target: "net::message_publisher::notify",
"message_publisher::notify: Command '{}' did not find a dispatcher",
command,
);
return Err(Error::MissingDispatcher)
};

Expand Down

0 comments on commit 9b3c4b3

Please sign in to comment.