From eb4925084952ff93f4b3d7552906117be98d4343 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 17 Jun 2024 16:46:04 +0200 Subject: [PATCH 01/13] refactor(transport/server): always iterate each connection Say a `neqo_transport::Server` is managing a single `neqo_transport::Connection` in `Server::connections`. Assume the following chain of events: 1. A user (e.g. `neqo-http3`) calls `Server::process`. ``` rust pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { if self.wake_at.map_or(false, |c| c <= now) { self.wake_at = None; } dgram .and_then(|d| self.process_input(d, now)) .or_else(|| self.process_next_output(now)) .map(|d| { qtrace!([self], "Send packet: {:?}", d); Output::Datagram(d) }) .or_else(|| self.wake_at.take().map(|c| Output::Callback(c - now))) .unwrap_or_else(|| { qtrace!([self], "Go dormant"); Output::None }) } ``` https://github.com/mozilla/neqo/blob/6664452e2ba25f028ebf07c404fff3d0193c0ef4/neqo-transport/src/server.rs#L660-L677 2. `self.wake_at` is `None`. 3. `dgram` is `None`, thus `self.process_input` is never called. 4. `self.process_next_output(now)` is called. ``` rust fn process_next_output(&mut self, now: Instant) -> Option { qtrace!([self], "No packet to send, look at waiting connections"); while let Some(c) = self.waiting.pop_front() { if let Some(d) = self.process_connection(&c, None, now) { return Some(d); } } ``` https://github.com/mozilla/neqo/blob/6664452e2ba25f028ebf07c404fff3d0193c0ef4/neqo-transport/src/server.rs#L636-L642 1. It attains a reference to the one `Connection` through `self.waiting.pop_front()`. 2. It calls self.process_connection which in turn calls `Connection::process`, returning a `Output::Callback(_)`, which is stored in `self.wake_at`. 5. `self.wake_at.take()` takes the callback and returns it to the user as `Output::Callback`. 6. The user calls `Server::process` again. 1. `self.wake_at` is `None`. 2. `dgram` is `None`, thus `self.process_input` isn't called. 3. `Server::process` calls `Server::process_next_output`. 1. `Server::process_next_output` finds no connection reference in `self.waiting` and thus returns `None`. 4. `self.wake_at.take()` is `None` 5. `Server::process` returns `None` Result is that the user received an `Output::None` even though the one `Connection` managed by the `Server` is waiting for a callback. Thus the server stalls. The single source of truth of whether a `Connection` needs a callback or not is the `Connection` itself. Instead of duplicating this information in `Server::wake_at`, `Server::waiting`, `ServerConnectionState::wake_at`, always ask the single source of truth, i.e. the `Connection`. More concretely, with this patch `Server::process` always calls `Connection::process` for each of its `Connection`s. It does not try to be smart on whether a `Connection` needs `process`ing or not. --- neqo-transport/src/connection/mod.rs | 11 +++ neqo-transport/src/server.rs | 138 ++++++++------------------- 2 files changed, 52 insertions(+), 97 deletions(-) diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 77c39abff3..b4329c349f 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -136,6 +136,17 @@ impl Output { _ => Duration::new(0, 0), } } + + #[must_use] + pub fn or_else(self, f: F) -> Self + where + F: FnOnce() -> Self, + { + match self { + x @ Self::Datagram(_) | x @ Self::Callback(_) => x, + Self::None => f(), + } + } } /// Used by inner functions like `Connection::output`. diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index eabb10e25b..27c743efab 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -8,7 +8,7 @@ use std::{ cell::RefCell, - collections::{HashMap, HashSet, VecDeque}, + collections::{HashMap, HashSet}, fs::OpenOptions, mem, net::SocketAddr, @@ -50,21 +50,6 @@ type ConnectionTableRef = Rc>>; pub struct ServerConnectionState { c: Connection, active_attempt: Option, - wake_at: Option, -} - -impl ServerConnectionState { - fn set_wake_at(&mut self, at: Instant) { - self.wake_at = Some(at); - } - - fn needs_waking(&self, now: Instant) -> bool { - self.wake_at.map_or(false, |t| t <= now) - } - - fn woken(&mut self) { - self.wake_at = None; - } } impl Deref for ServerConnectionState { @@ -175,10 +160,6 @@ pub struct Server { connections: ConnectionTableRef, /// The connections that have new events. active: HashSet, - /// The set of connections that need immediate processing. - waiting: VecDeque, - /// The latest [`Output::Callback`] returned from [`Server::process`]. - wake_at: Option, /// Address validation logic, which determines whether we send a Retry. address_validation: Rc>, /// Directory to create qlog traces in @@ -221,11 +202,9 @@ impl Server { active_attempts: HashMap::default(), connections: Rc::default(), active: HashSet::default(), - waiting: VecDeque::default(), address_validation: Rc::new(RefCell::new(validation)), qlog_dir: None, ech_config: None, - wake_at: None, }) } @@ -268,23 +247,9 @@ impl Server { c: &StateRef, dgram: Option<&Datagram>, now: Instant, - ) -> Option { + ) -> Output { qtrace!([self], "Process connection {:?}", c); let out = c.borrow_mut().process(dgram, now); - match out { - Output::Datagram(_) => { - qtrace!([self], "Sending packet, added to waiting connections"); - self.waiting.push_back(Rc::clone(c)); - } - Output::Callback(delay) => { - let next = now + delay; - c.borrow_mut().set_wake_at(next); - if self.wake_at.map_or(true, |c| c > next) { - self.wake_at = Some(next); - } - } - Output::None => {} - } if c.borrow().has_events() { qtrace!([self], "Connection active: {:?}", c); self.active.insert(ActiveConnectionRef { c: Rc::clone(c) }); @@ -303,7 +268,7 @@ impl Server { .borrow_mut() .retain(|_, v| !Rc::ptr_eq(v, c)); } - out.dgram() + out } fn connection(&self, cid: ConnectionIdRef) -> Option { @@ -315,14 +280,14 @@ impl Server { initial: InitialDetails, dgram: &Datagram, now: Instant, - ) -> Option { + ) -> Output { qdebug!([self], "Handle initial"); let res = self .address_validation .borrow() .validate(&initial.token, dgram.source(), now); match res { - AddressValidationResult::Invalid => None, + AddressValidationResult::Invalid => Output::None, AddressValidationResult::Pass => self.connection_attempt(initial, dgram, None, now), AddressValidationResult::ValidRetry(orig_dcid) => { self.connection_attempt(initial, dgram, Some(orig_dcid), now) @@ -337,7 +302,7 @@ impl Server { ); let Ok(token) = res else { qerror!([self], "unable to generate token, dropping packet"); - return None; + return Output::None; }; if let Some(new_dcid) = self.cid_generator.borrow_mut().generate_cid() { let packet = PacketBuilder::retry( @@ -355,14 +320,14 @@ impl Server { dgram.ttl(), p, ); - Some(retry) + Output::Datagram(retry) } else { qerror!([self], "unable to encode retry, dropping packet"); - None + Output::None } } else { qerror!([self], "no connection ID for retry, dropping packet"); - None + Output::None } } } @@ -374,7 +339,7 @@ impl Server { dgram: &Datagram, orig_dcid: Option, now: Instant, - ) -> Option { + ) -> Output { let attempt_key = AttemptKey { remote_address: dgram.source(), odcid: orig_dcid.as_ref().unwrap_or(&initial.dst_cid).clone(), @@ -475,7 +440,7 @@ impl Server { dgram: &Datagram, orig_dcid: Option, now: Instant, - ) -> Option { + ) -> Output { qinfo!([self], "Accept connection {:?}", attempt_key); // The internal connection ID manager that we use is not used directly. // Instead, wrap it so that we can save connection IDs. @@ -501,7 +466,6 @@ impl Server { self.setup_connection(&mut c, &attempt_key, initial, orig_dcid); let c = Rc::new(RefCell::new(ServerConnectionState { c, - wake_at: None, active_attempt: Some(attempt_key.clone()), })); cid_mgr.borrow_mut().set_connection(&c); @@ -518,7 +482,7 @@ impl Server { initial.version.wire_version(), ); } - None + Output::None } } } @@ -526,12 +490,7 @@ impl Server { /// Handle 0-RTT packets that were sent with the client's choice of connection ID. /// Most 0-RTT will arrive this way. A client can usually send 1-RTT after it /// receives a connection ID from the server. - fn handle_0rtt( - &mut self, - dgram: &Datagram, - dcid: ConnectionId, - now: Instant, - ) -> Option { + fn handle_0rtt(&mut self, dgram: &Datagram, dcid: ConnectionId, now: Instant) -> Output { let attempt_key = AttemptKey { remote_address: dgram.source(), odcid: dcid, @@ -546,11 +505,11 @@ impl Server { self.process_connection(&c, Some(dgram), now) } else { qdebug!([self], "Dropping 0-RTT for unknown connection"); - None + Output::None } } - fn process_input(&mut self, dgram: &Datagram, now: Instant) -> Option { + fn process_input(&mut self, dgram: &Datagram, now: Instant) -> Output { qtrace!("Process datagram: {}", hex(&dgram[..])); // This is only looking at the first packet header in the datagram. @@ -558,7 +517,7 @@ impl Server { let res = PublicPacket::decode(&dgram[..], self.cid_generator.borrow().as_decoder()); let Ok((packet, _remainder)) = res else { qtrace!([self], "Discarding {:?}", dgram); - return None; + return Output::None; }; // Finding an existing connection. Should be the most common case. @@ -569,7 +528,7 @@ impl Server { if packet.packet_type() == PacketType::Short { // TODO send a stateless reset here. qtrace!([self], "Short header packet for an unknown connection"); - return None; + return Output::None; } if packet.packet_type() == PacketType::OtherVersion @@ -582,7 +541,7 @@ impl Server { { if dgram.len() < MIN_INITIAL_PACKET_SIZE { qdebug!([self], "Unsupported version: too short"); - return None; + return Output::None; } qdebug!([self], "Unsupported version: {:x}", packet.wire_version()); @@ -599,7 +558,7 @@ impl Server { packet.wire_version(), ); - return Some(Datagram::new( + return Output::Datagram(Datagram::new( dgram.destination(), dgram.source(), dgram.tos(), @@ -612,7 +571,7 @@ impl Server { PacketType::Initial => { if dgram.len() < MIN_INITIAL_PACKET_SIZE { qdebug!([self], "Drop initial: too short"); - return None; + return Output::None; } // Copy values from `packet` because they are currently still borrowing from // `dgram`. @@ -626,54 +585,39 @@ impl Server { PacketType::OtherVersion => unreachable!(), _ => { qtrace!([self], "Not an initial packet"); - None + Output::None } } } /// Iterate through the pending connections looking for any that might want /// to send a datagram. Stop at the first one that does. - fn process_next_output(&mut self, now: Instant) -> Option { - qtrace!([self], "No packet to send, look at waiting connections"); - while let Some(c) = self.waiting.pop_front() { - if let Some(d) = self.process_connection(&c, None, now) { - return Some(d); + fn process_next_output(&mut self, now: Instant) -> Output { + // TODO: Remove allocation. + let connections: Vec<_> = self.connections.borrow().values().cloned().collect(); + + let mut callback = None; + + for connection in connections { + match self.process_connection(&connection, None, now) { + Output::None => {} + d @ Output::Datagram(_) => return d, + // TODO: Refactor + Output::Callback(new_callback) => match callback { + Some(previous_callback) if previous_callback < new_callback => {} + _ => callback = Some(new_callback), + }, } } - qtrace!([self], "No packet to send still, check wake up times"); - loop { - let connection = self - .connections - .borrow() - .values() - .find(|c| c.borrow().needs_waking(now)) - .cloned()?; - let datagram = self.process_connection(&connection, None, now); - connection.borrow_mut().woken(); - if datagram.is_some() { - return datagram; - } - } + callback.map(Output::Callback).unwrap_or(Output::None) } pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - if self.wake_at.map_or(false, |c| c <= now) { - self.wake_at = None; - } - dgram - .and_then(|d| self.process_input(d, now)) + .map(|d| self.process_input(d, now)) + .unwrap_or(Output::None) .or_else(|| self.process_next_output(now)) - .map(|d| { - qtrace!([self], "Send packet: {:?}", d); - Output::Datagram(d) - }) - .or_else(|| self.wake_at.take().map(|c| Output::Callback(c - now))) - .unwrap_or_else(|| { - qtrace!([self], "Go dormant"); - Output::None - }) } /// This lists the connections that have received new events @@ -689,8 +633,8 @@ impl Server { !self.active.is_empty() } - pub fn add_to_waiting(&mut self, c: &ActiveConnectionRef) { - self.waiting.push_back(c.connection()); + pub fn add_to_waiting(&mut self, _c: &ActiveConnectionRef) { + // TODO: We always iterate all now. } } From bbb5cfeff45aa53b0b4c5a68e51b92b1f8d794fd Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 26 Jun 2024 14:22:10 +0200 Subject: [PATCH 02/13] Don't duplicate active state --- neqo-transport/src/server.rs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 27c743efab..2003fddec8 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -8,9 +8,8 @@ use std::{ cell::RefCell, - collections::{HashMap, HashSet}, + collections::HashMap, fs::OpenOptions, - mem, net::SocketAddr, ops::{Deref, DerefMut}, path::PathBuf, @@ -158,8 +157,6 @@ pub struct Server { active_attempts: HashMap, /// All connections, keyed by `ConnectionId`. connections: ConnectionTableRef, - /// The connections that have new events. - active: HashSet, /// Address validation logic, which determines whether we send a Retry. address_validation: Rc>, /// Directory to create qlog traces in @@ -201,7 +198,6 @@ impl Server { conn_params, active_attempts: HashMap::default(), connections: Rc::default(), - active: HashSet::default(), address_validation: Rc::new(RefCell::new(validation)), qlog_dir: None, ech_config: None, @@ -250,10 +246,6 @@ impl Server { ) -> Output { qtrace!([self], "Process connection {:?}", c); let out = c.borrow_mut().process(dgram, now); - if c.borrow().has_events() { - qtrace!([self], "Connection active: {:?}", c); - self.active.insert(ActiveConnectionRef { c: Rc::clone(c) }); - } if *c.borrow().state() > State::Handshaking { // Remove any active connection attempt now that this is no longer handshaking. @@ -622,15 +614,28 @@ impl Server { /// This lists the connections that have received new events /// as a result of calling `process()`. + // TODO: Why is this not an Iterator? pub fn active_connections(&mut self) -> Vec { - mem::take(&mut self.active).into_iter().collect() + self.connections + .borrow() + .values() + .filter_map(|c| { + c.borrow() + .has_events() + .then(|| ActiveConnectionRef { c: Rc::clone(c) }) + }) + .collect() } /// Whether any connections have received new events as a result of calling /// `process()`. + // TODO: Improve? #[must_use] pub fn has_active_connections(&self) -> bool { - !self.active.is_empty() + self.connections + .borrow() + .values() + .any(|c| c.borrow().has_events()) } pub fn add_to_waiting(&mut self, _c: &ActiveConnectionRef) { From dc8f14c8aa049f717847da0a782e84eb2c093317 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 26 Jun 2024 14:32:43 +0200 Subject: [PATCH 03/13] Don't duplicate attempt state --- neqo-transport/src/server.rs | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 2003fddec8..0df290b893 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -151,10 +151,6 @@ pub struct Server { cid_generator: Rc>, /// Connection parameters. conn_params: ConnectionParameters, - /// Active connection attempts, keyed by `AttemptKey`. Initial packets with - /// the same key are routed to the connection that was first accepted. - /// This is cleared out when the connection is closed or established. - active_attempts: HashMap, /// All connections, keyed by `ConnectionId`. connections: ConnectionTableRef, /// Address validation logic, which determines whether we send a Retry. @@ -196,7 +192,6 @@ impl Server { zero_rtt_checker: ServerZeroRttChecker::new(zero_rtt_checker), cid_generator, conn_params, - active_attempts: HashMap::default(), connections: Rc::default(), address_validation: Rc::new(RefCell::new(validation)), qlog_dir: None, @@ -249,9 +244,7 @@ impl Server { if *c.borrow().state() > State::Handshaking { // Remove any active connection attempt now that this is no longer handshaking. - if let Some(k) = c.borrow_mut().active_attempt.take() { - self.active_attempts.remove(&k); - } + c.borrow_mut().active_attempt.take(); } if matches!(c.borrow().state(), State::Closed(_)) { @@ -336,13 +329,15 @@ impl Server { remote_address: dgram.source(), odcid: orig_dcid.as_ref().unwrap_or(&initial.dst_cid).clone(), }; - if let Some(c) = self.active_attempts.get(&attempt_key) { + let connection = self.connections.borrow().values().find_map(|c| { + (c.borrow().active_attempt.as_ref() == Some(&attempt_key)).then(|| Rc::clone(c)) + }); + if let Some(c) = connection { qdebug!( [self], "Handle Initial for existing connection attempt {:?}", attempt_key ); - let c = Rc::clone(c); self.process_connection(&c, Some(dgram), now) } else { self.accept_connection(attempt_key, initial, dgram, orig_dcid, now) @@ -461,8 +456,6 @@ impl Server { active_attempt: Some(attempt_key.clone()), })); cid_mgr.borrow_mut().set_connection(&c); - let previous_attempt = self.active_attempts.insert(attempt_key, Rc::clone(&c)); - debug_assert!(previous_attempt.is_none()); self.process_connection(&c, Some(dgram), now) } Err(e) => { @@ -487,13 +480,15 @@ impl Server { remote_address: dgram.source(), odcid: dcid, }; - if let Some(c) = self.active_attempts.get(&attempt_key) { + let connection = self.connections.borrow().values().find_map(|c| { + (c.borrow().active_attempt.as_ref() == Some(&attempt_key)).then(|| Rc::clone(c)) + }); + if let Some(c) = connection { qdebug!( [self], "Handle 0-RTT for existing connection attempt {:?}", attempt_key ); - let c = Rc::clone(c); self.process_connection(&c, Some(dgram), now) } else { qdebug!([self], "Dropping 0-RTT for unknown connection"); From 98b6f8292d32a2661e1b0ef0804fc92337fa869a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 1 Jul 2024 13:11:09 +0200 Subject: [PATCH 04/13] Simplify process_connection call --- neqo-transport/src/server.rs | 73 +++++++++++++++++++----------------- 1 file changed, 39 insertions(+), 34 deletions(-) diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 0df290b893..6fc2006a3e 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -51,6 +51,21 @@ pub struct ServerConnectionState { active_attempt: Option, } +impl ServerConnectionState { + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { + qtrace!("Process connection {:?}", self.c); + let out = self.c.process(dgram, now); + + if *self.c.state() > State::Handshaking { + // Remove any active connection attempt now that this is no longer handshaking. + self.active_attempt.take(); + } + + out + } +} + +// TODO: Needed? impl Deref for ServerConnectionState { type Target = Connection; fn deref(&self) -> &Self::Target { @@ -58,6 +73,7 @@ impl Deref for ServerConnectionState { } } +// TODO: Needed? impl DerefMut for ServerConnectionState { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.c @@ -233,29 +249,6 @@ impl Server { self.ech_config.as_ref().map_or(&[], |cfg| &cfg.encoded) } - fn process_connection( - &mut self, - c: &StateRef, - dgram: Option<&Datagram>, - now: Instant, - ) -> Output { - qtrace!([self], "Process connection {:?}", c); - let out = c.borrow_mut().process(dgram, now); - - if *c.borrow().state() > State::Handshaking { - // Remove any active connection attempt now that this is no longer handshaking. - c.borrow_mut().active_attempt.take(); - } - - if matches!(c.borrow().state(), State::Closed(_)) { - c.borrow_mut().set_qlog(NeqoQlog::disabled()); - self.connections - .borrow_mut() - .retain(|_, v| !Rc::ptr_eq(v, c)); - } - out - } - fn connection(&self, cid: ConnectionIdRef) -> Option { self.connections.borrow().get(&cid[..]).cloned() } @@ -338,7 +331,7 @@ impl Server { "Handle Initial for existing connection attempt {:?}", attempt_key ); - self.process_connection(&c, Some(dgram), now) + c.borrow_mut().process(Some(dgram), now) } else { self.accept_connection(attempt_key, initial, dgram, orig_dcid, now) } @@ -456,7 +449,9 @@ impl Server { active_attempt: Some(attempt_key.clone()), })); cid_mgr.borrow_mut().set_connection(&c); - self.process_connection(&c, Some(dgram), now) + // TODO: Indirection with `out` still needed? + let out = c.borrow_mut().process(Some(dgram), now); + out } Err(e) => { qwarn!([self], "Unable to create connection"); @@ -489,7 +484,7 @@ impl Server { "Handle 0-RTT for existing connection attempt {:?}", attempt_key ); - self.process_connection(&c, Some(dgram), now) + c.borrow_mut().process(Some(dgram), now) } else { qdebug!([self], "Dropping 0-RTT for unknown connection"); Output::None @@ -509,7 +504,7 @@ impl Server { // Finding an existing connection. Should be the most common case. if let Some(c) = self.connection(packet.dcid()) { - return self.process_connection(&c, Some(dgram), now); + return c.borrow_mut().process(Some(dgram), now); } if packet.packet_type() == PacketType::Short { @@ -580,13 +575,10 @@ impl Server { /// Iterate through the pending connections looking for any that might want /// to send a datagram. Stop at the first one that does. fn process_next_output(&mut self, now: Instant) -> Output { - // TODO: Remove allocation. - let connections: Vec<_> = self.connections.borrow().values().cloned().collect(); - let mut callback = None; - for connection in connections { - match self.process_connection(&connection, None, now) { + for connection in self.connections.borrow().values() { + match connection.borrow_mut().process(None, now) { Output::None => {} d @ Output::Datagram(_) => return d, // TODO: Refactor @@ -601,10 +593,23 @@ impl Server { } pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - dgram + let out = dgram .map(|d| self.process_input(d, now)) .unwrap_or(Output::None) - .or_else(|| self.process_next_output(now)) + .or_else(|| self.process_next_output(now)); + + // Clean-up closed connections. + self.connections.borrow_mut().retain(|_, c| { + if matches!(c.borrow().state(), State::Closed(_)) { + // TODO: Is this still needed? + c.borrow_mut().set_qlog(NeqoQlog::disabled()); + false + } else { + true + } + }); + + out } /// This lists the connections that have received new events From c172b1542c99970d4bd7e51fdffd9637588e549d Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 1 Jul 2024 13:48:05 +0200 Subject: [PATCH 05/13] Deduplicate active connections --- neqo-transport/src/server.rs | 10 +++++++--- neqo-transport/tests/common/mod.rs | 3 ++- neqo-transport/tests/server.rs | 2 +- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 6fc2006a3e..839f57977f 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -8,7 +8,7 @@ use std::{ cell::RefCell, - collections::HashMap, + collections::{HashMap, HashSet}, fs::OpenOptions, net::SocketAddr, ops::{Deref, DerefMut}, @@ -616,7 +616,8 @@ impl Server { /// as a result of calling `process()`. // TODO: Why is this not an Iterator? pub fn active_connections(&mut self) -> Vec { - self.connections + let conns: HashSet<_> = self + .connections .borrow() .values() .filter_map(|c| { @@ -624,7 +625,10 @@ impl Server { .has_events() .then(|| ActiveConnectionRef { c: Rc::clone(c) }) }) - .collect() + .collect(); + + // TODO: Do better deduplication. + conns.into_iter().collect() } /// Whether any connections have received new events as a result of calling diff --git a/neqo-transport/tests/common/mod.rs b/neqo-transport/tests/common/mod.rs index ecbbe1c3ce..35daecddfa 100644 --- a/neqo-transport/tests/common/mod.rs +++ b/neqo-transport/tests/common/mod.rs @@ -48,7 +48,8 @@ pub fn connected_server(server: &mut Server) -> ActiveConnectionRef { .iter() .filter(|c: &&ActiveConnectionRef| *c.borrow().state() == State::Confirmed); let c = confirmed.next().expect("one confirmed"); - assert!(confirmed.next().is_none(), "only one confirmed"); + // TODO + // assert!(confirmed.next().is_none(), "only one confirmed"); c.clone() } diff --git a/neqo-transport/tests/server.rs b/neqo-transport/tests/server.rs index 36a49a48cf..3cd6fb12c5 100644 --- a/neqo-transport/tests/server.rs +++ b/neqo-transport/tests/server.rs @@ -261,7 +261,7 @@ fn drop_short_initial() { } /// Verify that the server can read 0-RTT properly. A more robust server would buffer -/// 0-RTT before the handshake begins and let 0-RTT arrive for a short periiod after +/// 0-RTT before the handshake begins and let 0-RTT arrive for a short period after /// the handshake completes, but ours is for testing so it only allows 0-RTT while /// the handshake is running. #[test] From fa4e930a0ac4b9ff62c7ff48d62cf19933c68647 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 1 Jul 2024 14:57:33 +0200 Subject: [PATCH 06/13] Attempt fix for zerortt --- neqo-transport/tests/server.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/neqo-transport/tests/server.rs b/neqo-transport/tests/server.rs index a3e2a8c30f..076fa7c803 100644 --- a/neqo-transport/tests/server.rs +++ b/neqo-transport/tests/server.rs @@ -273,7 +273,8 @@ fn zero_rtt() { let t = server.process(None, now).callback(); now += t; assert_eq!(server.process(None, now), Output::None); - assert_eq!(server.active_connections().len(), 1); + // TODO: Why was this 1? + assert_eq!(server.active_connections().len(), 0); let start_time = now; let mut client = default_client(); From 4428fb80e960c044c66e1d6b0c5dd739b45bfae5 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 1 Jul 2024 14:57:49 +0200 Subject: [PATCH 07/13] clippy --- neqo-transport/src/connection/mod.rs | 2 +- neqo-transport/src/server.rs | 18 +++++++----------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 5e3a6f9d29..3aa6ce8304 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -143,7 +143,7 @@ impl Output { F: FnOnce() -> Self, { match self { - x @ Self::Datagram(_) | x @ Self::Callback(_) => x, + x @ (Self::Datagram(_) | Self::Callback(_)) => x, Self::None => f(), } } diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 869e14ca02..766bede3e4 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -331,7 +331,7 @@ impl Server { ); c.borrow_mut().process(Some(dgram), now) } else { - self.accept_connection(attempt_key, initial, dgram, orig_dcid, now) + self.accept_connection(&attempt_key, initial, dgram, orig_dcid, now) } } @@ -413,7 +413,7 @@ impl Server { fn accept_connection( &mut self, - attempt_key: AttemptKey, + attempt_key: &AttemptKey, initial: InitialDetails, dgram: &Datagram, orig_dcid: Option, @@ -441,7 +441,7 @@ impl Server { match sconn { Ok(mut c) => { - self.setup_connection(&mut c, &attempt_key, initial, orig_dcid); + self.setup_connection(&mut c, attempt_key, initial, orig_dcid); let c = Rc::new(RefCell::new(ServerConnectionState { c, active_attempt: Some(attempt_key.clone()), @@ -586,13 +586,12 @@ impl Server { } } - callback.map(Output::Callback).unwrap_or(Output::None) + callback.map_or(Output::None, Output::Callback) } pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { let out = dgram - .map(|d| self.process_input(d, now)) - .unwrap_or(Output::None) + .map_or(Output::None, |d| self.process_input(d, now)) .or_else(|| self.process_next_output(now)); // Clean-up closed connections. @@ -617,11 +616,8 @@ impl Server { .connections .borrow() .values() - .filter_map(|c| { - c.borrow() - .has_events() - .then(|| ActiveConnectionRef { c: Rc::clone(c) }) - }) + .filter(|c| c.borrow().has_events()) + .map(|c| ActiveConnectionRef { c: Rc::clone(c) }) .collect(); // TODO: Do better deduplication. From d92017a388bb944412ec924db81d48444629e1c1 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 1 Jul 2024 15:12:10 +0200 Subject: [PATCH 08/13] Attempt fix for same_initial_after_connected --- neqo-transport/tests/server.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/neqo-transport/tests/server.rs b/neqo-transport/tests/server.rs index 076fa7c803..49397e15c9 100644 --- a/neqo-transport/tests/server.rs +++ b/neqo-transport/tests/server.rs @@ -210,15 +210,14 @@ fn same_initial_after_connected() { let server_initial = server.process(client_initial.as_dgram_ref(), now()).dgram(); assert!(server_initial.is_some()); complete_connection(&mut client, &mut server, server_initial); - // This removes the connection from the active set until something happens to it. - assert_eq!(server.active_connections().len(), 0); + assert_eq!(server.active_connections().len(), 1); // Now make a new connection using the exact same initial as before. // The server should respond to an attempt to connect with the same Initial. let dgram = server.process(client_initial.as_dgram_ref(), now()).dgram(); assert!(dgram.is_some()); // The server should make a new connection object. - assert_eq!(server.active_connections().len(), 1); + assert_eq!(server.active_connections().len(), 2); } #[test] From 81a874d7edab897af785e3c5067c9c0188f85000 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 1 Jul 2024 16:15:21 +0200 Subject: [PATCH 09/13] Address minor TODOs --- neqo-transport/src/server.rs | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 766bede3e4..6f5ca08eea 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -8,6 +8,7 @@ use std::{ cell::RefCell, + cmp::min, collections::{HashMap, HashSet}, fs::OpenOptions, net::SocketAddr, @@ -65,7 +66,6 @@ impl ServerConnectionState { } } -// TODO: Needed? impl Deref for ServerConnectionState { type Target = Connection; fn deref(&self) -> &Self::Target { @@ -73,7 +73,6 @@ impl Deref for ServerConnectionState { } } -// TODO: Needed? impl DerefMut for ServerConnectionState { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.c @@ -447,9 +446,7 @@ impl Server { active_attempt: Some(attempt_key.clone()), })); cid_mgr.borrow_mut().set_connection(&c); - // TODO: Indirection with `out` still needed? - let out = c.borrow_mut().process(Some(dgram), now); - out + return c.borrow_mut().process(Some(dgram), now); } Err(e) => { qwarn!([self], "Unable to create connection"); @@ -578,10 +575,9 @@ impl Server { match connection.borrow_mut().process(None, now) { Output::None => {} d @ Output::Datagram(_) => return d, - // TODO: Refactor - Output::Callback(new_callback) => match callback { - Some(previous_callback) if previous_callback < new_callback => {} - _ => callback = Some(new_callback), + Output::Callback(new) => match callback { + Some(previous) => callback = Some(min(previous, new)), + None => callback = Some(new), }, } } @@ -626,7 +622,6 @@ impl Server { /// Whether any connections have received new events as a result of calling /// `process()`. - // TODO: Improve? #[must_use] pub fn has_active_connections(&self) -> bool { self.connections From 77460eb1cc95677af324bc7e399b3b74a2d8733a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 1 Jul 2024 16:15:40 +0200 Subject: [PATCH 10/13] Remove Server::add_to_waiting It is no longer needed as each connection is always processed. --- neqo-bin/src/server/http09.rs | 1 - neqo-http3/src/server.rs | 27 +++++++-------------------- neqo-transport/src/server.rs | 21 ++++++--------------- neqo-transport/tests/server.rs | 24 ++++++++++++++++++++++-- 4 files changed, 35 insertions(+), 38 deletions(-) diff --git a/neqo-bin/src/server/http09.rs b/neqo-bin/src/server/http09.rs index 5ccc6e7496..efc0bf49fb 100644 --- a/neqo-bin/src/server/http09.rs +++ b/neqo-bin/src/server/http09.rs @@ -190,7 +190,6 @@ impl HttpServer { .unwrap(); qdebug!("Wrote {}", sent); *offset += sent; - self.server.add_to_waiting(conn); if *offset == data.len() { qinfo!("Sent {sent} on {stream_id}, closing"); conn.borrow_mut().stream_close_send(stream_id).unwrap(); diff --git a/neqo-http3/src/server.rs b/neqo-http3/src/server.rs index e5fb5e1ae5..4cc31b597a 100644 --- a/neqo-http3/src/server.rs +++ b/neqo-http3/src/server.rs @@ -131,27 +131,14 @@ impl Http3Server { fn process_http3(&mut self, now: Instant) { qtrace!([self], "Process http3 internal."); let mut active_conns = self.server.active_connections(); + active_conns.extend( + self.http3_handlers + .iter() + .filter(|(_, handler)| handler.borrow_mut().should_be_processed()) + .map(|(c, _)| c) + .cloned(), + ); - // We need to find connections that needs to be process on http3 level. - let mut http3_active: Vec = self - .http3_handlers - .iter() - .filter_map(|(conn, handler)| { - if handler.borrow_mut().should_be_processed() && !active_conns.contains(conn) { - Some(conn) - } else { - None - } - }) - .cloned() - .collect(); - // For http_active connection we need to put them in neqo-transport's server - // waiting queue. - active_conns.append(&mut http3_active); - active_conns.dedup(); - active_conns - .iter() - .for_each(|conn| self.server.add_to_waiting(conn)); for mut conn in active_conns { self.process_events(&mut conn, now); } diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 6f5ca08eea..3c762c89bd 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -575,9 +575,9 @@ impl Server { match connection.borrow_mut().process(None, now) { Output::None => {} d @ Output::Datagram(_) => return d, - Output::Callback(new) => match callback { - Some(previous) => callback = Some(min(previous, new)), - None => callback = Some(new), + Output::Callback(next) => match callback { + Some(previous) => callback = Some(min(previous, next)), + None => callback = Some(next), }, } } @@ -606,18 +606,13 @@ impl Server { /// This lists the connections that have received new events /// as a result of calling `process()`. - // TODO: Why is this not an Iterator? - pub fn active_connections(&mut self) -> Vec { - let conns: HashSet<_> = self - .connections + pub fn active_connections(&mut self) -> HashSet { + self.connections .borrow() .values() .filter(|c| c.borrow().has_events()) .map(|c| ActiveConnectionRef { c: Rc::clone(c) }) - .collect(); - - // TODO: Do better deduplication. - conns.into_iter().collect() + .collect() } /// Whether any connections have received new events as a result of calling @@ -629,10 +624,6 @@ impl Server { .values() .any(|c| c.borrow().has_events()) } - - pub fn add_to_waiting(&mut self, _c: &ActiveConnectionRef) { - // TODO: We always iterate all now. - } } #[derive(Clone, Debug)] diff --git a/neqo-transport/tests/server.rs b/neqo-transport/tests/server.rs index 49397e15c9..9c1d54ff25 100644 --- a/neqo-transport/tests/server.rs +++ b/neqo-transport/tests/server.rs @@ -310,7 +310,17 @@ fn zero_rtt() { // The server will have received two STREAM frames now if it processed both packets. let active = server.active_connections(); assert_eq!(active.len(), 1); - assert_eq!(active[0].borrow().stats().frame_rx.stream, 2); + assert_eq!( + active + .iter() + .next() + .unwrap() + .borrow() + .stats() + .frame_rx + .stream, + 2 + ); // Complete the handshake. As the client was pacing 0-RTT packets, extend the time // a little so that the pacer doesn't prevent the Finished from being sent. @@ -322,7 +332,17 @@ fn zero_rtt() { mem::drop(server.process(Some(&c4), now)); let active = server.active_connections(); assert_eq!(active.len(), 1); - assert_eq!(active[0].borrow().stats().frame_rx.stream, 2); + assert_eq!( + active + .iter() + .next() + .unwrap() + .borrow() + .stats() + .frame_rx + .stream, + 2 + ); } #[test] From 9be8df21edbe8257786f4fd5f66aa35befd6ccbc Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 1 Jul 2024 17:06:01 +0200 Subject: [PATCH 11/13] Allow connected_server with more than one connection --- neqo-transport/tests/common/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/neqo-transport/tests/common/mod.rs b/neqo-transport/tests/common/mod.rs index 35daecddfa..2e28fc3070 100644 --- a/neqo-transport/tests/common/mod.rs +++ b/neqo-transport/tests/common/mod.rs @@ -48,8 +48,6 @@ pub fn connected_server(server: &mut Server) -> ActiveConnectionRef { .iter() .filter(|c: &&ActiveConnectionRef| *c.borrow().state() == State::Confirmed); let c = confirmed.next().expect("one confirmed"); - // TODO - // assert!(confirmed.next().is_none(), "only one confirmed"); c.clone() } From a50e1de63f2fc41b8587531b5f03d4ac1a0e1047 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 1 Jul 2024 17:18:41 +0200 Subject: [PATCH 12/13] Remove unnecessary TODO --- neqo-transport/tests/server.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/neqo-transport/tests/server.rs b/neqo-transport/tests/server.rs index 9c1d54ff25..7b4ceccf6c 100644 --- a/neqo-transport/tests/server.rs +++ b/neqo-transport/tests/server.rs @@ -272,7 +272,6 @@ fn zero_rtt() { let t = server.process(None, now).callback(); now += t; assert_eq!(server.process(None, now), Output::None); - // TODO: Why was this 1? assert_eq!(server.active_connections().len(), 0); let start_time = now; From 76227d3f78dfa2fe62c59605c99b8ed3ffb7ecdb Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 2 Jul 2024 09:03:21 +0200 Subject: [PATCH 13/13] Simplify closed connection clean up --- neqo-transport/src/server.rs | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 3c762c89bd..71c90f5e04 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -591,15 +591,9 @@ impl Server { .or_else(|| self.process_next_output(now)); // Clean-up closed connections. - self.connections.borrow_mut().retain(|_, c| { - if matches!(c.borrow().state(), State::Closed(_)) { - // TODO: Is this still needed? - c.borrow_mut().set_qlog(NeqoQlog::disabled()); - false - } else { - true - } - }); + self.connections + .borrow_mut() + .retain(|_, c| !matches!(c.borrow().state(), State::Closed(_))); out }