
Commit

minimizes CrdsValue cloning during gossip push while holding CRDS table lock (#4635)

In CrdsGossipPush::new_push_messages a CrdsValue is repeatedly cloned
for each node it is pushed to while holding a lock on the CRDS table:
https://github.com/anza-xyz/agave/blob/85e8f86f9/gossip/src/crds_gossip_push.rs#L203

This commit instead populates a vector of _unique_ entries pushed out
and uses indices into this vector. As a result, each value is cloned
only once, no matter how many nodes it is pushed to.
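As a rough standalone sketch of the before/after shapes (hypothetical `Value` and `PeerId` types standing in for `CrdsValue` and `Pubkey`; not the actual agave code), the old path clones once per (value, peer) pair, while the new path clones each value once into a vector of unique entries and gives peers indices into it:

use std::collections::HashMap;

type PeerId = u64;

#[derive(Clone)]
struct Value(String);

// Old shape: one clone per (value, peer) pair.
fn push_cloning(values: &[Value], peers: &[PeerId]) -> HashMap<PeerId, Vec<Value>> {
    let mut out: HashMap<PeerId, Vec<Value>> = HashMap::new();
    for value in values {
        for &peer in peers {
            out.entry(peer).or_default().push(value.clone()); // O(values x peers) clones
        }
    }
    out
}

// New shape: one clone per unique value; peers hold indices into `unique`.
fn push_indexed(values: &[Value], peers: &[PeerId]) -> (Vec<Value>, HashMap<PeerId, Vec<usize>>) {
    let mut unique = Vec::new();
    let mut out: HashMap<PeerId, Vec<usize>> = HashMap::new();
    for value in values {
        let index = unique.len();
        if !peers.is_empty() {
            unique.push(value.clone()); // the only clone, however many peers there are
        }
        for &peer in peers {
            out.entry(peer).or_default().push(index);
        }
    }
    (unique, out)
}

fn main() {
    let values = vec![Value("a".into()), Value("b".into())];
    let peers = vec![1, 2, 3];
    assert_eq!(push_cloning(&values, &peers)[&1].len(), 2);
    let (unique, indexed) = push_indexed(&values, &peers);
    assert_eq!(unique.len(), 2); // 2 clones total instead of 2 values x 3 peers = 6
    assert_eq!(indexed[&1], vec![0, 1]);
}

The payoff in the real code is that the per-fanout-peer cloning no longer happens while the CRDS read lock is held; consumers of the returned map resolve indices back to values with entries[k].clone() only where a concrete message is built.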
behzadnouri authored Jan 28, 2025
1 parent c1fb66b commit e208c63
Showing 5 changed files with 106 additions and 83 deletions.
16 changes: 12 additions & 4 deletions gossip/src/cluster_info.rs
@@ -1308,7 +1308,7 @@ impl ClusterInfo {
     }
     fn new_push_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> {
         let self_id = self.id();
-        let (push_messages, num_entries, num_nodes) = {
+        let (entries, push_messages, num_pushes) = {
             let _st = ScopedTimer::from(&self.stats.new_push_requests);
             self.flush_push_queue();
             self.gossip
@@ -1320,10 +1320,10 @@ impl ClusterInfo {
         };
         self.stats
             .push_fanout_num_entries
-            .add_relaxed(num_entries as u64);
+            .add_relaxed(entries.len() as u64);
         self.stats
             .push_fanout_num_nodes
-            .add_relaxed(num_nodes as u64);
+            .add_relaxed(num_pushes as u64);
         let push_messages: Vec<_> = {
             let gossip_crds =
                 self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2);
@@ -1338,6 +1338,7 @@ impl ClusterInfo {
         let messages: Vec<_> = push_messages
             .into_iter()
             .flat_map(|(peer, msgs)| {
+                let msgs = msgs.into_iter().map(|k| entries[k].clone());
                 split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs)
                     .map(move |payload| (peer, Protocol::PushMessage(self_id, payload)))
             })
@@ -3474,12 +3475,19 @@ mod tests {
         );
         //check that all types of gossip messages are signed correctly
         cluster_info.flush_push_queue();
-        let (push_messages, _, _) = cluster_info.gossip.new_push_messages(
+        let (entries, push_messages, _) = cluster_info.gossip.new_push_messages(
             &cluster_info.id(),
             timestamp(),
             &stakes,
             |_| true, // should_retain_crds_value
         );
+        let push_messages = push_messages
+            .into_iter()
+            .map(|(pubkey, indices)| {
+                let values = indices.into_iter().map(|k| entries[k].clone()).collect();
+                (pubkey, values)
+            })
+            .collect::<HashMap<_, Vec<_>>>();
         // there should be some pushes ready
         assert!(!push_messages.is_empty());
         push_messages
5 changes: 3 additions & 2 deletions gossip/src/crds_gossip.rs
@@ -76,8 +76,9 @@ impl CrdsGossip {
         stakes: &HashMap<Pubkey, u64>,
         should_retain_crds_value: impl Fn(&CrdsValue) -> bool,
     ) -> (
-        HashMap<Pubkey, Vec<CrdsValue>>,
-        usize, // number of values
+        Vec<CrdsValue>, // unique CrdsValues pushed out to peers
+        // map of pubkeys to indices in Vec<CrdsValue> pushed to that peer
+        HashMap<Pubkey, Vec</*index:*/ usize>>,
         usize, // number of push messages
     ) {
         self.push
88 changes: 57 additions & 31 deletions gossip/src/crds_gossip_push.rs
@@ -172,16 +172,17 @@ impl CrdsGossipPush {
         // Predicate returning false if the CRDS value should be discarded.
         should_retain_crds_value: impl Fn(&CrdsValue) -> bool,
     ) -> (
-        HashMap<Pubkey, Vec<CrdsValue>>,
-        usize, // number of values
+        Vec<CrdsValue>, // unique CrdsValues pushed out to peers
+        // map of pubkeys to indices in Vec<CrdsValue> pushed to that peer
+        HashMap<Pubkey, Vec</*index:*/ usize>>,
         usize, // number of push messages
     ) {
         const MAX_NUM_PUSHES: usize = 1 << 12;
-        let active_set = self.active_set.read().unwrap();
         let mut num_pushes = 0;
-        let mut num_values = 0;
-        let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new();
+        let mut values = Vec::new();
+        let mut push_messages = HashMap::<Pubkey, Vec</*index:*/ usize>>::new();
         let wallclock_window = self.wallclock_window(now);
+        let active_set = self.active_set.read().unwrap();
         let mut crds_cursor = self.crds_cursor.lock().unwrap();
         // crds should be locked last after self.{active_set,crds_cursor}.
         let crds = crds.read().unwrap();
@@ -191,16 +192,22 @@ impl CrdsGossipPush {
             .filter(|value| wallclock_window.contains(&value.wallclock()))
             .filter(|value| should_retain_crds_value(value));
         'outer: for value in entries {
-            num_values += 1;
             let origin = value.pubkey();
-            let nodes = active_set.get_nodes(
-                pubkey,
-                &origin,
-                |node| value.should_force_push(node),
-                stakes,
-            );
-            for node in nodes.take(self.push_fanout) {
-                push_messages.entry(*node).or_default().push(value.clone());
+            let mut nodes = active_set
+                .get_nodes(
+                    pubkey,
+                    &origin,
+                    |node| value.should_force_push(node),
+                    stakes,
+                )
+                .take(self.push_fanout)
+                .peekable();
+            let index = values.len();
+            if nodes.peek().is_some() {
+                values.push(value.clone())
+            }
+            for &node in nodes {
+                push_messages.entry(node).or_default().push(index);
                 num_pushes += 1;
                 if num_pushes >= MAX_NUM_PUSHES {
                     break 'outer;
@@ -211,7 +218,7 @@ impl CrdsGossipPush {
         drop(crds_cursor);
         drop(active_set);
         self.num_pushes.fetch_add(num_pushes, Ordering::Relaxed);
-        (push_messages, num_values, num_pushes)
+        (values, push_messages, num_pushes)
     }

     /// Add the `from` to the peer's filter of nodes.
@@ -297,6 +304,33 @@ mod tests {
         )
     }

+    impl CrdsGossipPush {
+        // Wrapper for CrdsGossipPush.new_push_messages replicating old return
+        // type for legacy tests.
+        fn old_new_push_messages(
+            &self,
+            pubkey: &Pubkey,
+            crds: &RwLock<Crds>,
+            now: u64,
+            stakes: &HashMap<Pubkey, u64>,
+        ) -> HashMap<Pubkey, Vec<CrdsValue>> {
+            let (entries, messages, _) = self.new_push_messages(
+                pubkey,
+                crds,
+                now,
+                stakes,
+                |_| true, // should_retain_crds_value
+            );
+            messages
+                .into_iter()
+                .map(|(pubkey, indices)| {
+                    let values = indices.into_iter().map(|k| entries[k].clone()).collect();
+                    (pubkey, values)
+                })
+                .collect()
+        }
+    }
+
     #[test]
     fn test_process_push_one() {
         let crds = RwLock::<Crds>::default();
@@ -422,14 +456,12 @@
             [origin].into_iter().collect()
         );
         assert_eq!(
-            push.new_push_messages(
+            push.old_new_push_messages(
                 &Pubkey::default(),
                 &crds,
                 0,
                 &HashMap::<Pubkey, u64>::default(), // stakes
-                |_| true, // should_retain_crds_value
-            )
-            .0,
+            ),
             expected
         );
     }
@@ -487,14 +519,12 @@
             .into_iter()
             .collect();
         assert_eq!(
-            push.new_push_messages(
+            push.old_new_push_messages(
                 &Pubkey::default(),
                 &crds,
                 now,
                 &HashMap::<Pubkey, u64>::default(), // stakes
-                |_| true, // should_retain_crds_value
-            )
-            .0,
+            ),
             expected
         );
     }
@@ -541,14 +571,12 @@
             &HashMap::<Pubkey, u64>::default(), // stakes
         );
         assert_eq!(
-            push.new_push_messages(
+            push.old_new_push_messages(
                 &self_id,
                 &crds,
                 0,
                 &HashMap::<Pubkey, u64>::default(), // stakes
-                |_| true, // should_retain_crds_value
-            )
-            .0,
+            ),
             expected
         );
     }
@@ -584,14 +612,12 @@
             [origin].into_iter().collect()
         );
         assert_eq!(
-            push.new_push_messages(
+            push.old_new_push_messages(
                 &Pubkey::default(),
                 &crds,
                 0,
                 &HashMap::<Pubkey, u64>::default(), // stakes
-                |_| true, // should_retain_crds_value
-            )
-            .0,
+            ),
             expected
         );
     }
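The lock-ordering comment in this file's diff ("crds should be locked last after self.{active_set,crds_cursor}") is the standard deadlock-avoidance rule: every path must take the shared locks in one global order. A generic illustration with hypothetical locks (not the agave types):

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let active_set = Arc::new(Mutex::new(())); // stand-ins for the real structures
    let crds = Arc::new(Mutex::new(()));
    let (a2, c2) = (Arc::clone(&active_set), Arc::clone(&crds));
    let t = thread::spawn(move || {
        // Same global order as below: active_set first, crds last. If one
        // thread reversed this, each side could hold one lock while waiting
        // for the other, and both would block forever.
        let _a = a2.lock().unwrap();
        let _c = c2.lock().unwrap();
    });
    {
        let _a = active_set.lock().unwrap();
        let _c = crds.lock().unwrap();
    } // guards dropped here, so the spawned thread can proceed
    t.join().unwrap();
}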
55 changes: 20 additions & 35 deletions gossip/src/protocol.rs
@@ -208,47 +208,32 @@ impl Signable for PruneData {
 /// max_chunk_size.
 /// Note: some messages cannot be contained within that size so in the worst case this returns
 /// N nested Vecs with 1 item each.
-pub(crate) fn split_gossip_messages<I, T>(
+pub(crate) fn split_gossip_messages<T: Serialize + Debug>(
     max_chunk_size: usize,
-    data_feed: I,
-) -> impl Iterator<Item = Vec<T>>
-where
-    T: Serialize + Debug,
-    I: IntoIterator<Item = T>,
-{
+    data_feed: impl IntoIterator<Item = T>,
+) -> impl Iterator<Item = Vec<T>> {
     let mut data_feed = data_feed.into_iter().fuse();
     let mut buffer = vec![];
     let mut buffer_size = 0; // Serialized size of buffered values.
     std::iter::from_fn(move || loop {
-        match data_feed.next() {
-            None => {
-                return if buffer.is_empty() {
-                    None
-                } else {
-                    Some(std::mem::take(&mut buffer))
-                };
-            }
-            Some(data) => {
-                let data_size = match bincode::serialized_size(&data) {
-                    Ok(size) => size as usize,
-                    Err(err) => {
-                        error!("serialized_size failed: {}", err);
-                        continue;
-                    }
-                };
-                if buffer_size + data_size <= max_chunk_size {
-                    buffer_size += data_size;
-                    buffer.push(data);
-                } else if data_size <= max_chunk_size {
-                    buffer_size = data_size;
-                    return Some(std::mem::replace(&mut buffer, vec![data]));
-                } else {
-                    error!(
-                        "dropping data larger than the maximum chunk size {:?}",
-                        data
-                    );
-                }
-            }
-        }
+        let Some(data) = data_feed.next() else {
+            return (!buffer.is_empty()).then(|| std::mem::take(&mut buffer));
+        };
+        let data_size = match bincode::serialized_size(&data) {
+            Ok(size) => size as usize,
+            Err(err) => {
+                error!("serialized_size failed: {err:?}");
+                continue;
+            }
+        };
+        if buffer_size + data_size <= max_chunk_size {
+            buffer_size += data_size;
+            buffer.push(data);
+        } else if data_size <= max_chunk_size {
+            buffer_size = data_size;
+            return Some(std::mem::replace(&mut buffer, vec![data]));
+        } else {
+            error!("dropping data larger than the maximum chunk size {data:?}");
+        }
     })
 }
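To make the chunking behavior concrete, here is a hedged standalone sketch of the same greedy strategy (it mirrors, rather than calls, the crate-private split_gossip_messages, and takes item sizes directly instead of measuring them with bincode::serialized_size):

// Greedy size-bounded chunking: buffer items while they fit, start a new
// chunk when the next item would overflow, drop items that can never fit.
fn split_by_size(max_chunk_size: usize, sizes: Vec<usize>) -> Vec<Vec<usize>> {
    let mut chunks = Vec::new();
    let mut buffer = Vec::new();
    let mut buffer_size = 0;
    for size in sizes {
        if buffer_size + size <= max_chunk_size {
            buffer_size += size;
            buffer.push(size);
        } else if size <= max_chunk_size {
            // Flush the full chunk and start a new one with this item.
            chunks.push(std::mem::replace(&mut buffer, vec![size]));
            buffer_size = size;
        }
        // else: the item alone exceeds the limit; the real code logs and drops it.
    }
    if !buffer.is_empty() {
        chunks.push(buffer);
    }
    chunks
}

fn main() {
    // With a 10-byte budget: [4, 4] fits, 7 opens a new chunk, 12 is dropped,
    // and 3 still fits next to 7.
    assert_eq!(split_by_size(10, vec![4, 4, 7, 12, 3]), vec![vec![4, 4], vec![7, 3]]);
}

The real function does the same thing lazily, yielding each chunk from an iterator as soon as it is full, which is why the diff keeps the std::iter::from_fn / loop structure.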
25 changes: 14 additions & 11 deletions gossip/tests/crds_gossip.rs
@@ -376,17 +376,20 @@ fn network_run_push(
                 Duration::from_millis(node.gossip.pull.crds_timeout),
             );
             node.gossip.purge(&node_pubkey, thread_pool, now, &timeouts);
-            (
-                node_pubkey,
-                node.gossip
-                    .new_push_messages(
-                        &node_pubkey,
-                        now,
-                        &stakes,
-                        |_| true, // should_retain_crds_value
-                    )
-                    .0,
-            )
+            let (entries, messages, _) = node.gossip.new_push_messages(
+                &node_pubkey,
+                now,
+                &stakes,
+                |_| true, // should_retain_crds_value
+            );
+            let messages = messages
+                .into_iter()
+                .map(|(pubkey, indices)| {
+                    let values = indices.into_iter().map(|k| entries[k].clone()).collect();
+                    (pubkey, values)
+                })
+                .collect::<Vec<(_, Vec<_>)>>();
+            (node_pubkey, messages)
         })
         .collect();
     let transfered: Vec<_> = requests
