From c1d3ad723f34c24141211570f8dfdd77fc0628b7 Mon Sep 17 00:00:00 2001 From: greg Date: Fri, 2 Aug 2024 17:49:02 +0000 Subject: [PATCH 1/3] verify node instance before accepting --- gossip/src/cluster_info.rs | 24 ++++++++++++++++++++++++ gossip/src/cluster_info_metrics.rs | 6 ++++++ gossip/src/crds_value.rs | 5 +++++ 3 files changed, 35 insertions(+) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 0f11489333644d..eb5530f9e6affa 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2505,6 +2505,14 @@ impl ClusterInfo { false } }; + let mut verify_node_instance = |value: &CrdsValue| { + if self.verify_node_instance(value) { + true + } else { + self.stats.num_unverifed_node_instances.add_relaxed(1); + false + } + }; // Split packets based on their types. let mut pull_requests = vec![]; let mut pull_responses = vec![]; @@ -2522,6 +2530,7 @@ impl ClusterInfo { Protocol::PullResponse(_, mut data) => { check_duplicate_instance(&data)?; data.retain(&mut verify_gossip_addr); + data.retain(&mut verify_node_instance); if !data.is_empty() { pull_responses.append(&mut data); } @@ -2529,6 +2538,7 @@ impl ClusterInfo { Protocol::PushMessage(from, mut data) => { check_duplicate_instance(&data)?; data.retain(&mut verify_gossip_addr); + data.retain(&mut verify_node_instance); if !data.is_empty() { push_messages.push((from, data)); } @@ -2578,6 +2588,20 @@ impl ClusterInfo { Ok(()) } + fn verify_node_instance(&self, value: &CrdsValue) -> bool { + let pubkey = match &value.data { + CrdsData::NodeInstance(node) => node.from(), + _ => return true, // If not a NodeInstance, nothing to verify. + }; + // if contact info for the pubkey exists in the crds table, then the + // the contact info has already been verified. Therefore, the node + // instance is valid. + if self.lookup_contact_info(pubkey, |ci| ci.clone()).is_some() { + return true; + } + false + } + // Consumes packets received from the socket, deserializing, sanitizing and // verifying them and then sending them down the channel for the actual // handling of requests/messages. diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index a902130ac2b56e..02eafeeeeb6cf3 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -130,6 +130,7 @@ pub struct GossipStats { pub(crate) new_push_requests: Counter, pub(crate) new_push_requests_num: Counter, pub(crate) num_unverifed_gossip_addrs: Counter, + pub(crate) num_unverifed_node_instances: Counter, pub(crate) packets_received_count: Counter, pub(crate) packets_received_ping_messages_count: Counter, pub(crate) packets_received_pong_messages_count: Counter, @@ -494,6 +495,11 @@ pub(crate) fn submit_gossip_stats( stats.num_unverifed_gossip_addrs.clear(), i64 ), + ( + "num_unverifed_node_instances", + stats.num_unverifed_node_instances.clear(), + i64 + ), ( "packets_received_count", stats.packets_received_count.clear(), diff --git a/gossip/src/crds_value.rs b/gossip/src/crds_value.rs index 50061fe4df2c9c..a39fa8dfbbf985 100644 --- a/gossip/src/crds_value.rs +++ b/gossip/src/crds_value.rs @@ -455,6 +455,11 @@ impl NodeInstance { } } + #[inline] + pub fn from(&self) -> &Pubkey { + &self.from + } + // Clones the value with an updated wallclock. pub(crate) fn with_wallclock(&self, wallclock: u64) -> Self { Self { wallclock, ..*self } From 4c079c250d899249528b46cfaea41a940e2e23c4 Mon Sep 17 00:00:00 2001 From: greg Date: Tue, 6 Aug 2024 20:20:05 +0000 Subject: [PATCH 2/3] add verify_incoming_crds_value method --- gossip/src/cluster_info.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index eb5530f9e6affa..36ed72ddee6f43 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2529,16 +2529,14 @@ impl ClusterInfo { } Protocol::PullResponse(_, mut data) => { check_duplicate_instance(&data)?; - data.retain(&mut verify_gossip_addr); - data.retain(&mut verify_node_instance); + data.retain(|value| verify_incoming_crds_value(value, &mut verify_gossip_addr, &mut verify_node_instance)); if !data.is_empty() { pull_responses.append(&mut data); } } Protocol::PushMessage(from, mut data) => { check_duplicate_instance(&data)?; - data.retain(&mut verify_gossip_addr); - data.retain(&mut verify_node_instance); + data.retain(|value| verify_incoming_crds_value(value, &mut verify_gossip_addr, &mut verify_node_instance)); if !data.is_empty() { push_messages.push((from, data)); } @@ -3396,6 +3394,14 @@ fn verify_gossip_addr( out } +fn verify_incoming_crds_value<'a>( + value: &'a CrdsValue, + verify_gossip_addr: &mut impl FnMut(&'a CrdsValue) -> bool, + verify_node_instance: &mut impl FnMut(&'a CrdsValue) -> bool, +) -> bool { + verify_gossip_addr(value) && verify_node_instance(value) +} + #[cfg(test)] mod tests { use { From 5162e9b6a3d1ed0e5268c305d5f6ab3c3672fc45 Mon Sep 17 00:00:00 2001 From: greg Date: Thu, 8 Aug 2024 19:06:19 +0000 Subject: [PATCH 3/3] add timer measuring time to populate gossip message vectors --- gossip/src/cluster_info.rs | 55 ++++++++++++++++-------------- gossip/src/cluster_info_metrics.rs | 6 ++++ 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 36ed72ddee6f43..de89048412d2de 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -2520,38 +2520,41 @@ impl ClusterInfo { let mut prune_messages = vec![]; let mut ping_messages = vec![]; let mut pong_messages = vec![]; - for (from_addr, packet) in packets { - match packet { - Protocol::PullRequest(filter, caller) => { - if verify_gossip_addr(&caller) { - pull_requests.push((from_addr, filter, caller)) + { + let _st_vec = ScopedTimer::from(&self.stats.populate_gossip_vectors_time); + for (from_addr, packet) in packets { + match packet { + Protocol::PullRequest(filter, caller) => { + if verify_gossip_addr(&caller) { + pull_requests.push((from_addr, filter, caller)) + } } - } - Protocol::PullResponse(_, mut data) => { - check_duplicate_instance(&data)?; - data.retain(|value| verify_incoming_crds_value(value, &mut verify_gossip_addr, &mut verify_node_instance)); - if !data.is_empty() { - pull_responses.append(&mut data); + Protocol::PullResponse(_, mut data) => { + check_duplicate_instance(&data)?; + data.retain(|value| verify_incoming_crds_value(value, &mut verify_gossip_addr, &mut verify_node_instance)); + if !data.is_empty() { + pull_responses.append(&mut data); + } } - } - Protocol::PushMessage(from, mut data) => { - check_duplicate_instance(&data)?; - data.retain(|value| verify_incoming_crds_value(value, &mut verify_gossip_addr, &mut verify_node_instance)); - if !data.is_empty() { - push_messages.push((from, data)); + Protocol::PushMessage(from, mut data) => { + check_duplicate_instance(&data)?; + data.retain(|value| verify_incoming_crds_value(value, &mut verify_gossip_addr, &mut verify_node_instance)); + if !data.is_empty() { + push_messages.push((from, data)); + } } + Protocol::PruneMessage(_from, data) => prune_messages.push(data), + Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)), + Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)), } - Protocol::PruneMessage(_from, data) => prune_messages.push(data), - Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)), - Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)), } - } - if self.require_stake_for_gossip(stakes) { - retain_staked(&mut pull_responses, stakes); - for (_, data) in &mut push_messages { - retain_staked(data, stakes); + if self.require_stake_for_gossip(stakes) { + retain_staked(&mut pull_responses, stakes); + for (_, data) in &mut push_messages { + retain_staked(data, stakes); + } + push_messages.retain(|(_, data)| !data.is_empty()); } - push_messages.retain(|(_, data)| !data.is_empty()); } if !pings.is_empty() { self.stats diff --git a/gossip/src/cluster_info_metrics.rs b/gossip/src/cluster_info_metrics.rs index 02eafeeeeb6cf3..252ad856c1297f 100644 --- a/gossip/src/cluster_info_metrics.rs +++ b/gossip/src/cluster_info_metrics.rs @@ -146,6 +146,7 @@ pub struct GossipStats { pub(crate) packets_sent_pull_responses_count: Counter, pub(crate) packets_sent_push_messages_count: Counter, pub(crate) process_gossip_packets_time: Counter, + pub(crate) populate_gossip_vectors_time: Counter, pub(crate) process_prune: Counter, pub(crate) process_pull_requests: Counter, pub(crate) process_pull_response: Counter, @@ -240,6 +241,11 @@ pub(crate) fn submit_gossip_stats( stats.process_gossip_packets_time.clear(), i64 ), + ( + "populate_gossip_vectors_time", + stats.populate_gossip_vectors_time.clear(), + i64 + ), ( "verify_gossip_packets_time", stats.verify_gossip_packets_time.clear(),