reworks gossip pipeline by applying stake-filter earlier
Filtering gossip packets based on stake is done very late in the pipeline, for both incoming and outgoing packets:
https://github.com/anza-xyz/agave/blob/4d0fc227d/gossip/src/cluster_info.rs#L1321
https://github.com/anza-xyz/agave/blob/4d0fc227d/gossip/src/cluster_info.rs#L1801
https://github.com/anza-xyz/agave/blob/4d0fc227d/gossip/src/cluster_info.rs#L2222-L2229

To avoid wasting resources on packets which are partially processed but ultimately dropped anyway, this commit applies the stake filter earlier in the gossip pipelines.
behzadnouri committed Jan 26, 2025
1 parent 25d4097 commit 700fb70
Showing 7 changed files with 121 additions and 83 deletions.
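Editor's note: the core of the change is a per-value predicate, should_retain_crds_value, which callers pass down as a closure so that CRDS values from insufficiently staked nodes are dropped while packets are ingested and while responses are generated, instead of in a late retain_staked() pass. The sketch below is a minimal, self-contained illustration of that shape; the type definitions, constant values, and the filter_incoming helper are simplified stand-ins for this note, not the real agave definitions.

use std::collections::HashMap;

type Pubkey = [u8; 32];

// Only a few CRDS variants are shown here; the real enum has many more.
enum CrdsData {
    ContactInfo,
    Vote,
    DuplicateShred,
}

struct CrdsValue {
    pubkey: Pubkey,
    data: CrdsData,
}

// Illustrative thresholds only; the real constants are defined in the gossip crate.
const MIN_NUM_STAKED_NODES: usize = 500;
const MIN_STAKE_FOR_GOSSIP: u64 = 1_000_000_000;

// Returns false if the CRDS value should be discarded (mirrors the shape of
// should_retain_crds_value in the diff below).
fn should_retain_crds_value(value: &CrdsValue, stakes: &HashMap<Pubkey, u64>) -> bool {
    match value.data {
        // Exempted types are kept regardless of stake.
        CrdsData::ContactInfo | CrdsData::Vote => true,
        // Other types require the origin node to have enough stake, unless
        // the stakes map is too small to be meaningful.
        CrdsData::DuplicateShred => {
            stakes.len() < MIN_NUM_STAKED_NODES || {
                let stake = stakes.get(&value.pubkey).copied();
                stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
            }
        }
    }
}

// The commit's key move: apply the predicate while values are ingested,
// instead of pruning them after they have already been processed.
fn filter_incoming(values: &mut Vec<CrdsValue>, stakes: &HashMap<Pubkey, u64>) {
    values.retain(|value| should_retain_crds_value(value, stakes));
}

fn main() {
    // With a stakes map smaller than MIN_NUM_STAKED_NODES nothing is dropped,
    // which preserves behavior when stake information is not yet available.
    let stakes: HashMap<Pubkey, u64> = HashMap::new();
    let mut values = vec![CrdsValue { pubkey: [0u8; 32], data: CrdsData::DuplicateShred }];
    filter_incoming(&mut values, &stakes);
    assert_eq!(values.len(), 1);
}

In the actual diff, the same predicate is passed into new_push_messages and generate_pull_responses, and applied inside verify_packet before signature verification, so discarded values never reach par_verify.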
154 changes: 81 additions & 73 deletions gossip/src/cluster_info.rs
@@ -168,39 +168,40 @@ struct PullData {
filter: CrdsFilter,
}

// Retains only CRDS values associated with nodes with enough stake.
// (some crds types are exempted)
fn retain_staked(
values: &mut Vec<CrdsValue>,
// Returns false if the CRDS value should be discarded.
#[inline]
#[must_use]
fn should_retain_crds_value(
value: &CrdsValue,
stakes: &HashMap<Pubkey, u64>,
drop_unstaked_node_instance: bool,
) {
values.retain(|value| {
match value.data() {
CrdsData::ContactInfo(_) => true,
CrdsData::LegacyContactInfo(_) => true,
// May Impact new validators starting up without any stake yet.
CrdsData::Vote(_, _) => true,
// Unstaked nodes can still help repair.
CrdsData::EpochSlots(_, _) => true,
// Unstaked nodes can still serve snapshots.
CrdsData::LegacySnapshotHashes(_) | CrdsData::SnapshotHashes(_) => true,
// Otherwise unstaked voting nodes will show up with no version in
// the various dashboards.
CrdsData::Version(_) => true,
CrdsData::AccountsHashes(_) => true,
CrdsData::NodeInstance(_) if !drop_unstaked_node_instance => true,
CrdsData::LowestSlot(_, _)
| CrdsData::LegacyVersion(_)
| CrdsData::DuplicateShred(_, _)
| CrdsData::RestartHeaviestFork(_)
| CrdsData::RestartLastVotedForkSlots(_)
| CrdsData::NodeInstance(_) => {
) -> bool {
match value.data() {
CrdsData::ContactInfo(_) => true,
CrdsData::LegacyContactInfo(_) => true,
// May Impact new validators starting up without any stake yet.
CrdsData::Vote(_, _) => true,
// Unstaked nodes can still help repair.
CrdsData::EpochSlots(_, _) => true,
// Unstaked nodes can still serve snapshots.
CrdsData::LegacySnapshotHashes(_) | CrdsData::SnapshotHashes(_) => true,
// Otherwise unstaked voting nodes will show up with no version in
// the various dashboards.
CrdsData::Version(_) => true,
CrdsData::AccountsHashes(_) => true,
CrdsData::NodeInstance(_) if !drop_unstaked_node_instance => true,
CrdsData::LowestSlot(_, _)
| CrdsData::LegacyVersion(_)
| CrdsData::DuplicateShred(_, _)
| CrdsData::RestartHeaviestFork(_)
| CrdsData::RestartLastVotedForkSlots(_)
| CrdsData::NodeInstance(_) => {
stakes.len() < MIN_NUM_STAKED_NODES || {
let stake = stakes.get(&value.pubkey()).copied();
stake.unwrap_or_default() >= MIN_STAKE_FOR_GOSSIP
}
}
})
}
}

impl ClusterInfo {
@@ -1307,23 +1308,22 @@ impl ClusterInfo {
}
fn new_push_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
let self_id = self.id();
let (mut push_messages, num_entries, num_nodes) = {
let (push_messages, num_entries, num_nodes) = {
let _st = ScopedTimer::from(&self.stats.new_push_requests);
self.flush_push_queue();
self.gossip.new_push_messages(&self_id, timestamp(), stakes)
self.gossip
.new_push_messages(&self_id, timestamp(), stakes, |value| {
should_retain_crds_value(
value, stakes, /*drop_unstaked_node_instance:*/ false,
)
})
};
self.stats
.push_fanout_num_entries
.add_relaxed(num_entries as u64);
self.stats
.push_fanout_num_nodes
.add_relaxed(num_nodes as u64);
if self.require_stake_for_gossip(stakes) {
push_messages.retain(|_, data| {
retain_staked(data, stakes, /* drop_unstaked_node_instance */ false);
!data.is_empty()
})
}
let push_messages: Vec<_> = {
let gossip_crds =
self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2);
@@ -1783,21 +1783,21 @@ impl ClusterInfo {
};
let now = timestamp();
let self_id = self.id();
let mut pull_responses = {
let pull_responses = {
let _st = ScopedTimer::from(&self.stats.generate_pull_responses);
self.gossip.generate_pull_responses(
thread_pool,
&caller_and_filters,
output_size_limit,
now,
|value| {
should_retain_crds_value(
value, stakes, /*drop_unstaked_node_instance:*/ true,
)
},
&self.stats,
)
};
if self.require_stake_for_gossip(stakes) {
for resp in &mut pull_responses {
retain_staked(resp, stakes, /* drop_unstaked_node_instance */ true);
}
}
let (responses, scores): (Vec<_>, Vec<_>) = addrs
.iter()
.zip(pull_responses)
@@ -2099,17 +2099,6 @@ impl ClusterInfo {
})
}

fn require_stake_for_gossip(&self, stakes: &HashMap<Pubkey, u64>) -> bool {
if stakes.len() < MIN_NUM_STAKED_NODES {
self.stats
.require_stake_for_gossip_unknown_stakes
.add_relaxed(1);
false
} else {
true
}
}

fn process_packets(
&self,
packets: VecDeque<(/*from:*/ SocketAddr, Protocol)>,
@@ -2214,17 +2203,6 @@ impl ClusterInfo {
Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)),
}
}
if self.require_stake_for_gossip(stakes) {
retain_staked(
&mut pull_responses,
stakes,
/* drop_unstaked_node_instance */ false,
);
for (_, data) in &mut push_messages {
retain_staked(data, stakes, /* drop_unstaked_node_instance */ false);
}
push_messages.retain(|(_, data)| !data.is_empty());
}
if !pings.is_empty() {
self.stats
.packets_sent_gossip_requests_count
@@ -2263,9 +2241,10 @@ impl ClusterInfo {
// handling of requests/messages.
fn run_socket_consume(
&self,
thread_pool: &ThreadPool,
epoch_specs: Option<&mut EpochSpecs>,
receiver: &PacketBatchReceiver,
sender: &Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
thread_pool: &ThreadPool,
) -> Result<(), GossipError> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
fn count_dropped_packets(packets: &PacketBatch, dropped_packets_counts: &mut [u64; 7]) {
@@ -2302,28 +2281,48 @@ impl ClusterInfo {
self.stats
.packets_received_count
.add_relaxed(num_packets as u64 + num_packets_dropped);
fn verify_packet(packet: &Packet, stats: &GossipStats) -> Option<(SocketAddr, Protocol)> {
let protocol: Protocol =
fn verify_packet(
packet: &Packet,
stakes: &HashMap<Pubkey, u64>,
stats: &GossipStats,
) -> Option<(SocketAddr, Protocol)> {
let mut protocol: Protocol =
stats.record_received_packet(packet.deserialize_slice::<Protocol, _>(..))?;
protocol.sanitize().ok()?;
if let Protocol::PullResponse(_, values) | Protocol::PushMessage(_, values) =
&mut protocol
{
values.retain(|value| {
should_retain_crds_value(
value, stakes, /*drop_unstaked_node_instance:*/ false,
)
});
if values.is_empty() {
return None;
}
}
protocol.par_verify().then(|| {
stats.packets_received_verified_count.add_relaxed(1);
(packet.meta().socket_addr(), protocol)
})
}
let stakes = epoch_specs
.map(EpochSpecs::current_epoch_staked_nodes)
.cloned()
.unwrap_or_default();
let packets: Vec<_> = {
let _st = ScopedTimer::from(&self.stats.verify_gossip_packets_time);
thread_pool.install(|| {
if packets.len() == 1 {
packets[0]
.par_iter()
.filter_map(|packet| verify_packet(packet, &self.stats))
.filter_map(|packet| verify_packet(packet, &stakes, &self.stats))
.collect()
} else {
packets
.par_iter()
.flatten()
.filter_map(|packet| verify_packet(packet, &self.stats))
.filter_map(|packet| verify_packet(packet, &stakes, &self.stats))
.collect()
}
})
@@ -2385,6 +2384,7 @@ impl ClusterInfo {

pub(crate) fn start_socket_consume_thread(
self: Arc<Self>,
bank_forks: Option<Arc<RwLock<BankForks>>>,
receiver: PacketBatchReceiver,
sender: Sender<Vec<(/*from:*/ SocketAddr, Protocol)>>,
exit: Arc<AtomicBool>,
@@ -2394,9 +2394,15 @@ impl ClusterInfo {
.thread_name(|i| format!("solGossipCons{i:02}"))
.build()
.unwrap();
let mut epoch_specs = bank_forks.map(EpochSpecs::from);
let run_consume = move || {
while !exit.load(Ordering::Relaxed) {
match self.run_socket_consume(&receiver, &sender, &thread_pool) {
match self.run_socket_consume(
&thread_pool,
epoch_specs.as_mut(),
&receiver,
&sender,
) {
Err(GossipError::RecvTimeoutError(RecvTimeoutError::Disconnected)) => break,
Err(GossipError::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
// A send operation can only fail if the receiving end of a
@@ -3468,10 +3474,12 @@ mod tests {
);
//check that all types of gossip messages are signed correctly
cluster_info.flush_push_queue();
let (push_messages, _, _) =
cluster_info
.gossip
.new_push_messages(&cluster_info.id(), timestamp(), &stakes);
let (push_messages, _, _) = cluster_info.gossip.new_push_messages(
&cluster_info.id(),
timestamp(),
&stakes,
|_| true, // should_retain_crds_value
);
// there should be some pushes ready
assert!(!push_messages.is_empty());
push_messages
6 changes: 0 additions & 6 deletions gossip/src/cluster_info_metrics.rs
@@ -163,7 +163,6 @@ pub struct GossipStats {
pub(crate) push_response_count: Counter,
pub(crate) push_vote_read: Counter,
pub(crate) repair_peers: Counter,
pub(crate) require_stake_for_gossip_unknown_stakes: Counter,
pub(crate) save_contact_info_time: Counter,
pub(crate) skip_pull_response_shred_version: Counter,
pub(crate) skip_pull_shred_version: Counter,
@@ -602,11 +601,6 @@ pub(crate) fn submit_gossip_stats(
stats.packets_sent_push_messages_count.clear(),
i64
),
(
"require_stake_for_gossip_unknown_stakes",
stats.require_stake_for_gossip_unknown_stakes.clear(),
i64
),
("trim_crds_table", stats.trim_crds_table.clear(), i64),
(
"trim_crds_table_failed",
6 changes: 5 additions & 1 deletion gossip/src/crds_gossip.rs
@@ -74,12 +74,14 @@ impl CrdsGossip {
pubkey: &Pubkey, // This node.
now: u64,
stakes: &HashMap<Pubkey, u64>,
should_retain_crds_value: impl Fn(&CrdsValue) -> bool,
) -> (
HashMap<Pubkey, Vec<CrdsValue>>,
usize, // number of values
usize, // number of push messages
) {
self.push.new_push_messages(pubkey, &self.crds, now, stakes)
self.push
.new_push_messages(pubkey, &self.crds, now, stakes, should_retain_crds_value)
}

pub(crate) fn push_duplicate_shred<F>(
@@ -231,6 +233,7 @@ impl CrdsGossip {
filters: &[(CrdsValue, CrdsFilter)],
output_size_limit: usize, // Limit number of crds values returned.
now: u64,
should_retain_crds_value: impl Fn(&CrdsValue) -> bool + Sync,
stats: &GossipStats,
) -> Vec<Vec<CrdsValue>> {
CrdsGossipPull::generate_pull_responses(
@@ -239,6 +242,7 @@ impl CrdsGossip {
filters,
output_size_limit,
now,
should_retain_crds_value,
stats,
)
}
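Editor's note: the crds_gossip.rs changes above thread the predicate through as an impl Fn(&CrdsValue) -> bool parameter (plus Sync on the parallel pull path) rather than filtering the returned collections afterwards. Below is a tiny, generic illustration of that parameter-passing style; the names and data are hypothetical, not part of the agave API.

use std::collections::HashMap;

// The producer receives the retain-predicate as a closure parameter, so
// filtering happens while values are collected rather than in a later pass.
fn collect_retained<T>(candidates: Vec<T>, should_retain: impl Fn(&T) -> bool) -> Vec<T> {
    candidates.into_iter().filter(|v| should_retain(v)).collect()
}

fn main() {
    // Hypothetical data: (origin node id, payload), plus a stake map keyed by node id.
    let stakes: HashMap<u64, u64> = HashMap::from([(1, 10), (2, 0)]);
    let values = vec![(1u64, "vote"), (2u64, "duplicate_shred"), (3u64, "lowest_slot")];
    let kept = collect_retained(values, |(origin, _)| {
        stakes.get(origin).copied().unwrap_or_default() > 0
    });
    assert_eq!(kept, vec![(1, "vote")]);
}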
18 changes: 17 additions & 1 deletion gossip/src/crds_gossip_pull.rs
@@ -301,9 +301,18 @@ impl CrdsGossipPull {
requests: &[(CrdsValue, CrdsFilter)],
output_size_limit: usize, // Limit number of crds values returned.
now: u64,
should_retain_crds_value: impl Fn(&CrdsValue) -> bool + Sync,
stats: &GossipStats,
) -> Vec<Vec<CrdsValue>> {
Self::filter_crds_values(thread_pool, crds, requests, output_size_limit, now, stats)
Self::filter_crds_values(
thread_pool,
crds,
requests,
output_size_limit,
now,
should_retain_crds_value,
stats,
)
}

// Checks if responses should be inserted and
@@ -448,6 +457,8 @@ impl CrdsGossipPull {
filters: &[(CrdsValue, CrdsFilter)],
output_size_limit: usize, // Limit number of crds values returned.
now: u64,
// Predicate returning false if the CRDS value should be discarded.
should_retain_crds_value: impl Fn(&CrdsValue) -> bool + Sync,
stats: &GossipStats,
) -> Vec<Vec<CrdsValue>> {
let msg_timeout = CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
@@ -478,6 +489,7 @@ impl CrdsGossipPull {
false
} else {
!filter.filter_contains(entry.value.hash())
&& should_retain_crds_value(&entry.value)
}
};
let out: Vec<_> = crds
@@ -1054,6 +1066,7 @@ pub(crate) mod tests {
&filters,
usize::MAX, // output_size_limit
now,
|_| true, // should_retain_crds_value
&GossipStats::default(),
);

@@ -1077,6 +1090,7 @@ impl CrdsGossipPull {
&filters,
usize::MAX, // output_size_limit
now,
|_| true, // should_retain_crds_value
&GossipStats::default(),
);
assert_eq!(rsp[0].len(), 0);
@@ -1097,6 +1111,7 @@ impl CrdsGossipPull {
&filters,
usize::MAX, // output_size_limit
now,
|_| true, // should_retain_crds_value
&GossipStats::default(),
);
assert_eq!(rsp.len(), 2 * MIN_NUM_BLOOM_FILTERS);
@@ -1182,6 +1197,7 @@ impl CrdsGossipPull {
&filters,
usize::MAX, // output_size_limit
0, // now
|_| true, // should_retain_crds_value
&GossipStats::default(),
);
// if there is a false positive this is empty