From 8b5a90a5a3015418a3c6083dfec604664f151cd6 Mon Sep 17 00:00:00 2001
From: Mark Rousskov
Date: Thu, 6 Feb 2025 21:28:29 +0000
Subject: [PATCH] Replace fixed maps with hashbrown::HashTable

We still preserve the fixed-size nature, albeit at (potentially) greater
memory cost, since hashbrown will resize up to 2x the actual max entry count
due to having tombstones in the table. After that final growth we will incur
somewhat expensive table-wide rehashing periodically, but this is
sufficiently fast to be acceptable in practice.
---
 dc/s2n-quic-dc/Cargo.toml                     |   1 +
 dc/s2n-quic-dc/src/credentials.rs             |  12 +-
 dc/s2n-quic-dc/src/fixed_map.rs               | 228 ----------------
 dc/s2n-quic-dc/src/lib.rs                     |   1 -
 dc/s2n-quic-dc/src/path/secret/map/cleaner.rs |   4 +-
 dc/s2n-quic-dc/src/path/secret/map/state.rs   | 247 ++++++++++++------
 dc/s2n-quic-dc/src/path/secret/map/store.rs   |   7 +-
 7 files changed, 187 insertions(+), 313 deletions(-)
 delete mode 100644 dc/s2n-quic-dc/src/fixed_map.rs

diff --git a/dc/s2n-quic-dc/Cargo.toml b/dc/s2n-quic-dc/Cargo.toml
index a84da682c..1f592ad00 100644
--- a/dc/s2n-quic-dc/Cargo.toml
+++ b/dc/s2n-quic-dc/Cargo.toml
@@ -37,6 +37,7 @@ s2n-codec = { version = "=0.53.0", path = "../../common/s2n-codec", default-feat
 s2n-quic-core = { version = "=0.53.0", path = "../../quic/s2n-quic-core", default-features = false }
 s2n-quic-platform = { version = "=0.53.0", path = "../../quic/s2n-quic-platform" }
 slotmap = "1"
+hashbrown = "0.15"
 thiserror = "2"
 tokio = { version = "1", default-features = false, features = ["sync"] }
 tracing = "0.1"
diff --git a/dc/s2n-quic-dc/src/credentials.rs b/dc/s2n-quic-dc/src/credentials.rs
index 57ac71223..0f362ad86 100644
--- a/dc/s2n-quic-dc/src/credentials.rs
+++ b/dc/s2n-quic-dc/src/credentials.rs
@@ -25,12 +25,18 @@ pub mod testing;
 #[repr(C)]
 pub struct Id([u8; 16]);
 
-impl std::hash::Hash for Id {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+impl Id {
+    pub(crate) fn to_hash(self) -> u64 {
         // The ID has very high quality entropy already, so write just one half of it to keep hash
         // costs as low as possible. For the main use of the Hash impl in the fixed-size ID map
         // this translates to just directly using these bytes for the indexing.
-        state.write_u64(u64::from_ne_bytes(self.0[..8].try_into().unwrap()));
+        u64::from_ne_bytes(self.0[..8].try_into().unwrap())
+    }
+}
+
+impl std::hash::Hash for Id {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        state.write_u64(self.to_hash());
+    }
 }
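The `to_hash` helper above relies on the 16-byte credential ID already being
uniformly random, so its first eight bytes can stand in for a full hash. A
standalone sketch of the same idea (the `id_to_hash` name is hypothetical and
not part of this patch):

    // The ID is high-entropy, so the first eight bytes serve directly as the
    // 64-bit hash; no hasher pass over all 16 bytes is needed.
    fn id_to_hash(id: &[u8; 16]) -> u64 {
        u64::from_ne_bytes(id[..8].try_into().unwrap())
    }

    fn main() {
        let id = [0xab; 16];
        assert_eq!(id_to_hash(&id), u64::from_ne_bytes([0xab; 8]));
    }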
diff --git a/dc/s2n-quic-dc/src/fixed_map.rs b/dc/s2n-quic-dc/src/fixed_map.rs
deleted file mode 100644
index 77184f533..000000000
--- a/dc/s2n-quic-dc/src/fixed_map.rs
+++ /dev/null
@@ -1,228 +0,0 @@
-// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
-// SPDX-License-Identifier: Apache-2.0
-
-//! A fixed-allocation concurrent HashMap.
-//!
-//! This implements a concurrent map backed by a fixed-size allocation created at construction
-//! time, with a fixed memory footprint. The expectation is that all storage is inline (to the
-//! extent possible) reducing the likelihood of additional allocations.
-
-use core::{
-    fmt::Debug,
-    hash::Hash,
-    sync::atomic::{AtomicU8, Ordering},
-};
-use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
-use std::{collections::hash_map::RandomState, hash::BuildHasher};
-
-pub use parking_lot::MappedRwLockReadGuard as ReadGuard;
-
-pub struct Map<K, V, S = RandomState> {
-    slots: Box<[Slot<K, V>]>,
-    hash_builder: S,
-}
-
-impl<K, V, S> Map<K, V, S>
-where
-    K: Hash + Eq + Debug,
-    S: BuildHasher,
-{
-    pub fn with_capacity(entries: usize, hasher: S) -> Self {
-        let slots = std::cmp::max(1, (entries + SLOT_CAPACITY) / SLOT_CAPACITY).next_power_of_two();
-        let map = Map {
-            slots: (0..slots)
-                .map(|_| Slot::new())
-                .collect::<Vec<_>>()
-                .into_boxed_slice(),
-            hash_builder: hasher,
-        };
-        assert!(map.slots.len().is_power_of_two());
-        assert!(u32::try_from(map.slots.len()).is_ok());
-        map
-    }
-
-    pub fn clear(&self) {
-        for slot in self.slots.iter() {
-            slot.clear();
-        }
-    }
-
-    pub fn count(&self) -> usize {
-        self.slots.iter().map(|s| s.len()).sum()
-    }
-
-    // can't lend references to values outside of a lock, so Iterator interface doesn't work
-    #[allow(unused)]
-    pub fn iter(&self, mut f: impl FnMut(&K, &V)) {
-        for slot in self.slots.iter() {
-            // this feels more readable than flatten
-            #[allow(clippy::manual_flatten)]
-            for entry in slot.values.read().iter() {
-                if let Some(v) = entry {
-                    f(&v.0, &v.1);
-                }
-            }
-        }
-    }
-
-    pub fn retain(&self, mut f: impl FnMut(&K, &V) -> bool) {
-        for slot in self.slots.iter() {
-            // this feels more readable than flatten
-            #[allow(clippy::manual_flatten)]
-            for entry in slot.values.write().iter_mut() {
-                if let Some(v) = entry {
-                    if !f(&v.0, &v.1) {
-                        *entry = None;
-                    }
-                }
-            }
-        }
-    }
-
-    fn slot_by_hash(&self, key: &K) -> &Slot<K, V> {
-        let hash = self.hash_builder.hash_one(key);
-        // needed for bit-and modulus, checked in new as a non-debug assert!.
-        debug_assert!(self.slots.len().is_power_of_two());
-        let slot_idx = hash as usize & (self.slots.len() - 1);
-        &self.slots[slot_idx]
-    }
-
-    /// Returns in .0 = Some(v) if overwriting a previous value for the same key.
-    /// Returns in .1 = Some(k, v) if we evicted some other entry.
-    #[must_use]
-    pub fn insert(&self, key: K, value: V) -> (Option<V>, Option<(K, V)>) {
-        self.slot_by_hash(&key).put(key, value)
-    }
-
-    /// This inserts the provided kv only if key is not currently in the map.
-    ///
-    /// Returns the evicted entry, if any.
-    #[must_use]
-    pub fn insert_new_key(&self, key: K, value: V) -> Option<(K, V)> {
-        self.slot_by_hash(&key).insert_new_key(key, value)
-    }
-
-    pub fn contains_key(&self, key: &K) -> bool {
-        self.get_by_key(key).is_some()
-    }
-
-    pub fn get_by_key(&self, key: &K) -> Option<ReadGuard<'_, V>> {
-        self.slot_by_hash(key).get_by_key(key)
-    }
-}
-
-// Balance of speed of access (put or get) and likelihood of false positive eviction.
-const SLOT_CAPACITY: usize = 32;
-
-struct Slot<K, V> {
-    next_write: AtomicU8,
-    values: RwLock<[Option<(K, V)>; SLOT_CAPACITY]>,
-}
-
-impl<K, V> Slot<K, V>
-where
-    K: Hash + Eq + Debug,
-{
-    fn new() -> Self {
-        Slot {
-            next_write: AtomicU8::new(0),
-            values: RwLock::new(std::array::from_fn(|_| None)),
-        }
-    }
-
-    fn clear(&self) {
-        *self.values.write() = std::array::from_fn(|_| None);
-    }
-
-    /// Returns in .0 = Some(v) if overwriting a previous value for the same key.
-    /// Returns in .1 = Some(k, v) if we evicted some other entry.
-    fn put(&self, new_key: K, new_value: V) -> (Option<V>, Option<(K, V)>) {
-        let values = self.values.upgradable_read();
-        for (value_idx, value) in values.iter().enumerate() {
-            // overwrite if same key or if no key/value pair yet
-            if value.as_ref().map_or(true, |(k, _)| *k == new_key) {
-                let mut values = RwLockUpgradableReadGuard::upgrade(values);
-                let old = values[value_idx].take().map(|v| v.1);
-                values[value_idx] = Some((new_key, new_value));
-                return (old, None);
-            }
-        }
-
-        (
-            None,
-            self.insert_replacing(
-                RwLockUpgradableReadGuard::upgrade(values),
-                new_key,
-                new_value,
-            ),
-        )
-    }
-
-    fn insert_new_key(&self, new_key: K, new_value: V) -> Option<(K, V)> {
-        let entries = self.values.upgradable_read();
-        for (idx, entry) in entries.iter().enumerate() {
-            // write only if no key/value pair yet
-            if entry.is_none() {
-                let mut entries = RwLockUpgradableReadGuard::upgrade(entries);
-                entries[idx] = Some((new_key, new_value));
-                return None;
-            }
-
-            if let Some(entry) = &entry {
-                if entry.0 == new_key {
-                    return None;
-                }
-            }
-        }
-
-        // if we didn't find ourselves, then we can upgrade and insert.
-        self.insert_replacing(
-            RwLockUpgradableReadGuard::upgrade(entries),
-            new_key,
-            new_value,
-        )
-    }
-
-    // Insert evicting an existing entry for a different key.
-    fn insert_replacing(
-        &self,
-        mut entries: RwLockWriteGuard<'_, [Option<(K, V)>; SLOT_CAPACITY]>,
-        new_key: K,
-        new_value: V,
-    ) -> Option<(K, V)> {
-        // If `new_key` isn't already in this slot, replace one of the existing entries with the
-        // new key. For now we rotate through based on `next_write`.
-        let replacement = self.next_write.fetch_add(1, Ordering::Relaxed) as usize % SLOT_CAPACITY;
-        tracing::trace!(
-            "evicting {:?} - bucket overflow",
-            entries[replacement].as_mut().unwrap().0
-        );
-        std::mem::replace(&mut entries[replacement], Some((new_key, new_value)))
-    }
-
-    fn get_by_key(&self, needle: &K) -> Option<ReadGuard<'_, V>> {
-        // Scan each value and check if our requested needle is present.
-        let values = self.values.read();
-        for (value_idx, value) in values.iter().enumerate() {
-            if value.as_ref().is_some_and(|(k, _)| *k == *needle) {
-                return Some(RwLockReadGuard::map(values, |values| {
-                    &values[value_idx].as_ref().unwrap().1
-                }));
-            }
-        }
-
-        None
-    }
-
-    fn len(&self) -> usize {
-        let values = self.values.read();
-        let mut len = 0;
-        for value in values.iter().enumerate() {
-            len += value.1.is_some() as usize;
-        }
-        len
-    }
-}
-
-#[cfg(test)]
-mod test;
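For context on what the deleted map did: it picked a slot by masking the hash
against a power-of-two slot count, which is why `with_capacity` asserted
`is_power_of_two()` outside of debug builds. A minimal sketch of that indexing
(illustrative only; `slot_index` is a hypothetical name):

    // With a power-of-two slot count, `hash % num_slots` reduces to a bit-and,
    // avoiding a division on the lookup path.
    fn slot_index(hash: u64, num_slots: usize) -> usize {
        debug_assert!(num_slots.is_power_of_two());
        hash as usize & (num_slots - 1)
    }

    fn main() {
        assert_eq!(slot_index(0x1234_5678, 8), 0x1234_5678_usize & 7);
    }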
diff --git a/dc/s2n-quic-dc/src/lib.rs b/dc/s2n-quic-dc/src/lib.rs
index d190508f6..fc6666fc8 100644
--- a/dc/s2n-quic-dc/src/lib.rs
+++ b/dc/s2n-quic-dc/src/lib.rs
@@ -9,7 +9,6 @@ pub mod credentials;
 pub mod crypto;
 pub mod datagram;
 pub mod event;
-mod fixed_map;
 pub mod msg;
 pub mod packet;
 pub mod path;
diff --git a/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs b/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs
index da6180dc8..930220dfa 100644
--- a/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs
+++ b/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs
@@ -108,7 +108,7 @@ impl Cleaner {
         // For non-retired entries, if it's time for them to handshake again, request a
         // handshake to happen. This handshake will currently happen on the next request for this
         // particular peer.
-        state.ids.retain(|_, entry| {
+        state.ids.retain(|entry| {
             id_entries_initial += 1;
 
             let retained = if let Some(retired_at) = entry.retired_at() {
@@ -139,7 +139,7 @@ impl Cleaner {
         // Drop IP entries if we no longer have the path secret ID entry.
         // FIXME: Don't require a loop to do this. This is likely somewhat slow since it takes a
         // write lock + read lock essentially per-entry, but should be near-constant-time.
-        state.peers.retain(|_, entry| {
+        state.peers.retain(|entry| {
             address_entries_initial += 1;
 
             let retained = state.ids.contains_key(entry.id());
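The closure change from `|_, entry|` to `|entry|` follows from the new backing
store: `hashbrown::HashTable` is a bare value table with no separate key, so
its `retain` hands the closure `&mut T` rather than a `(&K, &V)` pair. A small
self-contained sketch of that behavior (toy identity hash, assuming the
hashbrown 0.15 `HashTable` API):

    use std::sync::Arc;

    fn main() {
        let mut table: hashbrown::HashTable<Arc<u64>> = hashbrown::HashTable::new();
        // Identity hash is fine for a sketch; real code would hash properly.
        let hasher = |v: &Arc<u64>| **v;
        table.insert_unique(1, Arc::new(1), hasher);
        table.insert_unique(2, Arc::new(2), hasher);
        // retain sees the whole stored value, not a key/value pair.
        table.retain(|entry| **entry % 2 == 1); // keep odd values only
        assert_eq!(table.len(), 1);
    }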
diff --git a/dc/s2n-quic-dc/src/path/secret/map/state.rs b/dc/s2n-quic-dc/src/path/secret/map/state.rs
index fdbfd262c..bc7729239 100644
--- a/dc/s2n-quic-dc/src/path/secret/map/state.rs
+++ b/dc/s2n-quic-dc/src/path/secret/map/state.rs
@@ -6,7 +6,6 @@ use crate::{
     credentials::{Credentials, Id},
     crypto,
     event::{self, EndpointPublisher as _, IntoEvent as _},
-    fixed_map::{self, ReadGuard},
     packet::{secret_control as control, Packet},
     path::secret::receiver,
 };
@@ -15,7 +14,8 @@ use s2n_quic_core::{
     time::{self, Timestamp},
 };
 use std::{
-    hash::{BuildHasherDefault, Hasher},
+    collections::VecDeque,
+    hash::{BuildHasher, Hash, Hasher},
     net::{Ipv4Addr, SocketAddr},
     sync::{Arc, Mutex, RwLock, Weak},
     time::Duration,
@@ -24,6 +24,119 @@ use std::{
 #[cfg(test)]
 mod tests;
 
+#[derive(Default)]
+pub(crate) struct PeerMap(
+    Mutex<hashbrown::HashTable<Arc<Entry>>>,
+    std::collections::hash_map::RandomState,
+);
+
+#[derive(Default)]
+pub(crate) struct IdMap(Mutex<hashbrown::HashTable<Arc<Entry>>>);
+
+impl PeerMap {
+    fn hash(&self, entry: &Entry) -> u64 {
+        self.hash_key(entry.peer())
+    }
+
+    fn hash_key(&self, entry: &SocketAddr) -> u64 {
+        let mut hasher = self.1.build_hasher();
+        entry.hash(&mut hasher);
+        hasher.finish()
+    }
+
+    pub(crate) fn insert(&self, entry: Arc<Entry>) -> Option<Arc<Entry>> {
+        let hash = self.hash(&entry);
+        let mut map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        match map.entry(hash, |other| other.peer() == entry.peer(), |e| self.hash(e)) {
+            hashbrown::hash_table::Entry::Occupied(mut o) => {
+                Some(std::mem::replace(o.get_mut(), entry))
+            }
+            hashbrown::hash_table::Entry::Vacant(v) => {
+                v.insert(entry);
+                None
+            }
+        }
+    }
+
+    pub(crate) fn contains_key(&self, ip: &SocketAddr) -> bool {
+        let hash = self.hash_key(ip);
+        let map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.find(hash, |o| o.peer() == ip).is_some()
+    }
+
+    pub(crate) fn get(&self, peer: SocketAddr) -> Option<Arc<Entry>> {
+        let hash = self.hash_key(&peer);
+        let map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.find(hash, |o| *o.peer() == peer).cloned()
+    }
+
+    pub(crate) fn clear(&self) {
+        let mut map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.clear();
+    }
+
+    pub(crate) fn retain(&self, mut filter: impl FnMut(&Arc<Entry>) -> bool) {
+        let mut map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.retain(|e| filter(&*e))
+    }
+
+    fn len(&self) -> usize {
+        let map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.len()
+    }
+}
+
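`PeerMap::insert` above uses `HashTable`'s low-level entry API: because the
table stores whole values rather than key/value pairs, the caller supplies the
hash, an equality predicate, and a re-hasher that the table uses when it
resizes. A self-contained sketch with a toy key type and hash function (the
tuple type and `hash` closure are illustrative, not the crate's real types):

    use hashbrown::hash_table::Entry;

    fn main() {
        let mut table: hashbrown::HashTable<(&'static str, u32)> = hashbrown::HashTable::new();
        // Toy hash for the sketch; the patch uses RandomState for peers and
        // the ID's own bytes for credentials.
        let hash = |k: &str| -> u64 { k.len() as u64 };
        let key = "peer";
        match table.entry(hash(key), |e| e.0 == key, |e| hash(e.0)) {
            // Same key already present: replace/update in place.
            Entry::Occupied(mut o) => o.get_mut().1 += 1,
            // Key absent: insert a fresh entry.
            Entry::Vacant(v) => {
                v.insert((key, 1));
            }
        }
        assert_eq!(table.find(hash(key), |e| e.0 == key).unwrap().1, 1);
    }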
+impl IdMap {
+    fn hash(&self, entry: &Entry) -> u64 {
+        self.hash_key(entry.id())
+    }
+
+    fn hash_key(&self, entry: &Id) -> u64 {
+        entry.to_hash()
+    }
+
+    pub(crate) fn insert(&self, entry: Arc<Entry>) -> Option<Arc<Entry>> {
+        let hash = self.hash(&entry);
+        let mut map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        match map.entry(hash, |other| other.id() == entry.id(), |e| self.hash(e)) {
+            hashbrown::hash_table::Entry::Occupied(mut o) => {
+                Some(std::mem::replace(o.get_mut(), entry))
+            }
+            hashbrown::hash_table::Entry::Vacant(v) => {
+                v.insert(entry);
+                None
+            }
+        }
+    }
+
+    pub(crate) fn contains_key(&self, id: &Id) -> bool {
+        let hash = self.hash_key(id);
+        let map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.find(hash, |o| o.id() == id).is_some()
+    }
+
+    pub(crate) fn get(&self, id: Id) -> Option<Arc<Entry>> {
+        let hash = self.hash_key(&id);
+        let map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.find(hash, |o| *o.id() == id).cloned()
+    }
+
+    pub(crate) fn clear(&self) {
+        let mut map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.clear();
+    }
+
+    pub(crate) fn retain(&self, mut filter: impl FnMut(&Arc<Entry>) -> bool) {
+        let mut map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.retain(|e| filter(&*e))
+    }
+
+    fn len(&self) -> usize {
+        let map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.len()
+    }
+}
+
 // # Managing memory consumption
 //
 // For regular rotation with live peers, we retain at most two secrets: one derived from the most
@@ -62,10 +175,15 @@ where
     // In the future it's likely we'll want to build bidirectional support in which case splitting
     // this into two maps (per the discussion in "Managing memory consumption" above) will be
     // needed.
-    pub(super) peers: fixed_map::Map<SocketAddr, Arc<Entry>>,
+    pub(super) peers: PeerMap,
 
     // All known entries.
-    pub(super) ids: fixed_map::Map<Id, Arc<Entry>, BuildHasherDefault<NoopIdHasher>>,
+    pub(super) ids: IdMap,
+
+    // We evict entries based on FIFO order. When an entry is created, it gets added to the queue.
+    // Entries can die in one of two ways: exiting the queue, and replacement due to re-handshaking
+    // (if the peer address is the same).
+    pub(super) eviction_queue: Mutex<VecDeque<Weak<Entry>>>,
 
     pub(super) signer: stateless_reset::Signer,
 
@@ -127,8 +245,8 @@ where
             max_capacity: capacity,
             // FIXME: Allow configuring the rehandshake_period.
             rehandshake_period: Duration::from_secs(3600 * 24),
-            peers: fixed_map::Map::with_capacity(capacity, Default::default()),
-            ids: fixed_map::Map::with_capacity(capacity, Default::default()),
+            peers: Default::default(),
+            ids: Default::default(),
             cleaner: Cleaner::new(),
             signer,
             control_socket,
@@ -244,7 +362,7 @@ where
             peer_address,
         });
 
-        let Some(entry) = self.ids.get_by_key(packet.credential_id()) else {
+        let Some(entry) = self.ids.get(*packet.credential_id()) else {
             self.subscriber()
                 .on_stale_key_packet_dropped(event::builder::StaleKeyPacketDropped {
                     credential_id: packet.credential_id().into_event(),
@@ -289,7 +407,7 @@ where
             },
         );
 
-        let Some(entry) = self.ids.get_by_key(packet.credential_id()) else {
+        let Some(entry) = self.ids.get(*packet.credential_id()) else {
             self.subscriber().on_replay_detected_packet_dropped(
                 event::builder::ReplayDetectedPacketDropped {
                     credential_id: packet.credential_id().into_event(),
@@ -342,8 +460,8 @@ where
     #[allow(unused)]
     fn set_max_capacity(&mut self, new: usize) {
         self.max_capacity = new;
-        self.peers = fixed_map::Map::with_capacity(new, Default::default());
-        self.ids = fixed_map::Map::with_capacity(new, Default::default());
+        self.peers = Default::default();
+        self.ids = Default::default();
     }
 
     pub(super) fn subscriber(&self) -> event::EndpointPublisherSubscriber<S> {
@@ -365,11 +483,11 @@ where
     S: event::Subscriber,
 {
     fn secrets_len(&self) -> usize {
-        self.ids.count()
+        self.ids.len()
     }
 
     fn peers_len(&self) -> usize {
-        self.peers.count()
+        self.peers.len()
     }
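Both maps take their `Mutex` with `unwrap_or_else(|e| e.into_inner())`, which
recovers the guard from a poisoned lock instead of propagating a panic; the
tables hold plain data that stays consistent even if another thread panicked
while holding the lock. The pattern in isolation (a generic sketch, not code
from this patch):

    use std::sync::Mutex;

    fn main() {
        let lock = Mutex::new(vec![1, 2, 3]);
        // On poison, recover the guard rather than panicking the caller too.
        let guard = lock.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
        assert_eq!(guard.len(), 3);
    }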
 
     fn secrets_capacity(&self) -> usize {
@@ -389,20 +507,41 @@ where
         let id = *entry.id();
         let peer = entry.peer();
 
-        let (same, other) = self.ids.insert(id, entry.clone());
+        // This is the only place that inserts into the ID list.
+        let same = self.ids.insert(entry.clone());
         if same.is_some() {
             // FIXME: Make insertion fallible and fail handshakes instead?
             panic!("inserting a path secret ID twice");
         }
 
-        if let Some(evicted) = other {
-            self.subscriber().on_path_secret_map_id_entry_evicted(
-                event::builder::PathSecretMapIdEntryEvicted {
-                    peer_address: SocketAddress::from(*evicted.1.peer()).into_event(),
-                    credential_id: evicted.1.id().into_event(),
-                    age: evicted.1.age(),
-                },
-            );
+        {
+            let mut queue = self
+                .eviction_queue
+                .lock()
+                .unwrap_or_else(|e| e.into_inner());
+
+            queue.push_back(Arc::downgrade(&entry));
+
+            // We are out of room, need to prune some entries.
+            if queue.len() == self.max_capacity {
+                // FIXME: Consider a more interesting algorithm, e.g., scanning the first N entries
+                // if the popped entry is still live to see if we can avoid dropping a live entry.
+                // May not be worth it in practice.
+                let element = queue.pop_front().unwrap();
+                // Drop the queue lock prior to dropping element in case we wind up deallocating
+                // (to reduce queue lock contention)
+                drop(queue);
+
+                if let Some(evicted) = element.upgrade() {
+                    self.subscriber().on_path_secret_map_id_entry_evicted(
+                        event::builder::PathSecretMapIdEntryEvicted {
+                            peer_address: SocketAddress::from(*evicted.peer()).into_event(),
+                            credential_id: evicted.id().into_event(),
+                            age: evicted.age(),
+                        },
+                    );
+                }
+            }
         }
 
         self.subscriber().on_path_secret_map_entry_inserted(
@@ -417,8 +556,7 @@ where
         let id = *entry.id();
         let peer = *entry.peer();
 
-        let (same, other) = self.peers.insert(peer, entry);
-        if let Some(prev) = same {
+        if let Some(prev) = self.peers.insert(entry) {
             // This shouldn't happen due to the panic in on_new_path_secrets, but just
             // in case something went wrong with the secret map we double check here.
             // FIXME: Make insertion fallible and fail handshakes instead?
@@ -436,15 +574,10 @@ where
             );
         }
 
-        if let Some(evicted) = other {
-            self.subscriber().on_path_secret_map_address_entry_evicted(
-                event::builder::PathSecretMapAddressEntryEvicted {
-                    peer_address: SocketAddress::from(*evicted.1.peer()).into_event(),
-                    credential_id: evicted.1.id().into_event(),
-                    age: evicted.1.age(),
-                },
-            );
-        }
+        // Note we evict only based on "new entry" and that happens strictly in
+        // on_new_path_secrets, on_handshake_complete should never get an entry that's not already
+        // in the eviction queue. *Checking* that is unfortunately expensive since it's O(n) on the
+        // queue, so we don't do that.
 
         self.subscriber()
             .on_path_secret_map_entry_ready(event::builder::PathSecretMapEntryReady {
@@ -457,12 +590,12 @@ where
         self.register_request_handshake(cb);
     }
 
-    fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<ReadGuard<'_, Arc<Entry>>> {
-        self.peers.get_by_key(peer)
+    fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<Arc<Entry>> {
+        self.peers.get(*peer)
     }
 
-    fn get_by_addr_tracked(&self, peer: &SocketAddr) -> Option<ReadGuard<'_, Arc<Entry>>> {
-        let result = self.peers.get_by_key(peer);
+    fn get_by_addr_tracked(&self, peer: &SocketAddr) -> Option<Arc<Entry>> {
+        let result = self.peers.get(*peer);
 
         self.subscriber().on_path_secret_map_address_cache_accessed(
             event::builder::PathSecretMapAddressCacheAccessed {
@@ -485,13 +618,12 @@ where
         result
     }
 
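The eviction queue above stores `Weak<Entry>` so that entries replaced by a
re-handshake (same peer address) simply fail to upgrade and are skipped when
popped, while still-live entries produce an eviction event. A sketch of that
interaction (hypothetical `String` entries, not the crate's `Entry` type):

    use std::collections::VecDeque;
    use std::sync::{Arc, Weak};

    fn main() {
        let mut queue: VecDeque<Weak<String>> = VecDeque::new();
        let live = Arc::new("live".to_string());
        let dead = Arc::new("dead".to_string());
        queue.push_back(Arc::downgrade(&live));
        queue.push_back(Arc::downgrade(&dead));
        drop(dead); // replacement elsewhere dropped the last strong reference

        // Only entries that are still alive upgrade and get reported.
        let evicted: Vec<Arc<String>> = queue.drain(..).filter_map(|w| w.upgrade()).collect();
        assert_eq!(evicted.len(), 1);
    }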
-    fn get_by_id_untracked(&self, id: &Id) -> Option<ReadGuard<'_, Arc<Entry>>> {
-        self.ids.get_by_key(id)
+    fn get_by_id_untracked(&self, id: &Id) -> Option<Arc<Entry>> {
+        self.ids.get(*id)
     }
 
     fn get_by_id_tracked(&self, id: &Id) -> Option<Arc<Entry>> {
-        // clone here to avoid holding the lock while we insert into `peers`.
-        let result = self.ids.get_by_key(id).map(|v| Arc::clone(&v));
+        let result = self.ids.get(*id);
 
         self.subscriber().on_path_secret_map_id_cache_accessed(
             event::builder::PathSecretMapIdCacheAccessed {
@@ -510,23 +642,6 @@ where
             );
         }
 
-        // Re-populate the peer cache with this entry if not already present.
-        //
-        // This ensures that a subsequent lookup by the address will likely succeed, refilling any
-        // previous evictions in the peer map. We skip insertion if there's already a peer entry
-        // since that might be a *newer* peer entry that should continue to stick around.
-        if let Some(entry) = &result {
-            if let Some(evicted) = self.peers.insert_new_key(*entry.peer(), entry.clone()) {
-                self.subscriber().on_path_secret_map_address_entry_evicted(
-                    event::builder::PathSecretMapAddressEntryEvicted {
-                        peer_address: SocketAddress::from(*evicted.1.peer()).into_event(),
-                        credential_id: evicted.1.id().into_event(),
-                        age: evicted.1.age(),
-                    },
-                );
-            }
-        }
-
         result
     }
 
@@ -701,21 +816,3 @@ where
         );
     }
 }
-
-#[derive(Default)]
-pub(super) struct NoopIdHasher(Option<u64>);
-
-impl Hasher for NoopIdHasher {
-    fn finish(&self) -> u64 {
-        self.0.unwrap()
-    }
-
-    fn write(&mut self, _bytes: &[u8]) {
-        unimplemented!()
-    }
-
-    fn write_u64(&mut self, x: u64) {
-        debug_assert!(self.0.is_none());
-        self.0 = Some(x);
-    }
-}
diff --git a/dc/s2n-quic-dc/src/path/secret/map/store.rs b/dc/s2n-quic-dc/src/path/secret/map/store.rs
index 3637f00bb..c6243eedb 100644
--- a/dc/s2n-quic-dc/src/path/secret/map/store.rs
+++ b/dc/s2n-quic-dc/src/path/secret/map/store.rs
@@ -4,7 +4,6 @@
 use super::Entry;
 use crate::{
     credentials::{Credentials, Id},
-    fixed_map::ReadGuard,
     packet::{secret_control as control, Packet, WireVersion},
     path::secret::{receiver, stateless_reset},
 };
@@ -27,11 +26,11 @@ pub trait Store: 'static + Send + Sync {
 
     fn contains(&self, peer: &SocketAddr) -> bool;
 
-    fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<ReadGuard<'_, Arc<Entry>>>;
+    fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<Arc<Entry>>;
 
-    fn get_by_addr_tracked(&self, peer: &SocketAddr) -> Option<ReadGuard<'_, Arc<Entry>>>;
+    fn get_by_addr_tracked(&self, peer: &SocketAddr) -> Option<Arc<Entry>>;
 
-    fn get_by_id_untracked(&self, id: &Id) -> Option<ReadGuard<'_, Arc<Entry>>>;
+    fn get_by_id_untracked(&self, id: &Id) -> Option<Arc<Entry>>;
 
     fn get_by_id_tracked(&self, id: &Id) -> Option<Arc<Entry>>;
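The trait change above replaces lock-guard returns (`ReadGuard`) with owned
`Arc<Entry>` clones: lookups clone the `Arc` while holding the table lock and
release the lock immediately, so callers never hold a map lock while using an
entry. A sketch of that pattern (the `Store` type and `String` payload here are
hypothetical stand-ins):

    use std::sync::{Arc, Mutex};

    struct Store {
        entries: Mutex<Vec<Arc<String>>>,
    }

    impl Store {
        fn get(&self, idx: usize) -> Option<Arc<String>> {
            // The lock is released as soon as the Arc clone escapes this scope.
            self.entries.lock().unwrap().get(idx).cloned()
        }
    }

    fn main() {
        let store = Store { entries: Mutex::new(vec![Arc::new("entry".into())]) };
        let entry = store.get(0).unwrap();
        // Usable without holding the store's lock.
        assert_eq!(&*entry, "entry");
    }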