diff --git a/neqo-common/Cargo.toml b/neqo-common/Cargo.toml index 0cb4bcbf4f..069d67b834 100644 --- a/neqo-common/Cargo.toml +++ b/neqo-common/Cargo.toml @@ -34,3 +34,7 @@ features = ["timeapi"] [lib] # See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options bench = false + +[[bench]] +name = "timer" +harness = false diff --git a/neqo-common/benches/timer.rs b/neqo-common/benches/timer.rs new file mode 100644 index 0000000000..5ac8019db4 --- /dev/null +++ b/neqo-common/benches/timer.rs @@ -0,0 +1,39 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::time::{Duration, Instant}; + +use criterion::{criterion_group, criterion_main, Criterion}; +use neqo_common::timer::Timer; +use test_fixture::now; + +fn benchmark_timer(c: &mut Criterion) { + c.bench_function("drain a timer quickly", |b| { + b.iter_batched_ref( + make_timer, + |(_now, timer)| { + while let Some(t) = timer.next_time() { + assert!(timer.take_next(t).is_some()); + } + }, + criterion::BatchSize::SmallInput, + ); + }); +} + +fn make_timer() -> (Instant, Timer<()>) { + const TIMES: &[u64] = &[1, 2, 3, 5, 8, 13, 21, 34]; + + let now = now(); + let mut timer = Timer::new(now, Duration::from_millis(777), 100); + for &t in TIMES { + timer.add(now + Duration::from_secs(t), ()); + } + (now, timer) +} + +criterion_group!(benches, benchmark_timer); +criterion_main!(benches); diff --git a/neqo-common/src/lib.rs b/neqo-common/src/lib.rs index f3e8e63023..e988c6071d 100644 --- a/neqo-common/src/lib.rs +++ b/neqo-common/src/lib.rs @@ -14,6 +14,7 @@ pub mod hrtime; mod incrdecoder; pub mod log; pub mod qlog; +pub mod timer; pub mod tos; use std::fmt::Write; diff --git a/neqo-common/src/timer.rs b/neqo-common/src/timer.rs new file mode 100644 index 0000000000..3feddb2226 --- 
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

use std::{
    collections::VecDeque,
    mem,
    time::{Duration, Instant},
};

/// Internal structure for a timer item: a payload and the time it is due.
struct TimerItem<T> {
    time: Instant,
    item: T,
}

impl<T> TimerItem<T> {
    /// Key extractor used with `binary_search_by_key`.
    fn time(ti: &Self) -> Instant {
        ti.time
    }
}

/// A timer queue.
///
/// This uses a classic timer wheel arrangement, with some characteristics that might be
/// considered peculiar. Each slot in the wheel is sorted (complexity O(N) insertions, but
/// O(logN) to find cut points). Time is relative, the wheel has an origin time and it is
/// unable to represent times that are more than `granularity * capacity` past that time.
///
/// `items[cursor]` is the slot that contains the origin time `now`; slots follow in time
/// order from there, wrapping around at the end of the vector.
///
/// As a sizing example, the server in `neqo-transport/src/server.rs` builds its wheel with a
/// granularity of 4 ms and 16384 buckets (a span of roughly 65.5 s) so that the span exceeds
/// the largest delay it schedules, the 30 s idle timeout.
pub struct Timer<T> {
    items: Vec<VecDeque<TimerItem<T>>>,
    now: Instant,
    granularity: Duration,
    cursor: usize,
}

impl<T> Timer<T> {
    /// Construct a new wheel at the given granularity, starting at the given time.
    ///
    /// A `capacity` of zero is accepted here, but such a wheel has no slots: operations
    /// such as [`Timer::next_time`], [`Timer::take_next`] and [`Timer::add`] panic on it.
    ///
    /// # Panics
    ///
    /// When `capacity` is too large to fit in `u32` or `granularity` is zero.
    pub fn new(now: Instant, granularity: Duration, capacity: usize) -> Self {
        assert!(u32::try_from(capacity).is_ok());
        assert!(granularity.as_nanos() > 0);
        let mut items = Vec::with_capacity(capacity);
        items.resize_with(capacity, Default::default);
        Self {
            items,
            now,
            granularity,
            cursor: 0,
        }
    }

    /// Return the time of the earliest entry, or `None` if the wheel is empty.
    #[must_use]
    pub fn next_time(&self) -> Option<Instant> {
        let idx = self.bucket(0);
        // Visit slots in time order: from the cursor to the end, then the wrapped part.
        let (wrapped, current) = self.items.split_at(idx);
        current
            .iter()
            .chain(wrapped)
            .find_map(|slot| slot.front())
            .map(|ti| ti.time)
    }

    /// Get the full span of time that this can cover.
    /// Two timers cannot be more than this far apart.
    /// In practice, this value is less by one amount of the timer granularity.
    #[inline]
    #[allow(clippy::cast_possible_truncation)] // guarded by assertion
    #[must_use]
    pub fn span(&self) -> Duration {
        self.granularity * (self.items.len() as u32)
    }

    /// For the given `time`, get the number of whole buckets in the future that is.
    #[inline]
    #[allow(clippy::cast_possible_truncation)] // guarded by assertion
    fn delta(&self, time: Instant) -> usize {
        // This really should use Duration::div_duration_f??(), but it can't yet.
        ((time - self.now).as_nanos() / self.granularity.as_nanos()) as usize
    }

    /// The index of the slot that holds `time`.
    #[inline]
    fn time_bucket(&self, time: Instant) -> usize {
        self.bucket(self.delta(time))
    }

    /// The index of the slot that is `delta` slots after the cursor.
    #[inline]
    fn bucket(&self, delta: usize) -> usize {
        debug_assert!(delta < self.items.len());
        (self.cursor + delta) % self.items.len()
    }

    /// Slide forward in time by `n * self.granularity`.
    ///
    /// Every slot that is passed over must already be empty.
    #[allow(clippy::cast_possible_truncation, clippy::reversed_empty_ranges)]
    // cast_possible_truncation is ok because we have an assertion guard.
    // reversed_empty_ranges is to avoid different types on the if/else.
    fn tick(&mut self, n: usize) {
        let new = self.bucket(n);
        let iter = if new < self.cursor {
            (self.cursor..self.items.len()).chain(0..new)
        } else {
            (self.cursor..new).chain(0..0)
        };
        for i in iter {
            assert!(self.items[i].is_empty());
        }
        self.now += self.granularity * (n as u32);
        self.cursor = new;
    }

    /// Add `item` to the wheel so that it becomes due at `time`.
    ///
    /// If `time` lies beyond the current span, the wheel's origin is moved forward
    /// just far enough to make `time` fit.
    ///
    /// # Panics
    ///
    /// When `time` is before the wheel's origin, which only ever moves forward, or
    /// when moving the origin forward would pass over a slot that still holds
    /// items (that is, `time` is more than the span after an item already held).
    pub fn add(&mut self, time: Instant, item: T) {
        assert!(time >= self.now);
        // Skip forward quickly if there is too large a gap.
        let short_span = self.span() - self.granularity;
        if time >= (self.now + self.span() + short_span) {
            // Assert that there aren't any items.
            for i in &self.items {
                debug_assert!(i.is_empty());
            }
            self.now = time
                .checked_sub(short_span)
                .expect("time is at least a full span past the origin");
            self.cursor = 0;
        }

        // Adjust time forward the minimum amount necessary.
        let mut d = self.delta(time);
        if d >= self.items.len() {
            self.tick(1 + d - self.items.len());
            d = self.items.len() - 1;
        }

        let bucket = self.bucket(d);
        let ins = match self.items[bucket].binary_search_by_key(&time, TimerItem::time) {
            Ok(j) | Err(j) => j,
        };
        self.items[bucket].insert(ins, TimerItem { time, item });
    }

    /// Given knowledge of the time an item was added, remove it.
    /// This requires use of a predicate that identifies matching items.
    ///
    /// Returns `None` when `time` is outside the range the wheel currently covers
    /// (`origin <= time < origin + span`), or when no item stored at exactly `time`
    /// satisfies `selector`.
    pub fn remove<F>(&mut self, time: Instant, mut selector: F) -> Option<T>
    where
        F: FnMut(&T) -> bool,
    {
        // The upper bound is exclusive: `origin + span` itself would map to a slot
        // index one past the end of the wheel.
        if time < self.now || time >= self.now + self.span() {
            return None;
        }
        let bucket = self.time_bucket(time);
        let slot = &mut self.items[bucket];
        let start_index = slot.binary_search_by_key(&time, TimerItem::time).ok()?;
        // start_index is just one of potentially many items with the same time.
        // Search backwards for a match, ...
        for i in (0..=start_index).rev() {
            if slot[i].time != time {
                break;
            }
            if selector(&slot[i].item) {
                return slot.remove(i).map(|ti| ti.item);
            }
        }
        // ... then forwards.
        for i in (start_index + 1)..slot.len() {
            if slot[i].time != time {
                break;
            }
            if selector(&slot[i].item) {
                return slot.remove(i).map(|ti| ti.item);
            }
        }
        None
    }

    /// Take the next item, unless there are no items with
    /// a timeout in the past relative to `until`.
    pub fn take_next(&mut self, until: Instant) -> Option<T> {
        let idx = self.bucket(0);
        let (wrapped, current) = self.items.split_at_mut(idx);
        current
            .iter_mut()
            .chain(wrapped.iter_mut())
            .find(|slot| slot.front().map_or(false, |ti| ti.time <= until))
            .and_then(|slot| slot.pop_front())
            .map(|ti| ti.item)
    }

    /// Create an iterator that takes all items until the given time.
    /// Items are yielded in order of increasing time.
    /// Note: Items might be removed even if the iterator is not fully exhausted.
    pub fn take_until(&mut self, until: Instant) -> impl Iterator<Item = T> {
        let get_item = move |x: TimerItem<T>| x.item;
        if until >= self.now + self.span() {
            // Drain everything, so a clean sweep.
            let mut empty_items = Vec::with_capacity(self.items.len());
            empty_items.resize_with(self.items.len(), VecDeque::default);
            let mut items = mem::replace(&mut self.items, empty_items);
            self.now = until;
            // Split at the *old* cursor so that the slots are chained in time order;
            // resetting the cursor first would always split at zero.
            let start = mem::replace(&mut self.cursor, 0);

            let tail = items.split_off(start);
            return tail.into_iter().chain(items).flatten().map(get_item);
        }

        // Only returning a partial span, so do it bucket at a time.
        let delta = self.delta(until);
        let mut buckets = Vec::with_capacity(delta + 1);

        // First, the whole buckets.
        for i in 0..delta {
            let idx = self.bucket(i);
            buckets.push(mem::take(&mut self.items[idx]));
        }
        self.tick(delta);

        // Now we need to split the last bucket, because there might be
        // some items with `item.time > until`.
        let bucket = &mut self.items[self.cursor];
        let last_idx = match bucket.binary_search_by_key(&until, TimerItem::time) {
            Ok(mut m) => {
                // If there are multiple values, the search will hit any of them.
                // Make sure to get them all.
                while m < bucket.len() && bucket[m].time == until {
                    m += 1;
                }
                m
            }
            Err(ins) => ins,
        };
        let tail = bucket.split_off(last_idx);
        buckets.push(mem::replace(bucket, tail));
        // This tomfoolery with the empty vector ensures that
        // the returned type here matches the one above precisely
        // without having to invoke the `either` crate.
        buckets.into_iter().chain(vec![]).flatten().map(get_item)
    }
}

#[cfg(test)]
mod test {
    use std::sync::OnceLock;

    use super::{Duration, Instant, Timer};

    fn now() -> Instant {
        static NOW: OnceLock<Instant> = OnceLock::new();
        *NOW.get_or_init(Instant::now)
    }

    const GRANULARITY: Duration = Duration::from_millis(10);
    const CAPACITY: usize = 10;
    #[test]
    fn create() {
        let t: Timer<()> = Timer::new(now(), GRANULARITY, CAPACITY);
        assert_eq!(t.span(), Duration::from_millis(100));
        assert_eq!(None, t.next_time());
    }

    #[test]
    fn immediate_entry() {
        let mut t = Timer::new(now(), GRANULARITY, CAPACITY);
        t.add(now(), 12);
        assert_eq!(now(), t.next_time().expect("should have an entry"));
        let values: Vec<_> = t.take_until(now()).collect();
        assert_eq!(vec![12], values);
    }

    #[test]
    fn same_time() {
        let mut t = Timer::new(now(), GRANULARITY, CAPACITY);
        let v1 = 12;
        let v2 = 13;
        t.add(now(), v1);
        t.add(now(), v2);
        assert_eq!(now(), t.next_time().expect("should have an entry"));
        let values: Vec<_> = t.take_until(now()).collect();
        assert!(values.contains(&v1));
        assert!(values.contains(&v2));
    }

    #[test]
    fn add() {
        let mut t = Timer::new(now(), GRANULARITY, CAPACITY);
        let near_future = now() + Duration::from_millis(17);
        let v = 9;
        t.add(near_future, v);
        assert_eq!(near_future, t.next_time().expect("should return a value"));
        assert_eq!(
            t.take_until(near_future.checked_sub(Duration::from_millis(1)).unwrap())
                .count(),
            0
        );
        assert!(t
            .take_until(near_future + Duration::from_millis(1))
            .any(|x| x == v));
    }

    #[test]
    fn add_future() {
        let mut t = Timer::new(now(), GRANULARITY, CAPACITY);
        let future = now() + Duration::from_millis(117);
        let v = 9;
        t.add(future, v);
        assert_eq!(future, t.next_time().expect("should return a value"));
        assert!(t.take_until(future).any(|x| x == v));
    }

    #[test]
    fn add_far_future() {
        let mut t = Timer::new(now(), GRANULARITY, CAPACITY);
        let far_future = now() + Duration::from_millis(892);
        let v = 9;
        t.add(far_future, v);
        assert_eq!(far_future, t.next_time().expect("should return a value"));
        assert!(t.take_until(far_future).any(|x| x == v));
    }

    const TIMES: &[Duration] = &[
        Duration::from_millis(40),
        Duration::from_millis(91),
        Duration::from_millis(6),
        Duration::from_millis(3),
        Duration::from_millis(22),
        Duration::from_millis(40),
    ];

    fn with_times() -> Timer<usize> {
        let mut t = Timer::new(now(), GRANULARITY, CAPACITY);
        for (i, time) in TIMES.iter().enumerate() {
            t.add(now() + *time, i);
        }
        assert_eq!(
            now() + *TIMES.iter().min().unwrap(),
            t.next_time().expect("should have a time")
        );
        t
    }

    #[test]
    #[allow(clippy::needless_collect)] // false positive
    fn multiple_values() {
        let mut t = with_times();
        let values: Vec<_> = t.take_until(now() + *TIMES.iter().max().unwrap()).collect();
        for i in 0..TIMES.len() {
            assert!(values.contains(&i));
        }
    }

    #[test]
    #[allow(clippy::needless_collect)] // false positive
    fn take_far_future() {
        let mut t = with_times();
        let values: Vec<_> = t.take_until(now() + Duration::from_secs(100)).collect();
        for i in 0..TIMES.len() {
            assert!(values.contains(&i));
        }
    }

    #[test]
    fn remove_each() {
        let mut t = with_times();
        for (i, time) in TIMES.iter().enumerate() {
            assert_eq!(Some(i), t.remove(now() + *time, |&x| x == i));
        }
        assert_eq!(None, t.next_time());
    }

    #[test]
    fn remove_future() {
        let mut t = Timer::new(now(), GRANULARITY, CAPACITY);
        let future = now() + Duration::from_millis(117);
        let v = 9;
        t.add(future, v);

        assert_eq!(Some(v), t.remove(future, |candidate| *candidate == v));
    }

    #[test]
    fn remove_too_far_future() {
        let mut t = Timer::new(now(), GRANULARITY, CAPACITY);
        let future = now() + Duration::from_millis(117);
        let too_far_future = now() + t.span() + Duration::from_millis(117);
        let v = 9;
        t.add(future, v);

        assert_eq!(None, t.remove(too_far_future, |candidate| *candidate == v));
    }
}
+const TIMER_CAPACITY: usize = 16384; type StateRef = Rc>; type ConnectionTableRef = Rc>>; @@ -54,21 +61,7 @@ type ConnectionTableRef = Rc>>; pub struct ServerConnectionState { c: Connection, active_attempt: Option, - wake_at: Option, -} - -impl ServerConnectionState { - fn set_wake_at(&mut self, at: Instant) { - self.wake_at = Some(at); - } - - fn needs_waking(&self, now: Instant) -> bool { - self.wake_at.map_or(false, |t| t <= now) - } - - fn woken(&mut self) { - self.wake_at = None; - } + last_timer: Instant, } impl Deref for ServerConnectionState { @@ -181,8 +174,8 @@ pub struct Server { active: HashSet, /// The set of connections that need immediate processing. waiting: VecDeque, - /// The latest [`Output::Callback`] returned from [`Server::process`]. - wake_at: Option, + /// Outstanding timers for connections. + timers: Timer, /// Address validation logic, which determines whether we send a Retry. address_validation: Rc>, /// Directory to create qlog traces in @@ -226,10 +219,10 @@ impl Server { connections: Rc::default(), active: HashSet::default(), waiting: VecDeque::default(), + timers: Timer::new(now, TIMER_GRANULARITY, TIMER_CAPACITY), address_validation: Rc::new(RefCell::new(validation)), qlog_dir: None, ech_config: None, - wake_at: None, }) } @@ -267,6 +260,11 @@ impl Server { self.ech_config.as_ref().map_or(&[], |cfg| &cfg.encoded) } + fn remove_timer(&mut self, c: &StateRef) { + let last = c.borrow().last_timer; + self.timers.remove(last, |t| Rc::ptr_eq(t, c)); + } + fn process_connection( &mut self, c: &StateRef, @@ -282,12 +280,16 @@ impl Server { } Output::Callback(delay) => { let next = now + delay; - c.borrow_mut().set_wake_at(next); - if self.wake_at.map_or(true, |c| c > next) { - self.wake_at = Some(next); + if next != c.borrow().last_timer { + qtrace!([self], "Change timer to {:?}", next); + self.remove_timer(c); + c.borrow_mut().last_timer = next; + self.timers.add(next, Rc::clone(c)); } } - Output::None => {} + Output::None => { + 
self.remove_timer(c); + } } if c.borrow().has_events() { qtrace!([self], "Connection active: {:?}", c); @@ -505,7 +507,7 @@ impl Server { self.setup_connection(&mut c, &attempt_key, initial, orig_dcid); let c = Rc::new(RefCell::new(ServerConnectionState { c, - wake_at: None, + last_timer: now, active_attempt: Some(attempt_key.clone()), })); cid_mgr.borrow_mut().set_connection(&c); @@ -644,28 +646,24 @@ impl Server { return Some(d); } } - - qtrace!([self], "No packet to send still, check wake up times"); - loop { - let connection = self - .connections - .borrow() - .values() - .find(|c| c.borrow().needs_waking(now)) - .cloned()?; - let datagram = self.process_connection(&connection, None, now); - connection.borrow_mut().woken(); - if datagram.is_some() { - return datagram; + qtrace!([self], "No packet to send still, run timers"); + while let Some(c) = self.timers.take_next(now) { + if let Some(d) = self.process_connection(&c, None, now) { + return Some(d); } } + None } - pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - if self.wake_at.map_or(false, |c| c <= now) { - self.wake_at = None; + fn next_time(&mut self, now: Instant) -> Option { + if self.waiting.is_empty() { + self.timers.next_time().map(|x| x - now) + } else { + Some(Duration::new(0, 0)) } + } + pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { dgram .and_then(|d| self.process_input(d, now)) .or_else(|| self.process_next_output(now)) @@ -673,7 +671,12 @@ impl Server { qtrace!([self], "Send packet: {:?}", d); Output::Datagram(d) }) - .or_else(|| self.wake_at.take().map(|c| Output::Callback(c - now))) + .or_else(|| { + self.next_time(now).map(|delay| { + qtrace!([self], "Wait: {:?}", delay); + Output::Callback(delay) + }) + }) .unwrap_or_else(|| { qtrace!([self], "Go dormant"); Output::None