Skip to content

Node instance check #56

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 59 additions & 26 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2505,45 +2505,56 @@ impl ClusterInfo {
false
}
};
let mut verify_node_instance = |value: &CrdsValue| {
if self.verify_node_instance(value) {
true
} else {
self.stats.num_unverifed_node_instances.add_relaxed(1);
false
}
};
// Split packets based on their types.
let mut pull_requests = vec![];
let mut pull_responses = vec![];
let mut push_messages = vec![];
let mut prune_messages = vec![];
let mut ping_messages = vec![];
let mut pong_messages = vec![];
for (from_addr, packet) in packets {
match packet {
Protocol::PullRequest(filter, caller) => {
if verify_gossip_addr(&caller) {
pull_requests.push((from_addr, filter, caller))
{
let _st_vec = ScopedTimer::from(&self.stats.populate_gossip_vectors_time);
for (from_addr, packet) in packets {
match packet {
Protocol::PullRequest(filter, caller) => {
if verify_gossip_addr(&caller) {
pull_requests.push((from_addr, filter, caller))
}
}
}
Protocol::PullResponse(_, mut data) => {
check_duplicate_instance(&data)?;
data.retain(&mut verify_gossip_addr);
if !data.is_empty() {
pull_responses.append(&mut data);
Protocol::PullResponse(_, mut data) => {
check_duplicate_instance(&data)?;
data.retain(|value| verify_incoming_crds_value(value, &mut verify_gossip_addr, &mut verify_node_instance));
if !data.is_empty() {
pull_responses.append(&mut data);
}
}
}
Protocol::PushMessage(from, mut data) => {
check_duplicate_instance(&data)?;
data.retain(&mut verify_gossip_addr);
if !data.is_empty() {
push_messages.push((from, data));
Protocol::PushMessage(from, mut data) => {
check_duplicate_instance(&data)?;
data.retain(|value| verify_incoming_crds_value(value, &mut verify_gossip_addr, &mut verify_node_instance));
if !data.is_empty() {
push_messages.push((from, data));
}
}
Protocol::PruneMessage(_from, data) => prune_messages.push(data),
Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)),
Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)),
}
Protocol::PruneMessage(_from, data) => prune_messages.push(data),
Protocol::PingMessage(ping) => ping_messages.push((from_addr, ping)),
Protocol::PongMessage(pong) => pong_messages.push((from_addr, pong)),
}
}
if self.require_stake_for_gossip(stakes) {
retain_staked(&mut pull_responses, stakes);
for (_, data) in &mut push_messages {
retain_staked(data, stakes);
if self.require_stake_for_gossip(stakes) {
retain_staked(&mut pull_responses, stakes);
for (_, data) in &mut push_messages {
retain_staked(data, stakes);
}
push_messages.retain(|(_, data)| !data.is_empty());
}
push_messages.retain(|(_, data)| !data.is_empty());
}
if !pings.is_empty() {
self.stats
Expand Down Expand Up @@ -2578,6 +2589,20 @@ impl ClusterInfo {
Ok(())
}

fn verify_node_instance(&self, value: &CrdsValue) -> bool {
let pubkey = match &value.data {
CrdsData::NodeInstance(node) => node.from(),
_ => return true, // If not a NodeInstance, nothing to verify.
};
// if contact info for the pubkey exists in the crds table, then the
// the contact info has already been verified. Therefore, the node
// instance is valid.
if self.lookup_contact_info(pubkey, |ci| ci.clone()).is_some() {
return true;
}
false
}

// Consumes packets received from the socket, deserializing, sanitizing and
// verifying them and then sending them down the channel for the actual
// handling of requests/messages.
Expand Down Expand Up @@ -3372,6 +3397,14 @@ fn verify_gossip_addr<R: Rng + CryptoRng>(
out
}

fn verify_incoming_crds_value<'a>(
value: &'a CrdsValue,
verify_gossip_addr: &mut impl FnMut(&'a CrdsValue) -> bool,
verify_node_instance: &mut impl FnMut(&'a CrdsValue) -> bool,
) -> bool {
verify_gossip_addr(value) && verify_node_instance(value)
}

#[cfg(test)]
mod tests {
use {
Expand Down
12 changes: 12 additions & 0 deletions gossip/src/cluster_info_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub struct GossipStats {
pub(crate) new_push_requests: Counter,
pub(crate) new_push_requests_num: Counter,
pub(crate) num_unverifed_gossip_addrs: Counter,
pub(crate) num_unverifed_node_instances: Counter,
pub(crate) packets_received_count: Counter,
pub(crate) packets_received_ping_messages_count: Counter,
pub(crate) packets_received_pong_messages_count: Counter,
Expand All @@ -145,6 +146,7 @@ pub struct GossipStats {
pub(crate) packets_sent_pull_responses_count: Counter,
pub(crate) packets_sent_push_messages_count: Counter,
pub(crate) process_gossip_packets_time: Counter,
pub(crate) populate_gossip_vectors_time: Counter,
pub(crate) process_prune: Counter,
pub(crate) process_pull_requests: Counter,
pub(crate) process_pull_response: Counter,
Expand Down Expand Up @@ -239,6 +241,11 @@ pub(crate) fn submit_gossip_stats(
stats.process_gossip_packets_time.clear(),
i64
),
(
"populate_gossip_vectors_time",
stats.populate_gossip_vectors_time.clear(),
i64
),
(
"verify_gossip_packets_time",
stats.verify_gossip_packets_time.clear(),
Expand Down Expand Up @@ -494,6 +501,11 @@ pub(crate) fn submit_gossip_stats(
stats.num_unverifed_gossip_addrs.clear(),
i64
),
(
"num_unverifed_node_instances",
stats.num_unverifed_node_instances.clear(),
i64
),
(
"packets_received_count",
stats.packets_received_count.clear(),
Expand Down
5 changes: 5 additions & 0 deletions gossip/src/crds_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,11 @@ impl NodeInstance {
}
}

#[inline]
pub fn from(&self) -> &Pubkey {
&self.from
}

// Clones the value with an updated wallclock.
pub(crate) fn with_wallclock(&self, wallclock: u64) -> Self {
Self { wallclock, ..*self }
Expand Down