feat(gossipsub): remove control pool #559

Merged · 8 commits · Dec 6, 2023
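For orientation: this PR removes the per-peer `control_pool` that was flushed on every heartbeat and instead pushes each control message straight into the destination peer's handler queue, with GRAFT/PRUNE on an unbounded priority channel and IHAVE/IWANT on a bounded non-priority channel that drops messages when full. Below is a minimal, self-contained sketch of that two-queue idea; the message and sender types are stand-ins for illustration, not the crate's actual `RpcSender`/`RpcOut`, and it assumes the `futures` crate is available.

```rust
// Hypothetical stand-in types showing the two-queue pattern this PR relies on:
// urgent control messages go on an unbounded priority channel, gossip goes on a
// bounded channel and is dropped when the peer is slow.
use futures::channel::mpsc;

#[derive(Debug)]
enum Msg {
    Graft { topic: String },            // stand-in for RpcOut::Graft(Graft { .. })
    IHave { message_ids: Vec<String> }, // stand-in for RpcOut::IHave(IHave { .. })
}

struct Sender {
    priority: mpsc::UnboundedSender<Msg>,
    non_priority: mpsc::Sender<Msg>,
}

impl Sender {
    fn graft(&mut self, topic: String) {
        // Unbounded: only fails if the receiving handler is gone.
        self.priority
            .unbounded_send(Msg::Graft { topic })
            .expect("handler receiver should be alive");
    }

    fn ihave(&mut self, message_ids: Vec<String>) {
        // Bounded: if the queue is full, drop the gossip instead of blocking.
        if let Err(e) = self.non_priority.try_send(Msg::IHave { message_ids }) {
            eprintln!("IHAVE dropped, queue full: {:?}", e.into_inner());
        }
    }
}

fn main() {
    let (priority, _prio_rx) = mpsc::unbounded();
    let (non_priority, _gossip_rx) = mpsc::channel(16);
    let mut sender = Sender { priority, non_priority };
    sender.graft("topic-a".into());
    sender.ihave(vec!["msg-1".into(), "msg-2".into()]);
}
```

The priority/non-priority split is what allows dropping the pool: mesh-maintenance messages are never lost, while gossip may be shed under backpressure.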
162 changes: 63 additions & 99 deletions protocols/gossipsub/src/behaviour.rs
@@ -45,10 +45,6 @@ use libp2p_swarm::{
THandlerOutEvent, ToSwarm,
};

use crate::gossip_promises::GossipPromises;
use crate::handler::{Handler, HandlerEvent, HandlerIn};
use crate::mcache::MessageCache;
use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty};
use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason};
use crate::protocol::SIGNING_PREFIX;
use crate::subscription_filter::{AllowAllSubscriptionFilter, TopicSubscriptionFilter};
@@ -65,6 +61,16 @@ use crate::{
config::{Config, ValidationMode},
types::RpcOut,
};
use crate::{gossip_promises::GossipPromises, types::Graft};
use crate::{
handler::{Handler, HandlerEvent, HandlerIn},
types::Prune,
};
use crate::{mcache::MessageCache, types::IWant};
use crate::{
metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty},
types::IHave,
};
use crate::{rpc_proto::proto, TopicScoreParams};
use crate::{PublishError, SubscriptionError, ValidationError};
use instant::SystemTime;
@@ -247,9 +253,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// Events that need to be yielded to the outside when polling.
events: VecDeque<ToSwarm<Event, HandlerIn>>,

/// Pools non-urgent control messages between heartbeats.
control_pool: HashMap<PeerId, Vec<ControlAction>>,

/// Information used for publishing messages.
publish_config: PublishConfig,

@@ -317,10 +320,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
/// Counts the number of `IWANT` that we sent to each peer since the last heartbeat.
count_sent_iwant: HashMap<PeerId, usize>,

/// Keeps track of IWANT messages that we are awaiting to send.
/// This is used to prevent sending duplicate IWANT messages for the same message.
pending_iwant_msgs: HashSet<MessageId>,

/// Short term cache for published message ids. This is used for penalizing peers sending
/// our own messages back if the messages are anonymous or use a random author.
published_message_ids: DuplicateCache<MessageId>,
@@ -445,7 +444,6 @@ where
Ok(Behaviour {
metrics: metrics.map(|(registry, cfg)| Metrics::new(registry, cfg)),
events: VecDeque::new(),
control_pool: HashMap::new(),
publish_config: privacy.into(),
duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
topic_peers: HashMap::new(),
@@ -471,7 +469,6 @@ where
peer_score: None,
count_received_ihave: HashMap::new(),
count_sent_iwant: HashMap::new(),
pending_iwant_msgs: HashSet::new(),
connected_peers: HashMap::new(),
published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()),
config,
@@ -1027,13 +1024,14 @@ where
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.graft(&peer_id, topic_hash.clone());
}
Self::control_pool_add(
&mut self.control_pool,
peer_id,
ControlAction::Graft {
topic_hash: topic_hash.clone(),
},
);
let sender = self
.handler_send_queues
.get_mut(&peer_id)
.expect("Peerid should exist");

sender.graft(Graft {
topic_hash: topic_hash.clone(),
});

// If the peer did not previously exist in any mesh, inform the handler
peer_added_to_mesh(
@@ -1061,7 +1059,7 @@ where
peer: &PeerId,
do_px: bool,
on_unsubscribe: bool,
) -> ControlAction {
) -> Prune {
if let Some((peer_score, ..)) = &mut self.peer_score {
peer_score.prune(peer, topic_hash.clone());
}
@@ -1072,7 +1070,7 @@ where
}
Some(PeerKind::Gossipsub) => {
// GossipSub v1.0 -- no peer exchange, the peer won't be able to parse it anyway
return ControlAction::Prune {
return Prune {
topic_hash: topic_hash.clone(),
peers: Vec::new(),
backoff: None,
@@ -1109,7 +1107,7 @@ where
// update backoff
self.backoffs.update_backoff(topic_hash, peer, backoff);

ControlAction::Prune {
Prune {
topic_hash: topic_hash.clone(),
peers,
backoff: Some(backoff.as_secs()),
@@ -1129,9 +1127,13 @@ where
// Send a PRUNE control message
tracing::debug!(%peer, "LEAVE: Sending PRUNE to peer");
let on_unsubscribe = true;
let control =
self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
Self::control_pool_add(&mut self.control_pool, peer, control);
let prune = self.make_prune(topic_hash, &peer, self.config.do_px(), on_unsubscribe);
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");

sender.prune(prune);

// If the peer did not previously exist in any mesh, inform the handler
peer_removed_from_mesh(
@@ -1230,10 +1232,6 @@ where
return false;
}

if self.pending_iwant_msgs.contains(id) {
return false;
}

self.peer_score
.as_ref()
.map(|(_, _, _, promises)| !promises.contains(id))
@@ -1284,11 +1282,6 @@ where
iwant_ids_vec.truncate(iask);
*iasked += iask;

for message_id in &iwant_ids_vec {
// Add all messages to the pending list
self.pending_iwant_msgs.insert(message_id.clone());
}

if let Some((_, _, _, gossip_promises)) = &mut self.peer_score {
gossip_promises.add_promise(
*peer_id,
@@ -1302,13 +1295,14 @@ where
iwant_ids_vec
);

Self::control_pool_add(
&mut self.control_pool,
*peer_id,
ControlAction::IWant {
message_ids: iwant_ids_vec,
},
);
let sender = self
.handler_send_queues
.get_mut(peer_id)
.expect("Peerid should exist");

sender.iwant(IWant {
message_ids: iwant_ids_vec,
});
}
tracing::trace!(peer=%peer_id, "Completed IHAVE handling for peer");
}
@@ -1512,11 +1506,11 @@ where
.expect("Peerid should exist")
.clone();

for action in to_prune_topics
for prune in to_prune_topics
.iter()
.map(|t| self.make_prune(t, peer_id, do_px, on_unsubscribe))
{
sender.control(action);
sender.prune(prune);
}
// Send the prune messages to the peer
tracing::debug!(
@@ -2016,11 +2010,8 @@ where
.get_mut(propagation_source)
.expect("Peerid should exist");

for action in topics_to_graft
.into_iter()
.map(|topic_hash| ControlAction::Graft { topic_hash })
{
sender.control(action);
for topic_hash in topics_to_graft.into_iter() {
sender.graft(Graft { topic_hash });
}

// Notify the application of the subscriptions
@@ -2438,9 +2429,6 @@ where
self.send_graft_prune(to_graft, to_prune, no_px);
}

// piggyback pooled control messages
self.flush_control_pool();

// shift the memcache
self.mcache.shift();

@@ -2507,14 +2495,14 @@ where
}

// send an IHAVE message
Self::control_pool_add(
&mut self.control_pool,
peer,
ControlAction::IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
},
);
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");
sender.ihave(IHave {
topic_hash: topic_hash.clone(),
message_ids: peer_message_ids,
});
}
}
}
@@ -2546,9 +2534,6 @@ where
&self.connected_peers,
);
}
let control_msgs = topics.iter().map(|topic_hash| ControlAction::Graft {
topic_hash: topic_hash.clone(),
});

// If there are prunes associated with the same peer add them.
// NOTE: In this case a peer has been added to a topic mesh, and removed from another.
@@ -2576,8 +2561,14 @@ where
)
});

for msg in control_msgs.chain(prunes) {
sender.control(msg);
for topic_hash in topics {
sender.graft(Graft {
topic_hash: topic_hash.clone(),
});
}

for prune in prunes {
sender.prune(prune);
}
}

@@ -2597,7 +2588,7 @@ where
.expect("Peerid should exist")
.clone();

sender.control(prune);
sender.prune(prune);

// inform the handler
peer_removed_from_mesh(
@@ -2776,32 +2767,6 @@ where
}
}

// adds a control action to control_pool
fn control_pool_add(
control_pool: &mut HashMap<PeerId, Vec<ControlAction>>,
peer: PeerId,
control: ControlAction,
) {
control_pool.entry(peer).or_default().push(control);
}

/// Takes each control action mapping and turns it into a message
fn flush_control_pool(&mut self) {
for (peer, controls) in self.control_pool.drain().collect::<Vec<_>>() {
for msg in controls {
let sender = self
.handler_send_queues
.get_mut(&peer)
.expect("Peerid should exist");

sender.control(msg);
}
}

// This clears all pending IWANT messages
self.pending_iwant_msgs.clear();
}

fn on_connection_established(
&mut self,
ConnectionEstablished {
@@ -3205,21 +3170,21 @@ where
let mut prune_msgs = vec![];
for control_msg in rpc.control_msgs {
match control_msg {
ControlAction::IHave {
ControlAction::IHave(IHave {
topic_hash,
message_ids,
} => {
}) => {
ihave_msgs.push((topic_hash, message_ids));
}
ControlAction::IWant { message_ids } => {
ControlAction::IWant(IWant { message_ids }) => {
self.handle_iwant(&propagation_source, message_ids)
}
ControlAction::Graft { topic_hash } => graft_msgs.push(topic_hash),
ControlAction::Prune {
ControlAction::Graft(Graft { topic_hash }) => graft_msgs.push(topic_hash),
ControlAction::Prune(Prune {
topic_hash,
peers,
backoff,
} => prune_msgs.push((topic_hash, peers, backoff)),
}) => prune_msgs.push((topic_hash, peers, backoff)),
}
}
if !ihave_msgs.is_empty() {
@@ -3442,7 +3407,6 @@ impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Behaviour<C, F
f.debug_struct("Behaviour")
.field("config", &self.config)
.field("events", &self.events.len())
.field("control_pool", &self.control_pool)
.field("publish_config", &self.publish_config)
.field("topic_peers", &self.topic_peers)
.field("peer_topics", &self.peer_topics)
385 changes: 190 additions & 195 deletions protocols/gossipsub/src/behaviour/tests.rs

Large diffs are not rendered by default.

43 changes: 25 additions & 18 deletions protocols/gossipsub/src/protocol.rs
@@ -23,7 +23,8 @@ use crate::handler::HandlerEvent;
use crate::rpc_proto::proto;
use crate::topic::TopicHash;
use crate::types::{
ControlAction, MessageId, PeerInfo, PeerKind, RawMessage, Rpc, Subscription, SubscriptionAction,
ControlAction, Graft, IHave, IWant, MessageId, PeerInfo, PeerKind, Prune, RawMessage, Rpc,
Subscription, SubscriptionAction,
};
use crate::ValidationError;
use asynchronous_codec::{Decoder, Encoder, Framed};
@@ -413,33 +414,39 @@ impl Decoder for GossipsubCodec {
let ihave_msgs: Vec<ControlAction> = rpc_control
.ihave
.into_iter()
.map(|ihave| ControlAction::IHave {
topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()),
message_ids: ihave
.message_ids
.into_iter()
.map(MessageId::from)
.collect::<Vec<_>>(),
.map(|ihave| {
ControlAction::IHave(IHave {
topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()),
message_ids: ihave
.message_ids
.into_iter()
.map(MessageId::from)
.collect::<Vec<_>>(),
})
})
.collect();

let iwant_msgs: Vec<ControlAction> = rpc_control
.iwant
.into_iter()
.map(|iwant| ControlAction::IWant {
message_ids: iwant
.message_ids
.into_iter()
.map(MessageId::from)
.collect::<Vec<_>>(),
.map(|iwant| {
ControlAction::IWant(IWant {
message_ids: iwant
.message_ids
.into_iter()
.map(MessageId::from)
.collect::<Vec<_>>(),
})
})
.collect();

let graft_msgs: Vec<ControlAction> = rpc_control
.graft
.into_iter()
.map(|graft| ControlAction::Graft {
topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()),
.map(|graft| {
ControlAction::Graft(Graft {
topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()),
})
})
.collect();

@@ -463,11 +470,11 @@ impl Decoder for GossipsubCodec {
.collect::<Vec<PeerInfo>>();

let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default());
prune_msgs.push(ControlAction::Prune {
prune_msgs.push(ControlAction::Prune(Prune {
topic_hash,
peers,
backoff: prune.backoff,
});
}));
}

control_msgs.extend(ihave_msgs);
134 changes: 95 additions & 39 deletions protocols/gossipsub/src/types.rs
@@ -203,8 +203,8 @@ pub enum SubscriptionAction {
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PeerInfo {
pub peer_id: Option<PeerId>,
pub(crate) struct PeerInfo {
pub(crate) peer_id: Option<PeerId>,
//TODO add this when RFC: Signed Address Records got added to the spec (see pull request
// https://github.com/libp2p/specs/pull/217)
//pub signed_peer_record: ?,
@@ -214,31 +214,47 @@ pub struct PeerInfo {
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ControlAction {
/// Node broadcasts known messages per topic - IHave control message.
IHave {
/// The topic of the messages.
topic_hash: TopicHash,
/// A list of known message ids (peer_id + sequence _number) as a string.
message_ids: Vec<MessageId>,
},
IHave(IHave),
/// The node requests specific message ids (peer_id + sequence _number) - IWant control message.
IWant {
/// A list of known message ids (peer_id + sequence _number) as a string.
message_ids: Vec<MessageId>,
},
IWant(IWant),
/// The node has been added to the mesh - Graft control message.
Graft {
/// The mesh topic the peer should be added to.
topic_hash: TopicHash,
},
Graft(Graft),
/// The node has been removed from the mesh - Prune control message.
Prune {
/// The mesh topic the peer should be removed from.
topic_hash: TopicHash,
/// A list of peers to be proposed to the removed peer as peer exchange
peers: Vec<PeerInfo>,
/// The backoff time in seconds before we allow to reconnect
backoff: Option<u64>,
},
Prune(Prune),
}

/// Node broadcasts known messages per topic - IHave control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IHave {
/// The topic of the messages.
pub(crate) topic_hash: TopicHash,
/// A list of known message ids (peer_id + sequence _number) as a string.
pub(crate) message_ids: Vec<MessageId>,
}

/// The node requests specific message ids (peer_id + sequence _number) - IWant control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct IWant {
/// A list of known message ids (peer_id + sequence _number) as a string.
pub(crate) message_ids: Vec<MessageId>,
}

/// The node has been added to the mesh - Graft control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Graft {
/// The mesh topic the peer should be added to.
pub(crate) topic_hash: TopicHash,
}

/// The node has been removed from the mesh - Prune control message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Prune {
/// The mesh topic the peer should be removed from.
pub(crate) topic_hash: TopicHash,
/// A list of peers to be proposed to the removed peer as peer exchange
pub(crate) peers: Vec<PeerInfo>,
/// The backoff time in seconds before we allow to reconnect
pub(crate) backoff: Option<u64>,
}
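
Since `IHave`, `IWant`, `Graft`, and `Prune` are now standalone structs wrapped by tuple variants, consumers destructure them much like the control-message match shown in the behaviour.rs diff above. A self-contained sketch with simplified stand-in field types (`String` in place of `TopicHash`/`MessageId`/`PeerInfo`):

```rust
// Simplified mirror of the new ControlAction shape; field types are stand-ins.
struct IHave { topic_hash: String, message_ids: Vec<String> }
struct IWant { message_ids: Vec<String> }
struct Graft { topic_hash: String }
struct Prune { topic_hash: String, peers: Vec<String>, backoff: Option<u64> }

enum ControlAction {
    IHave(IHave),
    IWant(IWant),
    Graft(Graft),
    Prune(Prune),
}

fn handle(action: ControlAction) {
    match action {
        ControlAction::IHave(IHave { topic_hash, message_ids }) => {
            println!("peer advertises {} messages on {topic_hash}", message_ids.len());
        }
        ControlAction::IWant(IWant { message_ids }) => {
            println!("peer requests {} messages", message_ids.len());
        }
        ControlAction::Graft(Graft { topic_hash }) => {
            println!("peer grafts us onto {topic_hash}");
        }
        ControlAction::Prune(Prune { topic_hash, peers, backoff }) => {
            println!("pruned from {topic_hash}: {} px peers, backoff {backoff:?}", peers.len());
        }
    }
}

fn main() {
    handle(ControlAction::Prune(Prune {
        topic_hash: "topic-a".into(),
        peers: vec![],
        backoff: Some(60),
    }));
}
```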

/// A Gossipsub RPC message sent.
@@ -254,8 +270,14 @@ pub enum RpcOut {
Subscribe(TopicHash),
/// Unsubscribe a topic.
Unsubscribe(TopicHash),
/// List of Gossipsub control messages.
Control(ControlAction),
/// Send a GRAFT control message.
Graft(Graft),
/// Send a PRUNE control message.
Prune(Prune),
/// Send a IHave control message.
IHave(IHave),
/// Send a IWant control message.
IWant(IWant),
}

impl RpcOut {
@@ -302,7 +324,7 @@ impl From<RpcOut> for proto::RPC {
}],
control: None,
},
RpcOut::Control(ControlAction::IHave {
RpcOut::IHave(IHave {
topic_hash,
message_ids,
}) => proto::RPC {
@@ -318,7 +340,7 @@ impl From<RpcOut> for proto::RPC {
prune: vec![],
}),
},
RpcOut::Control(ControlAction::IWant { message_ids }) => proto::RPC {
RpcOut::IWant(IWant { message_ids }) => proto::RPC {
publish: Vec::new(),
subscriptions: Vec::new(),
control: Some(proto::ControlMessage {
@@ -330,7 +352,7 @@ impl From<RpcOut> for proto::RPC {
prune: vec![],
}),
},
RpcOut::Control(ControlAction::Graft { topic_hash }) => proto::RPC {
RpcOut::Graft(Graft { topic_hash }) => proto::RPC {
publish: Vec::new(),
subscriptions: vec![],
control: Some(proto::ControlMessage {
@@ -342,7 +364,7 @@ impl From<RpcOut> for proto::RPC {
prune: vec![],
}),
},
RpcOut::Control(ControlAction::Prune {
RpcOut::Prune(Prune {
topic_hash,
peers,
backoff,
@@ -434,33 +456,33 @@ impl From<Rpc> for proto::RPC {
for action in rpc.control_msgs {
match action {
// collect all ihave messages
ControlAction::IHave {
ControlAction::IHave(IHave {
topic_hash,
message_ids,
} => {
}) => {
let rpc_ihave = proto::ControlIHave {
topic_id: Some(topic_hash.into_string()),
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.ihave.push(rpc_ihave);
}
ControlAction::IWant { message_ids } => {
ControlAction::IWant(IWant { message_ids }) => {
let rpc_iwant = proto::ControlIWant {
message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
};
control.iwant.push(rpc_iwant);
}
ControlAction::Graft { topic_hash } => {
ControlAction::Graft(Graft { topic_hash }) => {
let rpc_graft = proto::ControlGraft {
topic_id: Some(topic_hash.into_string()),
};
control.graft.push(rpc_graft);
}
ControlAction::Prune {
ControlAction::Prune(Prune {
topic_hash,
peers,
backoff,
} => {
}) => {
let rpc_prune = proto::ControlPrune {
topic_id: Some(topic_hash.into_string()),
peers: peers
@@ -566,14 +588,48 @@ impl RpcSender {
self.receiver.clone()
}

/// Send a `RpcOut::Control` message to the `RpcReceiver`
/// Send a `RpcOut::Graft` message to the `RpcReceiver`
/// this is high priority.
pub(crate) fn graft(&mut self, graft: Graft) {
self.priority
.try_send(RpcOut::Graft(graft))
.expect("Channel is unbounded and should always be open");
}

/// Send a `RpcOut::Prune` message to the `RpcReceiver`
/// this is high priority.
pub(crate) fn control(&mut self, control: ControlAction) {
pub(crate) fn prune(&mut self, prune: Prune) {
self.priority
.try_send(RpcOut::Control(control))
.try_send(RpcOut::Prune(prune))
.expect("Channel is unbounded and should always be open");
}

/// Send a `RpcOut::IHave` message to the `RpcReceiver`
/// this is low priority and if queue is full the message is dropped.
pub(crate) fn ihave(&mut self, ihave: IHave) {
if let Err(err) = self.non_priority.try_send(RpcOut::IHave(ihave)) {
let rpc = err.into_inner();
tracing::trace!(
"IHAVE message {:?} to peer {} dropped, queue is full",
rpc,
self.peer_id
);
}
}

/// Send a `RpcOut::IWant` message to the `RpcReceiver`
/// this is low priority and if queue is full the message is dropped.
pub(crate) fn iwant(&mut self, iwant: IWant) {
if let Err(err) = self.non_priority.try_send(RpcOut::IWant(iwant)) {
let rpc = err.into_inner();
tracing::trace!(
"IWANT message {:?} to peer {} dropped, queue is full",
rpc,
self.peer_id
);
}
}

/// Send a `RpcOut::Subscribe` message to the `RpcReceiver`
/// this is high priority.
pub(crate) fn subscribe(&mut self, topic: TopicHash) {