From 6dec2426bdcba40568a26cdc0d2a51e218e7df7a Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Tue, 18 Jun 2024 20:46:17 +0200 Subject: [PATCH] fix(network): fix panic in overlays on retain --- network/src/lib.rs | 8 +- network/src/overlay/mod.rs | 8 +- network/src/overlay/private_overlay.rs | 118 +++++++++++++------------ network/src/overlay/public_overlay.rs | 97 +++++++++++--------- 4 files changed, 127 insertions(+), 104 deletions(-) diff --git a/network/src/lib.rs b/network/src/lib.rs index 7937089be..3d28e6e8a 100644 --- a/network/src/lib.rs +++ b/network/src/lib.rs @@ -18,10 +18,12 @@ pub use types::{ }; pub use self::overlay::{ - OverlayConfig, OverlayId, OverlayService, OverlayServiceBackgroundTasks, OverlayServiceBuilder, + ChooseMultiplePrivateOverlayEntries, ChooseMultiplePublicOverlayEntries, OverlayConfig, + OverlayId, OverlayService, OverlayServiceBackgroundTasks, OverlayServiceBuilder, PrivateOverlay, PrivateOverlayBuilder, PrivateOverlayEntries, PrivateOverlayEntriesEvent, - PrivateOverlayEntriesReadGuard, PrivateOverlayEntriesWriteGuard, PublicOverlay, - PublicOverlayBuilder, PublicOverlayEntries, PublicOverlayEntriesReadGuard, + PrivateOverlayEntriesReadGuard, PrivateOverlayEntriesWriteGuard, PrivateOverlayEntryData, + PublicOverlay, PublicOverlayBuilder, PublicOverlayEntries, PublicOverlayEntriesReadGuard, + PublicOverlayEntryData, }; pub use self::util::{check_peer_signature, NetworkExt, Routable, Router, RouterBuilder}; diff --git a/network/src/overlay/mod.rs b/network/src/overlay/mod.rs index 0a761eba7..4f3139b36 100644 --- a/network/src/overlay/mod.rs +++ b/network/src/overlay/mod.rs @@ -11,11 +11,13 @@ pub use self::config::OverlayConfig; use self::entries_merger::PublicOverlayEntriesMerger; pub use self::overlay_id::OverlayId; pub use self::private_overlay::{ - PrivateOverlay, PrivateOverlayBuilder, PrivateOverlayEntries, PrivateOverlayEntriesEvent, - PrivateOverlayEntriesReadGuard, PrivateOverlayEntriesWriteGuard, + ChooseMultiplePrivateOverlayEntries, PrivateOverlay, PrivateOverlayBuilder, + PrivateOverlayEntries, PrivateOverlayEntriesEvent, PrivateOverlayEntriesReadGuard, + PrivateOverlayEntriesWriteGuard, PrivateOverlayEntryData, }; pub use self::public_overlay::{ - PublicOverlay, PublicOverlayBuilder, PublicOverlayEntries, PublicOverlayEntriesReadGuard, + ChooseMultiplePublicOverlayEntries, PublicOverlay, PublicOverlayBuilder, PublicOverlayEntries, + PublicOverlayEntriesReadGuard, PublicOverlayEntryData, }; use crate::dht::DhtService; use crate::network::Network; diff --git a/network/src/overlay/private_overlay.rs b/network/src/overlay/private_overlay.rs index 5b9a6d56f..dfcdf2f6a 100644 --- a/network/src/overlay/private_overlay.rs +++ b/network/src/overlay/private_overlay.rs @@ -1,15 +1,14 @@ use std::borrow::Borrow; -use std::collections::hash_map; use std::sync::Arc; use anyhow::Result; use bytes::{Bytes, BytesMut}; +use indexmap::IndexMap; use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use rand::seq::SliceRandom; use rand::Rng; use tokio::sync::broadcast; use tycho_util::futures::BoxFutureOrNoop; -use tycho_util::{FastHashMap, FastHashSet}; +use tycho_util::{FastHashSet, FastHasherState}; use crate::dht::{PeerResolver, PeerResolverHandle}; use crate::network::Network; @@ -62,8 +61,7 @@ impl PrivateOverlayBuilder { }); let mut entries = PrivateOverlayEntries { - peer_id_to_index: Default::default(), - data: Default::default(), + items: Default::default(), events_tx: broadcast::channel(self.entry_events_channel_size).0, peer_resolver: self.peer_resolver, }; @@ -177,8 +175,7 @@ struct Inner { // NOTE: `#[derive(Default)]` is missing to prevent construction outside the // crate. pub struct PrivateOverlayEntries { - peer_id_to_index: FastHashMap, - data: Vec, + items: OverlayItems, events_tx: broadcast::Sender, peer_resolver: Option, } @@ -192,8 +189,8 @@ impl PrivateOverlayEntries { /// Returns an iterator over the entry ids. /// /// The order is not random, but is not defined. - pub fn iter(&self) -> std::slice::Iter<'_, PrivateOverlayEntryData> { - self.data.iter() + pub fn iter(&self) -> indexmap::map::Values<'_, PeerId, PrivateOverlayEntryData> { + self.items.values() } /// Returns one random peer, or `None` if set is empty. @@ -201,7 +198,9 @@ impl PrivateOverlayEntries { where R: Rng + ?Sized, { - self.data.choose(rng) + let index = rng.gen_range(0..self.items.len()); + let (_, value) = self.items.get_index(index)?; + Some(value) } /// Chooses `n` entries from the set, without repetition, @@ -210,56 +209,55 @@ impl PrivateOverlayEntries { &self, rng: &mut R, n: usize, - ) -> rand::seq::SliceChooseIter<'_, [PrivateOverlayEntryData], PrivateOverlayEntryData> + ) -> ChooseMultiplePrivateOverlayEntries<'_> where R: Rng + ?Sized, { - self.data.choose_multiple(rng, n) + let len = self.items.len(); + ChooseMultiplePrivateOverlayEntries { + items: &self.items, + indices: rand::seq::index::sample(rng, len, n.min(len)).into_iter(), + } } /// Clears the set, removing all entries. pub fn clear(&mut self) { - self.peer_id_to_index.clear(); - self.data.clear(); + self.items.clear(); } /// Returns `true` if the set contains no elements. pub fn is_empty(&self) -> bool { - self.data.is_empty() + self.items.is_empty() } /// Returns the number of elements in the set, also referred to as its 'length'. pub fn len(&self) -> usize { - self.data.len() + self.items.len() } /// Returns true if the set contains the specified peer id. pub fn contains(&self, peer_id: &PeerId) -> bool { - self.peer_id_to_index.contains_key(peer_id) + self.items.contains_key(peer_id) } /// Returns the peer resolver handle for the specified peer id, if it exists. pub fn get_handle(&self, peer_id: &PeerId) -> Option<&PeerResolverHandle> { - self.peer_id_to_index - .get(peer_id) - .map(|&index| &self.data[index].resolver_handle) + self.items.get(peer_id).map(|item| &item.resolver_handle) } /// Adds a peer id to the set. /// /// Returns whether the value was newly inserted. pub fn insert(&mut self, peer_id: &PeerId) -> bool { - match self.peer_id_to_index.entry(*peer_id) { + match self.items.entry(*peer_id) { // No entry for the peer_id, insert a new one - hash_map::Entry::Vacant(entry) => { - entry.insert(self.data.len()); - + indexmap::map::Entry::Vacant(entry) => { let handle = self.peer_resolver.as_ref().map_or_else( || PeerResolverHandle::new_noop(peer_id), |resolver| resolver.insert(peer_id, true), ); - self.data.push(PrivateOverlayEntryData { + entry.insert(PrivateOverlayEntryData { peer_id: *peer_id, resolver_handle: handle, }); @@ -270,7 +268,7 @@ impl PrivateOverlayEntries { true } // Entry for the peer_id exists, do nothing - hash_map::Entry::Occupied(_) => false, + indexmap::map::Entry::Occupied(_) => false, } } @@ -278,29 +276,13 @@ impl PrivateOverlayEntries { /// /// Returns whether the value was present in the set. pub fn remove(&mut self, peer_id: &PeerId) -> bool { - let Some(link) = self.peer_id_to_index.remove(peer_id) else { - return false; - }; - - // Remove the entry from the data vector - self.data.swap_remove(link); - self.fix_data_index(link); - - _ = self - .events_tx - .send(PrivateOverlayEntriesEvent::Removed(*peer_id)); - - true - } - - fn fix_data_index(&mut self, index: usize) { - if index < self.data.len() { - let link = self - .peer_id_to_index - .get_mut(&self.data[index].peer_id) - .expect("inconsistent data state"); - *link = index; + let removed = self.items.swap_remove(peer_id).is_some(); + if removed { + _ = self + .events_tx + .send(PrivateOverlayEntriesEvent::Removed(*peer_id)); } + removed } } @@ -358,6 +340,34 @@ pub enum PrivateOverlayEntriesEvent { Removed(PeerId), } +pub struct ChooseMultiplePrivateOverlayEntries<'a> { + items: &'a OverlayItems, + indices: rand::seq::index::IndexVecIntoIter, +} + +impl<'a> Iterator for ChooseMultiplePrivateOverlayEntries<'a> { + type Item = &'a PrivateOverlayEntryData; + + fn next(&mut self) -> Option { + self.indices.next().and_then(|i| { + let (_, value) = self.items.get_index(i)?; + Some(value) + }) + } + + fn size_hint(&self) -> (usize, Option) { + (self.indices.len(), Some(self.indices.len())) + } +} + +impl ExactSizeIterator for ChooseMultiplePrivateOverlayEntries<'_> { + fn len(&self) -> usize { + self.indices.len() + } +} + +type OverlayItems = IndexMap; + #[cfg(test)] mod tests { use super::*; @@ -365,8 +375,7 @@ mod tests { #[test] fn entries_container_is_set() { let mut entries = PrivateOverlayEntries { - peer_id_to_index: Default::default(), - data: Default::default(), + items: Default::default(), peer_resolver: None, events_tx: broadcast::channel(100).0, }; @@ -392,8 +401,7 @@ mod tests { let (events_tx, mut events_rx) = broadcast::channel(100); let mut entries = PrivateOverlayEntries { - peer_id_to_index: Default::default(), - data: Default::default(), + items: Default::default(), peer_resolver: None, events_tx, }; @@ -402,7 +410,6 @@ mod tests { for (i, peer_id) in peer_ids.iter().enumerate() { assert!(entries.insert(peer_id)); assert_eq!(entries.len(), i + 1); - assert_eq!(entries.data.len(), i + 1); assert_eq!( events_rx.try_recv().unwrap(), PrivateOverlayEntriesEvent::Added(*peer_id) @@ -416,10 +423,7 @@ mod tests { PrivateOverlayEntriesEvent::Removed(*peer_id) ); - assert!(entries.data.iter().all(|entry| entry.peer_id != peer_id)); - for (index, entry) in entries.data.iter().enumerate() { - assert_eq!(entries.peer_id_to_index[&entry.peer_id], index); - } + assert!(!entries.items.contains_key(peer_id)); } assert!(entries.is_empty()); diff --git a/network/src/overlay/public_overlay.rs b/network/src/overlay/public_overlay.rs index ce08cfea1..1b7192365 100644 --- a/network/src/overlay/public_overlay.rs +++ b/network/src/overlay/public_overlay.rs @@ -1,17 +1,16 @@ use std::borrow::Borrow; -use std::collections::hash_map; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; use anyhow::Result; use bytes::{Bytes, BytesMut}; +use indexmap::IndexMap; use parking_lot::{RwLock, RwLockReadGuard}; -use rand::seq::SliceRandom; use rand::Rng; use tokio::sync::Notify; use tycho_util::futures::BoxFutureOrNoop; -use tycho_util::{FastDashSet, FastHashMap}; +use tycho_util::{FastDashSet, FastHasherState}; use crate::dht::{PeerResolver, PeerResolverHandle}; use crate::network::Network; @@ -76,8 +75,7 @@ impl PublicOverlayBuilder { }); let entries = PublicOverlayEntries { - peer_id_to_index: Default::default(), - data: Default::default(), + items: Default::default(), peer_resolver: self.peer_resolver, }; @@ -365,32 +363,31 @@ struct Inner { } pub struct PublicOverlayEntries { - peer_id_to_index: FastHashMap, - data: Vec, + items: OverlayItems, peer_resolver: Option, } impl PublicOverlayEntries { /// Returns `true` if the set contains no elements. pub fn is_empty(&self) -> bool { - self.data.is_empty() + self.items.is_empty() } /// Returns the number of elements in the set, also referred to as its 'length'. pub fn len(&self) -> usize { - self.data.len() + self.items.len() } /// Returns true if the set contains the specified peer id. pub fn contains(&self, peer_id: &PeerId) -> bool { - self.peer_id_to_index.contains_key(peer_id) + self.items.contains_key(peer_id) } /// Returns an iterator over the entries. /// /// The order is not random, but is not defined. - pub fn iter(&self) -> std::slice::Iter<'_, PublicOverlayEntryData> { - self.data.iter() + pub fn iter(&self) -> indexmap::map::Values<'_, PeerId, PublicOverlayEntryData> { + self.items.values() } /// Returns a reference to one random element of the slice, @@ -399,7 +396,9 @@ impl PublicOverlayEntries { where R: Rng + ?Sized, { - self.data.choose(rng) + let index = rng.gen_range(0..self.items.len()); + let (_, value) = self.items.get_index(index)?; + Some(value) } /// Chooses `n` entries from the set, without repetition, @@ -408,37 +407,36 @@ impl PublicOverlayEntries { &self, rng: &mut R, n: usize, - ) -> rand::seq::SliceChooseIter<'_, [PublicOverlayEntryData], PublicOverlayEntryData> + ) -> ChooseMultiplePublicOverlayEntries<'_> where R: Rng + ?Sized, { - self.data.choose_multiple(rng, n) + let len = self.items.len(); + ChooseMultiplePublicOverlayEntries { + items: &self.items, + indices: rand::seq::index::sample(rng, len, n.min(len)).into_iter(), + } } /// Chooses all entries from the set, without repetition, /// and in random order. - pub fn choose_all( - &self, - rng: &mut R, - ) -> rand::seq::SliceChooseIter<'_, [PublicOverlayEntryData], PublicOverlayEntryData> + pub fn choose_all(&self, rng: &mut R) -> ChooseMultiplePublicOverlayEntries<'_> where R: Rng + ?Sized, { - self.data.choose_multiple(rng, self.data.len()) + self.choose_multiple(rng, self.items.len()) } fn insert(&mut self, item: &PublicEntry) -> UpdateStatus { - match self.peer_id_to_index.entry(item.peer_id) { + match self.items.entry(item.peer_id) { // No entry for the peer_id, insert a new one - hash_map::Entry::Vacant(entry) => { - entry.insert(self.data.len()); - + indexmap::map::Entry::Vacant(entry) => { let resolver_handle = self.peer_resolver.as_ref().map_or_else( || PeerResolverHandle::new_noop(&item.peer_id), |resolver| resolver.insert(&item.peer_id, false), ); - self.data.push(PublicOverlayEntryData { + entry.insert(PublicOverlayEntryData { entry: Arc::new(item.clone()), resolver_handle, }); @@ -446,9 +444,8 @@ impl PublicOverlayEntries { UpdateStatus::Added } // Entry for the peer_id exists, update it if the new item is newer - hash_map::Entry::Occupied(entry) => { - let index = *entry.get(); - let existing = &mut self.data[index]; + indexmap::map::Entry::Occupied(mut entry) => { + let existing = entry.get_mut(); if existing.entry.created_at >= item.created_at { return UpdateStatus::Skipped; } @@ -456,7 +453,7 @@ impl PublicOverlayEntries { // Try to reuse the existing Arc if possible match Arc::get_mut(&mut existing.entry) { Some(existing) => existing.clone_from(item), - None => self.data[index].entry = Arc::new(item.clone()), + None => existing.entry = Arc::new(item.clone()), } UpdateStatus::Updated } @@ -467,13 +464,7 @@ impl PublicOverlayEntries { where F: FnMut(&PublicOverlayEntryData) -> bool, { - self.data.retain(|item| { - let keep = f(item); - if !keep { - self.peer_id_to_index.remove(&item.entry.peer_id); - } - keep - }); + self.items.retain(|_, item| f(item)); } } @@ -523,6 +514,34 @@ impl UpdateStatus { } } +pub struct ChooseMultiplePublicOverlayEntries<'a> { + items: &'a OverlayItems, + indices: rand::seq::index::IndexVecIntoIter, +} + +impl<'a> Iterator for ChooseMultiplePublicOverlayEntries<'a> { + type Item = &'a PublicOverlayEntryData; + + fn next(&mut self) -> Option { + self.indices.next().and_then(|i| { + let (_, value) = self.items.get_index(i)?; + Some(value) + }) + } + + fn size_hint(&self) -> (usize, Option) { + (self.indices.len(), Some(self.indices.len())) + } +} + +impl ExactSizeIterator for ChooseMultiplePublicOverlayEntries<'_> { + fn len(&self) -> usize { + self.indices.len() + } +} + +type OverlayItems = IndexMap; + #[cfg(test)] mod tests { use everscale_crypto::ed25519; @@ -568,11 +587,7 @@ mod tests { fn count_entries(overlay: &PublicOverlay) -> usize { let tracked_count = overlay.inner.entry_count.load(Ordering::Acquire); let guard = overlay.read_entries(); - assert_eq!( - guard.entries.data.len(), - guard.entries.peer_id_to_index.len(), - ); - assert_eq!(guard.entries.data.len(), tracked_count); + assert_eq!(guard.entries.items.len(), tracked_count); tracked_count }