diff --git a/dc/s2n-quic-dc/Cargo.toml b/dc/s2n-quic-dc/Cargo.toml
index a84da682c..1f592ad00 100644
--- a/dc/s2n-quic-dc/Cargo.toml
+++ b/dc/s2n-quic-dc/Cargo.toml
@@ -37,6 +37,7 @@ s2n-codec = { version = "=0.53.0", path = "../../common/s2n-codec", default-features = false }
 s2n-quic-core = { version = "=0.53.0", path = "../../quic/s2n-quic-core", default-features = false }
 s2n-quic-platform = { version = "=0.53.0", path = "../../quic/s2n-quic-platform" }
 slotmap = "1"
+hashbrown = "0.15"
 thiserror = "2"
 tokio = { version = "1", default-features = false, features = ["sync"] }
 tracing = "0.1"
diff --git a/dc/s2n-quic-dc/src/credentials.rs b/dc/s2n-quic-dc/src/credentials.rs
index 57ac71223..0f362ad86 100644
--- a/dc/s2n-quic-dc/src/credentials.rs
+++ b/dc/s2n-quic-dc/src/credentials.rs
@@ -25,12 +25,18 @@ pub mod testing;
 #[repr(C)]
 pub struct Id([u8; 16]);
 
-impl std::hash::Hash for Id {
-    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+impl Id {
+    pub(crate) fn to_hash(self) -> u64 {
         // The ID has very high quality entropy already, so write just one half of it to keep hash
         // costs as low as possible. For the main use of the Hash impl in the fixed-size ID map
         // this translates to just directly using these bytes for the indexing.
-        state.write_u64(u64::from_ne_bytes(self.0[..8].try_into().unwrap()));
+        u64::from_ne_bytes(self.0[..8].try_into().unwrap())
+    }
+}
+
+impl std::hash::Hash for Id {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        state.write_u64(self.to_hash());
     }
 }
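Context for the credentials change above: a credential ID is produced from high-entropy random bytes, so its first eight bytes are already uniformly distributed and can serve as a finished hash with no mixing step. A standalone sketch of the idea (illustrative only, not part of the patch):

```rust
// Minimal sketch: the first half of a random 16-byte ID is itself a
// uniformly distributed u64, so it can be used directly as a hash value.
fn to_hash(id: [u8; 16]) -> u64 {
    u64::from_ne_bytes(id[..8].try_into().unwrap())
}

fn main() {
    // Fixed bytes for illustration; real IDs come from a CSPRNG.
    let id = *b"0123456789abcdef";
    // A hash table then indexes entries with `hash & (buckets - 1)`.
    println!("{:#018x}", to_hash(id));
}
```

Exposing the value as `to_hash` (rather than only through the `Hash` impl) is what lets the new `IdMap` below feed it straight into `hashbrown::HashTable`, which takes precomputed hashes.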
diff --git a/dc/s2n-quic-dc/src/fixed_map.rs b/dc/s2n-quic-dc/src/fixed_map.rs
deleted file mode 100644
index 77184f533..000000000
--- a/dc/s2n-quic-dc/src/fixed_map.rs
+++ /dev/null
@@ -1,228 +0,0 @@
-// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
-// SPDX-License-Identifier: Apache-2.0
-
-//! A fixed-allocation concurrent HashMap.
-//!
-//! This implements a concurrent map backed by a fixed-size allocation created at construction
-//! time, with a fixed memory footprint. The expectation is that all storage is inline (to the
-//! extent possible) reducing the likelihood.
-
-use core::{
-    fmt::Debug,
-    hash::Hash,
-    sync::atomic::{AtomicU8, Ordering},
-};
-use parking_lot::{RwLock, RwLockReadGuard, RwLockUpgradableReadGuard, RwLockWriteGuard};
-use std::{collections::hash_map::RandomState, hash::BuildHasher};
-
-pub use parking_lot::MappedRwLockReadGuard as ReadGuard;
-
-pub struct Map<K, V, S = RandomState> {
-    slots: Box<[Slot<K, V>]>,
-    hash_builder: S,
-}
-
-impl<K, V, S> Map<K, V, S>
-where
-    K: Hash + Eq + Debug,
-    S: BuildHasher,
-{
-    pub fn with_capacity(entries: usize, hasher: S) -> Self {
-        let slots = std::cmp::max(1, (entries + SLOT_CAPACITY) / SLOT_CAPACITY).next_power_of_two();
-        let map = Map {
-            slots: (0..slots)
-                .map(|_| Slot::new())
-                .collect::<Vec<_>>()
-                .into_boxed_slice(),
-            hash_builder: hasher,
-        };
-        assert!(map.slots.len().is_power_of_two());
-        assert!(u32::try_from(map.slots.len()).is_ok());
-        map
-    }
-
-    pub fn clear(&self) {
-        for slot in self.slots.iter() {
-            slot.clear();
-        }
-    }
-
-    pub fn count(&self) -> usize {
-        self.slots.iter().map(|s| s.len()).sum()
-    }
-
-    // can't lend references to values outside of a lock, so Iterator interface doesn't work
-    #[allow(unused)]
-    pub fn iter(&self, mut f: impl FnMut(&K, &V)) {
-        for slot in self.slots.iter() {
-            // this feels more readable than flatten
-            #[allow(clippy::manual_flatten)]
-            for entry in slot.values.read().iter() {
-                if let Some(v) = entry {
-                    f(&v.0, &v.1);
-                }
-            }
-        }
-    }
-
-    pub fn retain(&self, mut f: impl FnMut(&K, &V) -> bool) {
-        for slot in self.slots.iter() {
-            // this feels more readable than flatten
-            #[allow(clippy::manual_flatten)]
-            for entry in slot.values.write().iter_mut() {
-                if let Some(v) = entry {
-                    if !f(&v.0, &v.1) {
-                        *entry = None;
-                    }
-                }
-            }
-        }
-    }
-
-    fn slot_by_hash(&self, key: &K) -> &Slot<K, V> {
-        let hash = self.hash_builder.hash_one(key);
-        // needed for bit-and modulus, checked in new as a non-debug assert!.
-        debug_assert!(self.slots.len().is_power_of_two());
-        let slot_idx = hash as usize & (self.slots.len() - 1);
-        &self.slots[slot_idx]
-    }
-
-    /// Returns in .0 = Some(v) if overwriting a previous value for the same key.
-    /// Returns in .1 = Some(k, v) if we evicted some other entry.
-    #[must_use]
-    pub fn insert(&self, key: K, value: V) -> (Option<V>, Option<(K, V)>) {
-        self.slot_by_hash(&key).put(key, value)
-    }
-
-    /// This inserts the provided kv only if key is not currently in the map.
-    ///
-    /// Returns the evicted entry, if any.
-    #[must_use]
-    pub fn insert_new_key(&self, key: K, value: V) -> Option<(K, V)> {
-        self.slot_by_hash(&key).insert_new_key(key, value)
-    }
-
-    pub fn contains_key(&self, key: &K) -> bool {
-        self.get_by_key(key).is_some()
-    }
-
-    pub fn get_by_key(&self, key: &K) -> Option<ReadGuard<'_, V>> {
-        self.slot_by_hash(key).get_by_key(key)
-    }
-}
-
-// Balance of speed of access (put or get) and likelihood of false positive eviction.
-const SLOT_CAPACITY: usize = 32;
-
-struct Slot<K, V> {
-    next_write: AtomicU8,
-    values: RwLock<[Option<(K, V)>; SLOT_CAPACITY]>,
-}
-
-impl<K, V> Slot<K, V>
-where
-    K: Hash + Eq + Debug,
-{
-    fn new() -> Self {
-        Slot {
-            next_write: AtomicU8::new(0),
-            values: RwLock::new(std::array::from_fn(|_| None)),
-        }
-    }
-
-    fn clear(&self) {
-        *self.values.write() = std::array::from_fn(|_| None);
-    }
-
-    /// Returns in .0 = Some(v) if overwriting a previous value for the same key.
-    /// Returns in .1 = Some(k, v) if we evicted some other entry.
-    fn put(&self, new_key: K, new_value: V) -> (Option<V>, Option<(K, V)>) {
-        let values = self.values.upgradable_read();
-        for (value_idx, value) in values.iter().enumerate() {
-            // overwrite if same key or if no key/value pair yet
-            if value.as_ref().map_or(true, |(k, _)| *k == new_key) {
-                let mut values = RwLockUpgradableReadGuard::upgrade(values);
-                let old = values[value_idx].take().map(|v| v.1);
-                values[value_idx] = Some((new_key, new_value));
-                return (old, None);
-            }
-        }
-
-        (
-            None,
-            self.insert_replacing(
-                RwLockUpgradableReadGuard::upgrade(values),
-                new_key,
-                new_value,
-            ),
-        )
-    }
-
-    fn insert_new_key(&self, new_key: K, new_value: V) -> Option<(K, V)> {
-        let entries = self.values.upgradable_read();
-        for (idx, entry) in entries.iter().enumerate() {
-            // write only if no key/value pair yet
-            if entry.is_none() {
-                let mut entries = RwLockUpgradableReadGuard::upgrade(entries);
-                entries[idx] = Some((new_key, new_value));
-                return None;
-            }
-
-            if let Some(entry) = &entry {
-                if entry.0 == new_key {
-                    return None;
-                }
-            }
-        }
-
-        // if we didn't find ourselves, then we can upgrade and insert.
-        self.insert_replacing(
-            RwLockUpgradableReadGuard::upgrade(entries),
-            new_key,
-            new_value,
-        )
-    }
-
-    // Insert evicting an existing entry for a different key.
-    fn insert_replacing(
-        &self,
-        mut entries: RwLockWriteGuard<'_, [Option<(K, V)>; SLOT_CAPACITY]>,
-        new_key: K,
-        new_value: V,
-    ) -> Option<(K, V)> {
-        // If `new_key` isn't already in this slot, replace one of the existing entries with the
-        // new key. For now we rotate through based on `next_write`.
-        let replacement = self.next_write.fetch_add(1, Ordering::Relaxed) as usize % SLOT_CAPACITY;
-        tracing::trace!(
-            "evicting {:?} - bucket overflow",
-            entries[replacement].as_mut().unwrap().0
-        );
-        std::mem::replace(&mut entries[replacement], Some((new_key, new_value)))
-    }
-
-    fn get_by_key(&self, needle: &K) -> Option<ReadGuard<'_, V>> {
-        // Scan each value and check if our requested needle is present.
-        let values = self.values.read();
-        for (value_idx, value) in values.iter().enumerate() {
-            if value.as_ref().is_some_and(|(k, _)| *k == *needle) {
-                return Some(RwLockReadGuard::map(values, |values| {
-                    &values[value_idx].as_ref().unwrap().1
-                }));
-            }
-        }
-
-        None
-    }
-
-    fn len(&self) -> usize {
-        let values = self.values.read();
-        let mut len = 0;
-        for value in values.iter().enumerate() {
-            len += value.1.is_some() as usize;
-        }
-        len
-    }
-}
-
-#[cfg(test)]
-mod test;
diff --git a/dc/s2n-quic-dc/src/lib.rs b/dc/s2n-quic-dc/src/lib.rs
index d190508f6..fc6666fc8 100644
--- a/dc/s2n-quic-dc/src/lib.rs
+++ b/dc/s2n-quic-dc/src/lib.rs
@@ -9,7 +9,6 @@ pub mod credentials;
 pub mod crypto;
 pub mod datagram;
 pub mod event;
-mod fixed_map;
 pub mod msg;
 pub mod packet;
 pub mod path;
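For context on the deleted `fixed_map`: it sharded entries across a power-of-two number of fixed-size slots and, on overflow, rotated through a slot evicting whatever was there, accepting possible false-positive evictions in exchange for a fixed memory footprint. A simplified, single-threaded sketch of that design (illustrative names; the real map wrapped each slot in a `parking_lot::RwLock` and used an `AtomicU8` cursor to pick victims):

```rust
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash};

const SLOT_CAPACITY: usize = 32;

struct FixedMap<K, V> {
    // Each slot holds at most SLOT_CAPACITY pairs, so total memory is bounded.
    slots: Box<[Vec<(K, V)>]>,
    hash_builder: RandomState,
}

impl<K: Hash + Eq, V> FixedMap<K, V> {
    fn with_capacity(entries: usize) -> Self {
        // A power-of-two slot count makes indexing a bit-mask, not a modulus.
        let slots = ((entries + SLOT_CAPACITY) / SLOT_CAPACITY).next_power_of_two();
        Self {
            slots: (0..slots).map(|_| Vec::new()).collect(),
            hash_builder: RandomState::new(),
        }
    }

    fn slot_for(&self, key: &K) -> usize {
        self.hash_builder.hash_one(key) as usize & (self.slots.len() - 1)
    }

    fn insert(&mut self, key: K, value: V) -> Option<(K, V)> {
        let idx = self.slot_for(&key);
        let slot = &mut self.slots[idx];
        let evicted = if slot.len() == SLOT_CAPACITY {
            // Bucket overflow: evict an existing pair, possibly one that is
            // still live (a "false positive" eviction).
            Some(slot.swap_remove(0))
        } else {
            None
        };
        slot.push((key, value));
        evicted
    }
}
```

The rest of the patch replaces this with `hashbrown::HashTable` plus an explicit FIFO eviction queue, so entries are no longer evicted just because they collide into a full bucket.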
diff --git a/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs b/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs
index da6180dc8..930220dfa 100644
--- a/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs
+++ b/dc/s2n-quic-dc/src/path/secret/map/cleaner.rs
@@ -108,7 +108,7 @@ impl Cleaner {
         // For non-retired entries, if it's time for them to handshake again, request a
         // handshake to happen. This handshake will currently happen on the next request for this
         // particular peer.
-        state.ids.retain(|_, entry| {
+        state.ids.retain(|entry| {
             id_entries_initial += 1;
 
             let retained = if let Some(retired_at) = entry.retired_at() {
@@ -139,7 +139,7 @@ impl Cleaner {
         // Drop IP entries if we no longer have the path secret ID entry.
         // FIXME: Don't require a loop to do this. This is likely somewhat slow since it takes a
         // write lock + read lock essentially per-entry, but should be near-constant-time.
-        state.peers.retain(|_, entry| {
+        state.peers.retain(|entry| {
             address_entries_initial += 1;
 
             let retained = state.ids.contains_key(entry.id());
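The `state.rs` changes below build two small hand-rolled maps on `hashbrown::HashTable`, the raw table that underlies hashbrown's `HashMap`. Unlike `HashMap`, `HashTable` stores no separate key: the caller supplies the hash up front plus an equality closure per operation, which is what lets `PeerMap` and `IdMap` key a table of `Arc<Entry>` by a field inside the entry (`peer()` or `id()`) without duplicating it. A minimal sketch of that API, assuming hashbrown 0.15 and an illustrative `(key, value)` tuple type:

```rust
use hashbrown::hash_table::{Entry, HashTable};

// Upsert keyed by the tuple's first field, mirroring how PeerMap/IdMap key
// their tables by entry.peer() / entry.id().
fn upsert(table: &mut HashTable<(u64, &'static str)>, key: u64, value: &'static str) {
    let hash = key; // stand-in for a real hash function
    match table.entry(hash, |(k, _)| *k == key, |(k, _)| *k) {
        Entry::Occupied(mut o) => o.get_mut().1 = value,
        Entry::Vacant(v) => {
            v.insert((key, value));
        }
    }
}

fn main() {
    let mut table = HashTable::new();
    upsert(&mut table, 1, "a");
    upsert(&mut table, 1, "b"); // same key: replaced in place
    assert_eq!(table.find(1, |(k, _)| *k == 1), Some(&(1, "b")));
}
```

The third argument to `entry` is a re-hash closure that the table uses if it has to grow during insertion.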
diff --git a/dc/s2n-quic-dc/src/path/secret/map/state.rs b/dc/s2n-quic-dc/src/path/secret/map/state.rs
index fdbfd262c..bc7729239 100644
--- a/dc/s2n-quic-dc/src/path/secret/map/state.rs
+++ b/dc/s2n-quic-dc/src/path/secret/map/state.rs
@@ -6,7 +6,6 @@ use crate::{
     credentials::{Credentials, Id},
     crypto,
     event::{self, EndpointPublisher as _, IntoEvent as _},
-    fixed_map::{self, ReadGuard},
     packet::{secret_control as control, Packet},
     path::secret::receiver,
 };
@@ -15,7 +14,8 @@ use s2n_quic_core::{
     time::{self, Timestamp},
 };
 use std::{
-    hash::{BuildHasherDefault, Hasher},
+    collections::VecDeque,
+    hash::{BuildHasher, Hash, Hasher},
     net::{Ipv4Addr, SocketAddr},
     sync::{Arc, Mutex, RwLock, Weak},
     time::Duration,
@@ -24,6 +24,119 @@ use std::{
 #[cfg(test)]
 mod tests;
 
+#[derive(Default)]
+pub(crate) struct PeerMap(
+    Mutex<hashbrown::HashTable<Arc<Entry>>>,
+    std::collections::hash_map::RandomState,
+);
+
+#[derive(Default)]
+pub(crate) struct IdMap(Mutex<hashbrown::HashTable<Arc<Entry>>>);
+
+impl PeerMap {
+    fn hash(&self, entry: &Entry) -> u64 {
+        self.hash_key(entry.peer())
+    }
+
+    fn hash_key(&self, entry: &SocketAddr) -> u64 {
+        let mut hasher = self.1.build_hasher();
+        entry.hash(&mut hasher);
+        hasher.finish()
+    }
+
+    pub(crate) fn insert(&self, entry: Arc<Entry>) -> Option<Arc<Entry>> {
+        let hash = self.hash(&entry);
+        let mut map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        match map.entry(hash, |other| other.peer() == entry.peer(), |e| self.hash(e)) {
+            hashbrown::hash_table::Entry::Occupied(mut o) => {
+                Some(std::mem::replace(o.get_mut(), entry))
+            }
+            hashbrown::hash_table::Entry::Vacant(v) => {
+                v.insert(entry);
+                None
+            }
+        }
+    }
+
+    pub(crate) fn contains_key(&self, ip: &SocketAddr) -> bool {
+        let hash = self.hash_key(ip);
+        let map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.find(hash, |o| o.peer() == ip).is_some()
+    }
+
+    pub(crate) fn get(&self, peer: SocketAddr) -> Option<Arc<Entry>> {
+        let hash = self.hash_key(&peer);
+        let map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.find(hash, |o| *o.peer() == peer).cloned()
+    }
+
+    pub(crate) fn clear(&self) {
+        let mut map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.clear();
+    }
+
+    pub(crate) fn retain(&self, mut filter: impl FnMut(&Arc<Entry>) -> bool) {
+        let mut map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.retain(|e| filter(&*e))
+    }
+
+    fn len(&self) -> usize {
+        let map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.len()
+    }
+}
+
+impl IdMap {
+    fn hash(&self, entry: &Entry) -> u64 {
+        self.hash_key(entry.id())
+    }
+
+    fn hash_key(&self, entry: &Id) -> u64 {
+        entry.to_hash()
+    }
+
+    pub(crate) fn insert(&self, entry: Arc<Entry>) -> Option<Arc<Entry>> {
+        let hash = self.hash(&entry);
+        let mut map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        match map.entry(hash, |other| other.id() == entry.id(), |e| self.hash(e)) {
+            hashbrown::hash_table::Entry::Occupied(mut o) => {
+                Some(std::mem::replace(o.get_mut(), entry))
+            }
+            hashbrown::hash_table::Entry::Vacant(v) => {
+                v.insert(entry);
+                None
+            }
+        }
+    }
+
+    pub(crate) fn contains_key(&self, id: &Id) -> bool {
+        let hash = self.hash_key(id);
+        let map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.find(hash, |o| o.id() == id).is_some()
+    }
+
+    pub(crate) fn get(&self, id: Id) -> Option<Arc<Entry>> {
+        let hash = self.hash_key(&id);
+        let map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.find(hash, |o| *o.id() == id).cloned()
+    }
+
+    pub(crate) fn clear(&self) {
+        let mut map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.clear();
+    }
+
+    pub(crate) fn retain(&self, mut filter: impl FnMut(&Arc<Entry>) -> bool) {
+        let mut map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.retain(|e| filter(&*e))
+    }
+
+    fn len(&self) -> usize {
+        let map = self.0.lock().unwrap_or_else(|e| e.into_inner());
+        map.len()
+    }
+}
+
 // # Managing memory consumption
 //
 // For regular rotation with live peers, we retain at most two secrets: one derived from the most
@@ -62,10 +175,15 @@
 where
     // In the future it's likely we'll want to build bidirectional support in which case splitting
     // this into two maps (per the discussion in "Managing memory consumption" above) will be
    // needed.
-    pub(super) peers: fixed_map::Map<SocketAddr, Arc<Entry>>,
 
     // All known entries.
-    pub(super) ids: fixed_map::Map<Id, Arc<Entry>, BuildHasherDefault<NoopIdHasher>>,
+    pub(super) peers: PeerMap,
+    pub(super) ids: IdMap,
+
+    // We evict entries based on FIFO order. When an entry is created, it gets added to the queue.
+    // Entries can die in one of two ways: exiting the queue, and replacement due to re-handshaking
+    // (if the peer address is the same).
+    pub(super) eviction_queue: Mutex<VecDeque<Weak<Entry>>>,
 
     pub(super) signer: stateless_reset::Signer,
@@ -127,8 +245,8 @@
             max_capacity: capacity,
             // FIXME: Allow configuring the rehandshake_period.
             rehandshake_period: Duration::from_secs(3600 * 24),
-            peers: fixed_map::Map::with_capacity(capacity, Default::default()),
-            ids: fixed_map::Map::with_capacity(capacity, Default::default()),
+            peers: Default::default(),
+            ids: Default::default(),
             cleaner: Cleaner::new(),
             signer,
             control_socket,
@@ -244,7 +362,7 @@
                 peer_address,
             });
 
-        let Some(entry) = self.ids.get_by_key(packet.credential_id()) else {
+        let Some(entry) = self.ids.get(*packet.credential_id()) else {
             self.subscriber()
                 .on_stale_key_packet_dropped(event::builder::StaleKeyPacketDropped {
                     credential_id: packet.credential_id().into_event(),
@@ -289,7 +407,7 @@
             },
         );
 
-        let Some(entry) = self.ids.get_by_key(packet.credential_id()) else {
+        let Some(entry) = self.ids.get(*packet.credential_id()) else {
             self.subscriber().on_replay_detected_packet_dropped(
                 event::builder::ReplayDetectedPacketDropped {
                     credential_id: packet.credential_id().into_event(),
@@ -342,8 +460,8 @@
     #[allow(unused)]
     fn set_max_capacity(&mut self, new: usize) {
         self.max_capacity = new;
-        self.peers = fixed_map::Map::with_capacity(new, Default::default());
-        self.ids = fixed_map::Map::with_capacity(new, Default::default());
+        self.peers = Default::default();
+        self.ids = Default::default();
     }
 
     pub(super) fn subscriber(&self) -> event::EndpointPublisherSubscriber<S> {
@@ -365,11 +483,11 @@
     S: event::Subscriber,
 {
     fn secrets_len(&self) -> usize {
-        self.ids.count()
+        self.ids.len()
     }
 
     fn peers_len(&self) -> usize {
-        self.peers.count()
+        self.peers.len()
     }
 
     fn secrets_capacity(&self) -> usize {
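The next hunk is where the new `eviction_queue` replaces `fixed_map`'s per-bucket rotation: every new entry is pushed onto a FIFO of `Weak` pointers, and once the queue reaches `max_capacity` the oldest pointer is popped and, if it still upgrades, the entry is evicted. A condensed sketch of the mechanism (illustrative names, not the crate's API):

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Weak};

struct EvictionQueue<T> {
    queue: VecDeque<Weak<T>>,
    max_capacity: usize,
}

impl<T> EvictionQueue<T> {
    // Returns the entry to evict, if the queue is full and the oldest
    // element is still alive.
    fn on_insert(&mut self, entry: &Arc<T>) -> Option<Arc<T>> {
        // Weak slots do not keep entries alive: an entry dropped or replaced
        // elsewhere simply fails to upgrade and eviction becomes a no-op.
        self.queue.push_back(Arc::downgrade(entry));
        if self.queue.len() == self.max_capacity {
            return self.queue.pop_front().and_then(|oldest| oldest.upgrade());
        }
        None
    }
}
```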
@@ -389,20 +507,41 @@
         let id = *entry.id();
         let peer = entry.peer();
 
-        let (same, other) = self.ids.insert(id, entry.clone());
+        // This is the only place that inserts into the ID list.
+        let same = self.ids.insert(entry.clone());
         if same.is_some() {
             // FIXME: Make insertion fallible and fail handshakes instead?
             panic!("inserting a path secret ID twice");
         }
 
-        if let Some(evicted) = other {
-            self.subscriber().on_path_secret_map_id_entry_evicted(
-                event::builder::PathSecretMapIdEntryEvicted {
-                    peer_address: SocketAddress::from(*evicted.1.peer()).into_event(),
-                    credential_id: evicted.1.id().into_event(),
-                    age: evicted.1.age(),
-                },
-            );
+        {
+            let mut queue = self
+                .eviction_queue
+                .lock()
+                .unwrap_or_else(|e| e.into_inner());
+
+            queue.push_back(Arc::downgrade(&entry));
+
+            // We are out of room, need to prune some entries.
+            if queue.len() == self.max_capacity {
+                // FIXME: Consider a more interesting algorithm, e.g., scanning the first N entries
+                // if the popped entry is still live to see if we can avoid dropping a live entry.
+                // May not be worth it in practice.
+                let element = queue.pop_front().unwrap();
+                // Drop the queue lock prior to dropping element in case we wind up deallocating
+                // (to reduce queue lock contention)
+                drop(queue);
+
+                if let Some(evicted) = element.upgrade() {
+                    self.subscriber().on_path_secret_map_id_entry_evicted(
+                        event::builder::PathSecretMapIdEntryEvicted {
+                            peer_address: SocketAddress::from(*evicted.peer()).into_event(),
+                            credential_id: evicted.id().into_event(),
+                            age: evicted.age(),
+                        },
+                    );
+                }
+            }
         }
 
         self.subscriber().on_path_secret_map_entry_inserted(
@@ -417,8 +556,7 @@
         let id = *entry.id();
         let peer = *entry.peer();
 
-        let (same, other) = self.peers.insert(peer, entry);
-        if let Some(prev) = same {
+        if let Some(prev) = self.peers.insert(entry) {
             // This shouldn't happen due to the panic in on_new_path_secrets, but just
             // in case something went wrong with the secret map we double check here.
             // FIXME: Make insertion fallible and fail handshakes instead?
@@ -436,15 +574,10 @@
             );
         }
 
-        if let Some(evicted) = other {
-            self.subscriber().on_path_secret_map_address_entry_evicted(
-                event::builder::PathSecretMapAddressEntryEvicted {
-                    peer_address: SocketAddress::from(*evicted.1.peer()).into_event(),
-                    credential_id: evicted.1.id().into_event(),
-                    age: evicted.1.age(),
-                },
-            );
-        }
+        // Note we evict only based on "new entry" and that happens strictly in
+        // on_new_path_secrets, on_handshake_complete should never get an entry that's not already
+        // in the eviction queue. *Checking* that is unfortunately expensive since it's O(n) on the
+        // queue, so we don't do that.
 
         self.subscriber()
             .on_path_secret_map_entry_ready(event::builder::PathSecretMapEntryReady {
@@ -457,12 +590,12 @@
         self.register_request_handshake(cb);
     }
 
-    fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<ReadGuard<'_, Arc<Entry>>> {
-        self.peers.get_by_key(peer)
+    fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<Arc<Entry>> {
+        self.peers.get(*peer)
     }
 
-    fn get_by_addr_tracked(&self, peer: &SocketAddr) -> Option<ReadGuard<'_, Arc<Entry>>> {
-        let result = self.peers.get_by_key(peer);
+    fn get_by_addr_tracked(&self, peer: &SocketAddr) -> Option<Arc<Entry>> {
+        let result = self.peers.get(*peer);
 
         self.subscriber().on_path_secret_map_address_cache_accessed(
             event::builder::PathSecretMapAddressCacheAccessed {
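One idiom repeated throughout these hunks: locks are taken with `lock().unwrap_or_else(|e| e.into_inner())`. A `std::sync::Mutex` is poisoned when a holder panics, but the `PoisonError` still carries the guard, so the map chooses to proceed with the data as it was rather than turn one panic into many. A sketch of the idiom:

```rust
use std::sync::Mutex;

fn len_ignoring_poison<T>(items: &Mutex<Vec<T>>) -> usize {
    // If another thread panicked while holding the lock, recover the guard
    // from the PoisonError and continue with the data as-is.
    let guard = items.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    guard.len()
}
```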
@@ -485,13 +618,12 @@
         result
     }
 
-    fn get_by_id_untracked(&self, id: &Id) -> Option<ReadGuard<'_, Arc<Entry>>> {
-        self.ids.get_by_key(id)
+    fn get_by_id_untracked(&self, id: &Id) -> Option<Arc<Entry>> {
+        self.ids.get(*id)
     }
 
     fn get_by_id_tracked(&self, id: &Id) -> Option<Arc<Entry>> {
-        // clone here to avoid holding the lock while we insert into `peers`.
-        let result = self.ids.get_by_key(id).map(|v| Arc::clone(&v));
+        let result = self.ids.get(*id);
 
         self.subscriber().on_path_secret_map_id_cache_accessed(
             event::builder::PathSecretMapIdCacheAccessed {
@@ -510,23 +642,6 @@
             );
         }
 
-        // Re-populate the peer cache with this entry if not already present.
-        //
-        // This ensures that a subsequent lookup by the address will likely succeed, refilling any
-        // previous evictions in the peer map. We skip insertion if there's already a peer entry
-        // since that might be a *newer* peer entry that should continue to stick around.
-        if let Some(entry) = &result {
-            if let Some(evicted) = self.peers.insert_new_key(*entry.peer(), entry.clone()) {
-                self.subscriber().on_path_secret_map_address_entry_evicted(
-                    event::builder::PathSecretMapAddressEntryEvicted {
-                        peer_address: SocketAddress::from(*evicted.1.peer()).into_event(),
-                        credential_id: evicted.1.id().into_event(),
-                        age: evicted.1.age(),
-                    },
-                );
-            }
-        }
-
         result
     }
 
@@ -701,21 +816,3 @@
         );
     }
 }
-
-#[derive(Default)]
-pub(super) struct NoopIdHasher(Option<u64>);
-
-impl Hasher for NoopIdHasher {
-    fn finish(&self) -> u64 {
-        self.0.unwrap()
-    }
-
-    fn write(&mut self, _bytes: &[u8]) {
-        unimplemented!()
-    }
-
-    fn write_u64(&mut self, x: u64) {
-        debug_assert!(self.0.is_none());
-        self.0 = Some(x);
-    }
-}
diff --git a/dc/s2n-quic-dc/src/path/secret/map/store.rs b/dc/s2n-quic-dc/src/path/secret/map/store.rs
index 3637f00bb..c6243eedb 100644
--- a/dc/s2n-quic-dc/src/path/secret/map/store.rs
+++ b/dc/s2n-quic-dc/src/path/secret/map/store.rs
@@ -4,7 +4,6 @@ use super::Entry;
 use crate::{
     credentials::{Credentials, Id},
-    fixed_map::ReadGuard,
     packet::{secret_control as control, Packet, WireVersion},
     path::secret::{receiver, stateless_reset},
 };
@@ -27,11 +26,11 @@ pub trait Store: 'static + Send + Sync {
 
     fn contains(&self, peer: &SocketAddr) -> bool;
 
-    fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<ReadGuard<'_, Arc<Entry>>>;
+    fn get_by_addr_untracked(&self, peer: &SocketAddr) -> Option<Arc<Entry>>;
 
-    fn get_by_addr_tracked(&self, peer: &SocketAddr) -> Option<ReadGuard<'_, Arc<Entry>>>;
+    fn get_by_addr_tracked(&self, peer: &SocketAddr) -> Option<Arc<Entry>>;
 
-    fn get_by_id_untracked(&self, id: &Id) -> Option<ReadGuard<'_, Arc<Entry>>>;
+    fn get_by_id_untracked(&self, id: &Id) -> Option<Arc<Entry>>;
 
     fn get_by_id_tracked(&self, id: &Id) -> Option<Arc<Entry>>;
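Finally, the `Store` trait change from `ReadGuard<'_, Arc<Entry>>` to plain `Arc<Entry>` means lookups no longer lend the caller a lock guard: each map's mutex is released inside `get`, and the caller keeps only a refcounted handle. A sketch of what that buys call sites (stand-in types, not the real trait):

```rust
use std::sync::Arc;

struct Entry { /* path secret, peer address, ... */ }

fn use_entry(lookup: impl Fn() -> Option<Arc<Entry>>) {
    if let Some(entry) = lookup() {
        // No lock is held here: the map can be mutated, and this entry even
        // evicted, while we keep using our handle.
        do_slow_work(&entry);
    }
}

fn do_slow_work(_entry: &Arc<Entry>) {}
```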