Skip to content

Commit

Permalink
fix(network): fix panic in overlays on retain
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Jun 18, 2024
1 parent a0726c5 commit 6dec242
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 104 deletions.
8 changes: 5 additions & 3 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ pub use types::{
};

pub use self::overlay::{
OverlayConfig, OverlayId, OverlayService, OverlayServiceBackgroundTasks, OverlayServiceBuilder,
ChooseMultiplePrivateOverlayEntries, ChooseMultiplePublicOverlayEntries, OverlayConfig,
OverlayId, OverlayService, OverlayServiceBackgroundTasks, OverlayServiceBuilder,
PrivateOverlay, PrivateOverlayBuilder, PrivateOverlayEntries, PrivateOverlayEntriesEvent,
PrivateOverlayEntriesReadGuard, PrivateOverlayEntriesWriteGuard, PublicOverlay,
PublicOverlayBuilder, PublicOverlayEntries, PublicOverlayEntriesReadGuard,
PrivateOverlayEntriesReadGuard, PrivateOverlayEntriesWriteGuard, PrivateOverlayEntryData,
PublicOverlay, PublicOverlayBuilder, PublicOverlayEntries, PublicOverlayEntriesReadGuard,
PublicOverlayEntryData,
};
pub use self::util::{check_peer_signature, NetworkExt, Routable, Router, RouterBuilder};

Expand Down
8 changes: 5 additions & 3 deletions network/src/overlay/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ pub use self::config::OverlayConfig;
use self::entries_merger::PublicOverlayEntriesMerger;
pub use self::overlay_id::OverlayId;
pub use self::private_overlay::{
PrivateOverlay, PrivateOverlayBuilder, PrivateOverlayEntries, PrivateOverlayEntriesEvent,
PrivateOverlayEntriesReadGuard, PrivateOverlayEntriesWriteGuard,
ChooseMultiplePrivateOverlayEntries, PrivateOverlay, PrivateOverlayBuilder,
PrivateOverlayEntries, PrivateOverlayEntriesEvent, PrivateOverlayEntriesReadGuard,
PrivateOverlayEntriesWriteGuard, PrivateOverlayEntryData,
};
pub use self::public_overlay::{
PublicOverlay, PublicOverlayBuilder, PublicOverlayEntries, PublicOverlayEntriesReadGuard,
ChooseMultiplePublicOverlayEntries, PublicOverlay, PublicOverlayBuilder, PublicOverlayEntries,
PublicOverlayEntriesReadGuard, PublicOverlayEntryData,
};
use crate::dht::DhtService;
use crate::network::Network;
Expand Down
118 changes: 61 additions & 57 deletions network/src/overlay/private_overlay.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
use std::borrow::Borrow;
use std::collections::hash_map;
use std::sync::Arc;

use anyhow::Result;
use bytes::{Bytes, BytesMut};
use indexmap::IndexMap;
use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use rand::seq::SliceRandom;
use rand::Rng;
use tokio::sync::broadcast;
use tycho_util::futures::BoxFutureOrNoop;
use tycho_util::{FastHashMap, FastHashSet};
use tycho_util::{FastHashSet, FastHasherState};

use crate::dht::{PeerResolver, PeerResolverHandle};
use crate::network::Network;
Expand Down Expand Up @@ -62,8 +61,7 @@ impl PrivateOverlayBuilder {
});

let mut entries = PrivateOverlayEntries {
peer_id_to_index: Default::default(),
data: Default::default(),
items: Default::default(),
events_tx: broadcast::channel(self.entry_events_channel_size).0,
peer_resolver: self.peer_resolver,
};
Expand Down Expand Up @@ -177,8 +175,7 @@ struct Inner {
// NOTE: `#[derive(Default)]` is missing to prevent construction outside the
// crate.
pub struct PrivateOverlayEntries {
peer_id_to_index: FastHashMap<PeerId, usize>,
data: Vec<PrivateOverlayEntryData>,
items: OverlayItems,
events_tx: broadcast::Sender<PrivateOverlayEntriesEvent>,
peer_resolver: Option<PeerResolver>,
}
Expand All @@ -192,16 +189,18 @@ impl PrivateOverlayEntries {
/// Returns an iterator over the entry ids.
///
/// The order is not random, but is not defined.
pub fn iter(&self) -> std::slice::Iter<'_, PrivateOverlayEntryData> {
self.data.iter()
pub fn iter(&self) -> indexmap::map::Values<'_, PeerId, PrivateOverlayEntryData> {
self.items.values()
}

/// Returns one random peer, or `None` if set is empty.
pub fn choose<R>(&self, rng: &mut R) -> Option<&PrivateOverlayEntryData>
where
R: Rng + ?Sized,
{
self.data.choose(rng)
let index = rng.gen_range(0..self.items.len());
let (_, value) = self.items.get_index(index)?;
Some(value)
}

/// Chooses `n` entries from the set, without repetition,
Expand All @@ -210,56 +209,55 @@ impl PrivateOverlayEntries {
&self,
rng: &mut R,
n: usize,
) -> rand::seq::SliceChooseIter<'_, [PrivateOverlayEntryData], PrivateOverlayEntryData>
) -> ChooseMultiplePrivateOverlayEntries<'_>
where
R: Rng + ?Sized,
{
self.data.choose_multiple(rng, n)
let len = self.items.len();
ChooseMultiplePrivateOverlayEntries {
items: &self.items,
indices: rand::seq::index::sample(rng, len, n.min(len)).into_iter(),
}
}

/// Clears the set, removing all entries.
pub fn clear(&mut self) {
self.peer_id_to_index.clear();
self.data.clear();
self.items.clear();
}

/// Returns `true` if the set contains no elements.
pub fn is_empty(&self) -> bool {
self.data.is_empty()
self.items.is_empty()
}

/// Returns the number of elements in the set, also referred to as its 'length'.
pub fn len(&self) -> usize {
self.data.len()
self.items.len()
}

/// Returns true if the set contains the specified peer id.
pub fn contains(&self, peer_id: &PeerId) -> bool {
self.peer_id_to_index.contains_key(peer_id)
self.items.contains_key(peer_id)
}

/// Returns the peer resolver handle for the specified peer id, if it exists.
pub fn get_handle(&self, peer_id: &PeerId) -> Option<&PeerResolverHandle> {
self.peer_id_to_index
.get(peer_id)
.map(|&index| &self.data[index].resolver_handle)
self.items.get(peer_id).map(|item| &item.resolver_handle)
}

/// Adds a peer id to the set.
///
/// Returns whether the value was newly inserted.
pub fn insert(&mut self, peer_id: &PeerId) -> bool {
match self.peer_id_to_index.entry(*peer_id) {
match self.items.entry(*peer_id) {
// No entry for the peer_id, insert a new one
hash_map::Entry::Vacant(entry) => {
entry.insert(self.data.len());

indexmap::map::Entry::Vacant(entry) => {
let handle = self.peer_resolver.as_ref().map_or_else(
|| PeerResolverHandle::new_noop(peer_id),
|resolver| resolver.insert(peer_id, true),
);

self.data.push(PrivateOverlayEntryData {
entry.insert(PrivateOverlayEntryData {
peer_id: *peer_id,
resolver_handle: handle,
});
Expand All @@ -270,37 +268,21 @@ impl PrivateOverlayEntries {
true
}
// Entry for the peer_id exists, do nothing
hash_map::Entry::Occupied(_) => false,
indexmap::map::Entry::Occupied(_) => false,
}
}

/// Removes a value from the set.
///
/// Returns whether the value was present in the set.
pub fn remove(&mut self, peer_id: &PeerId) -> bool {
let Some(link) = self.peer_id_to_index.remove(peer_id) else {
return false;
};

// Remove the entry from the data vector
self.data.swap_remove(link);
self.fix_data_index(link);

_ = self
.events_tx
.send(PrivateOverlayEntriesEvent::Removed(*peer_id));

true
}

fn fix_data_index(&mut self, index: usize) {
if index < self.data.len() {
let link = self
.peer_id_to_index
.get_mut(&self.data[index].peer_id)
.expect("inconsistent data state");
*link = index;
let removed = self.items.swap_remove(peer_id).is_some();
if removed {
_ = self
.events_tx
.send(PrivateOverlayEntriesEvent::Removed(*peer_id));
}
removed
}
}

Expand Down Expand Up @@ -358,15 +340,42 @@ pub enum PrivateOverlayEntriesEvent {
Removed(PeerId),
}

pub struct ChooseMultiplePrivateOverlayEntries<'a> {
items: &'a OverlayItems,
indices: rand::seq::index::IndexVecIntoIter,
}

impl<'a> Iterator for ChooseMultiplePrivateOverlayEntries<'a> {
type Item = &'a PrivateOverlayEntryData;

fn next(&mut self) -> Option<Self::Item> {
self.indices.next().and_then(|i| {
let (_, value) = self.items.get_index(i)?;
Some(value)
})
}

fn size_hint(&self) -> (usize, Option<usize>) {
(self.indices.len(), Some(self.indices.len()))
}
}

impl ExactSizeIterator for ChooseMultiplePrivateOverlayEntries<'_> {
fn len(&self) -> usize {
self.indices.len()
}
}

type OverlayItems = IndexMap<PeerId, PrivateOverlayEntryData, FastHasherState>;

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn entries_container_is_set() {
let mut entries = PrivateOverlayEntries {
peer_id_to_index: Default::default(),
data: Default::default(),
items: Default::default(),
peer_resolver: None,
events_tx: broadcast::channel(100).0,
};
Expand All @@ -392,8 +401,7 @@ mod tests {
let (events_tx, mut events_rx) = broadcast::channel(100);

let mut entries = PrivateOverlayEntries {
peer_id_to_index: Default::default(),
data: Default::default(),
items: Default::default(),
peer_resolver: None,
events_tx,
};
Expand All @@ -402,7 +410,6 @@ mod tests {
for (i, peer_id) in peer_ids.iter().enumerate() {
assert!(entries.insert(peer_id));
assert_eq!(entries.len(), i + 1);
assert_eq!(entries.data.len(), i + 1);
assert_eq!(
events_rx.try_recv().unwrap(),
PrivateOverlayEntriesEvent::Added(*peer_id)
Expand All @@ -416,10 +423,7 @@ mod tests {
PrivateOverlayEntriesEvent::Removed(*peer_id)
);

assert!(entries.data.iter().all(|entry| entry.peer_id != peer_id));
for (index, entry) in entries.data.iter().enumerate() {
assert_eq!(entries.peer_id_to_index[&entry.peer_id], index);
}
assert!(!entries.items.contains_key(peer_id));
}

assert!(entries.is_empty());
Expand Down
Loading

0 comments on commit 6dec242

Please sign in to comment.