diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index f639245d42826f..904426d6209a38 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -1308,7 +1308,7 @@ impl ClusterInfo { } fn new_push_requests(&self, stakes: &HashMap<Pubkey, u64>) -> Vec<(SocketAddr, Protocol)> { let self_id = self.id(); - let (push_messages, num_entries, num_nodes) = { + let (entries, push_messages, num_pushes) = { let _st = ScopedTimer::from(&self.stats.new_push_requests); self.flush_push_queue(); self.gossip @@ -1320,10 +1320,10 @@ impl ClusterInfo { }; self.stats .push_fanout_num_entries - .add_relaxed(num_entries as u64); + .add_relaxed(entries.len() as u64); self.stats .push_fanout_num_nodes - .add_relaxed(num_nodes as u64); + .add_relaxed(num_pushes as u64); let push_messages: Vec<_> = { let gossip_crds = self.time_gossip_read_lock("push_req_lookup", &self.stats.new_push_requests2); @@ -1338,6 +1338,7 @@ impl ClusterInfo { let messages: Vec<_> = push_messages .into_iter() .flat_map(|(peer, msgs)| { + let msgs = msgs.into_iter().map(|k| entries[k].clone()); split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, msgs) .map(move |payload| (peer, Protocol::PushMessage(self_id, payload))) }) @@ -3474,12 +3475,19 @@ mod tests { ); //check that all types of gossip messages are signed correctly cluster_info.flush_push_queue(); - let (push_messages, _, _) = cluster_info.gossip.new_push_messages( + let (entries, push_messages, _) = cluster_info.gossip.new_push_messages( &cluster_info.id(), timestamp(), &stakes, |_| true, // should_retain_crds_value ); + let push_messages = push_messages + .into_iter() + .map(|(pubkey, indices)| { + let values = indices.into_iter().map(|k| entries[k].clone()).collect(); + (pubkey, values) + }) + .collect::<HashMap<Pubkey, Vec<CrdsValue>>>(); // there should be some pushes ready assert!(!push_messages.is_empty()); push_messages diff --git a/gossip/src/crds_gossip.rs b/gossip/src/crds_gossip.rs index e780c74c5d81dc..da4db03aea3d95 100644 --- 
a/gossip/src/crds_gossip.rs +++ b/gossip/src/crds_gossip.rs @@ -76,8 +76,9 @@ impl CrdsGossip { stakes: &HashMap<Pubkey, u64>, should_retain_crds_value: impl Fn(&CrdsValue) -> bool, ) -> ( - HashMap<Pubkey, Vec<CrdsValue>>, - usize, // number of values + Vec<CrdsValue>, // unique CrdsValues pushed out to peers + // map of pubkeys to indices in Vec<CrdsValue> pushed to that peer + HashMap<Pubkey, Vec<usize>>, usize, // number of push messages ) { self.push diff --git a/gossip/src/crds_gossip_push.rs b/gossip/src/crds_gossip_push.rs index da2e3753195e1b..32808d68453ba3 100644 --- a/gossip/src/crds_gossip_push.rs +++ b/gossip/src/crds_gossip_push.rs @@ -172,16 +172,17 @@ impl CrdsGossipPush { // Predicate returning false if the CRDS value should be discarded. should_retain_crds_value: impl Fn(&CrdsValue) -> bool, ) -> ( - HashMap<Pubkey, Vec<CrdsValue>>, - usize, // number of values + Vec<CrdsValue>, // unique CrdsValues pushed out to peers + // map of pubkeys to indices in Vec<CrdsValue> pushed to that peer + HashMap<Pubkey, Vec<usize>>, usize, // number of push messages ) { const MAX_NUM_PUSHES: usize = 1 << 12; - let active_set = self.active_set.read().unwrap(); let mut num_pushes = 0; - let mut num_values = 0; - let mut push_messages: HashMap<Pubkey, Vec<CrdsValue>> = HashMap::new(); + let mut values = Vec::new(); + let mut push_messages = HashMap::<Pubkey, Vec<usize>>::new(); let wallclock_window = self.wallclock_window(now); + let active_set = self.active_set.read().unwrap(); let mut crds_cursor = self.crds_cursor.lock().unwrap(); // crds should be locked last after self.{active_set,crds_cursor}. 
let crds = crds.read().unwrap(); @@ -191,16 +192,22 @@ impl CrdsGossipPush { .filter(|value| wallclock_window.contains(&value.wallclock())) .filter(|value| should_retain_crds_value(value)); 'outer: for value in entries { - num_values += 1; let origin = value.pubkey(); - let nodes = active_set.get_nodes( - pubkey, - &origin, - |node| value.should_force_push(node), - stakes, - ); - for node in nodes.take(self.push_fanout) { - push_messages.entry(*node).or_default().push(value.clone()); + let mut nodes = active_set + .get_nodes( + pubkey, + &origin, + |node| value.should_force_push(node), + stakes, + ) + .take(self.push_fanout) + .peekable(); + let index = values.len(); + if nodes.peek().is_some() { + values.push(value.clone()) + } + for &node in nodes { + push_messages.entry(node).or_default().push(index); num_pushes += 1; if num_pushes >= MAX_NUM_PUSHES { break 'outer; @@ -211,7 +218,7 @@ impl CrdsGossipPush { drop(crds_cursor); drop(active_set); self.num_pushes.fetch_add(num_pushes, Ordering::Relaxed); - (push_messages, num_values, num_pushes) + (values, push_messages, num_pushes) } /// Add the `from` to the peer's filter of nodes. @@ -297,6 +304,33 @@ mod tests { ) } + impl CrdsGossipPush { + // Wrapper for CrdsGossipPush.new_push_messages replicating old return + // type for legacy tests. 
+ fn old_new_push_messages( + &self, + pubkey: &Pubkey, + crds: &RwLock<Crds>, + now: u64, + stakes: &HashMap<Pubkey, u64>, + ) -> HashMap<Pubkey, Vec<CrdsValue>> { + let (entries, messages, _) = self.new_push_messages( + pubkey, + crds, + now, + stakes, + |_| true, // should_retain_crds_value + ); + messages + .into_iter() + .map(|(pubkey, indices)| { + let values = indices.into_iter().map(|k| entries[k].clone()).collect(); + (pubkey, values) + }) + .collect() + } + } + #[test] fn test_process_push_one() { let crds = RwLock::<Crds>::default(); @@ -422,14 +456,12 @@ mod tests { [origin].into_iter().collect() ); assert_eq!( - push.new_push_messages( + push.old_new_push_messages( &Pubkey::default(), &crds, 0, &HashMap::<Pubkey, u64>::default(), // stakes - |_| true, // should_retain_crds_value - ) - .0, + ), expected ); } @@ -487,14 +519,12 @@ .into_iter() .collect(); assert_eq!( - push.new_push_messages( + push.old_new_push_messages( &Pubkey::default(), &crds, now, &HashMap::<Pubkey, u64>::default(), // stakes - |_| true, // should_retain_crds_value - ) - .0, + ), expected ); } @@ -541,14 +571,12 @@ &HashMap::<Pubkey, u64>::default(), // stakes ); assert_eq!( - push.new_push_messages( + push.old_new_push_messages( &self_id, &crds, 0, &HashMap::<Pubkey, u64>::default(), // stakes - |_| true, // should_retain_crds_value - ) - .0, + ), expected ); } @@ -584,14 +612,12 @@ [origin].into_iter().collect() ); assert_eq!( - push.new_push_messages( + push.old_new_push_messages( &Pubkey::default(), &crds, 0, &HashMap::<Pubkey, u64>::default(), // stakes - |_| true, // should_retain_crds_value - ) - .0, + ), expected ); } diff --git a/gossip/src/protocol.rs b/gossip/src/protocol.rs index 3274dad76f62ce..4c9beb3b48f44e 100644 --- a/gossip/src/protocol.rs +++ b/gossip/src/protocol.rs @@ -208,47 +208,32 @@ impl Signable for PruneData { /// max_chunk_size. /// Note: some messages cannot be contained within that size so in the worst case this returns /// N nested Vecs with 1 item each. 
-pub(crate) fn split_gossip_messages<I, T>( +pub(crate) fn split_gossip_messages<T: Serialize + Debug>( max_chunk_size: usize, - data_feed: I, -) -> impl Iterator<Item = Vec<T>> -where - T: Serialize + Debug, - I: IntoIterator<Item = T>, -{ + data_feed: impl IntoIterator<Item = T>, +) -> impl Iterator<Item = Vec<T>> { let mut data_feed = data_feed.into_iter().fuse(); let mut buffer = vec![]; let mut buffer_size = 0; // Serialized size of buffered values. std::iter::from_fn(move || loop { - match data_feed.next() { - None => { - return if buffer.is_empty() { - None - } else { - Some(std::mem::take(&mut buffer)) - }; - } - Some(data) => { - let data_size = match bincode::serialized_size(&data) { - Ok(size) => size as usize, - Err(err) => { - error!("serialized_size failed: {}", err); - continue; - } - }; - if buffer_size + data_size <= max_chunk_size { - buffer_size += data_size; - buffer.push(data); - } else if data_size <= max_chunk_size { - buffer_size = data_size; - return Some(std::mem::replace(&mut buffer, vec![data])); - } else { - error!( - "dropping data larger than the maximum chunk size {:?}", - data - ); - } + let Some(data) = data_feed.next() else { + return (!buffer.is_empty()).then(|| std::mem::take(&mut buffer)); + }; + let data_size = match bincode::serialized_size(&data) { + Ok(size) => size as usize, + Err(err) => { + error!("serialized_size failed: {err:?}"); + continue; + } + }; + if buffer_size + data_size <= max_chunk_size { + buffer_size += data_size; + buffer.push(data); + } else if data_size <= max_chunk_size { + buffer_size = data_size; + return Some(std::mem::replace(&mut buffer, vec![data])); + } else { + error!("dropping data larger than the maximum chunk size {data:?}",); } }) } diff --git a/gossip/tests/crds_gossip.rs b/gossip/tests/crds_gossip.rs index b230f31ea43407..028f4bb2a494df 100644 --- a/gossip/tests/crds_gossip.rs +++ b/gossip/tests/crds_gossip.rs @@ -376,17 +376,20 @@ fn network_run_push( Duration::from_millis(node.gossip.pull.crds_timeout), ); node.gossip.purge(&node_pubkey, thread_pool, now, 
&timeouts); - ( - node_pubkey, - node.gossip - .new_push_messages( - &node_pubkey, - now, - &stakes, - |_| true, // should_retain_crds_value - ) - .0, - ) + let (entries, messages, _) = node.gossip.new_push_messages( + &node_pubkey, + now, + &stakes, + |_| true, // should_retain_crds_value + ); + let messages = messages + .into_iter() + .map(|(pubkey, indices)| { + let values = indices.into_iter().map(|k| entries[k].clone()).collect(); + (pubkey, values) + }) + .collect::<Vec<(Pubkey, Vec<CrdsValue>)>>(); + (node_pubkey, messages) }) .collect(); let transfered: Vec<_> = requests