diff --git a/neqo-bin/src/server/http09.rs b/neqo-bin/src/server/http09.rs index 5ccc6e7496..efc0bf49fb 100644 --- a/neqo-bin/src/server/http09.rs +++ b/neqo-bin/src/server/http09.rs @@ -190,7 +190,6 @@ impl HttpServer { .unwrap(); qdebug!("Wrote {}", sent); *offset += sent; - self.server.add_to_waiting(conn); if *offset == data.len() { qinfo!("Sent {sent} on {stream_id}, closing"); conn.borrow_mut().stream_close_send(stream_id).unwrap(); diff --git a/neqo-http3/src/server.rs b/neqo-http3/src/server.rs index e5fb5e1ae5..4cc31b597a 100644 --- a/neqo-http3/src/server.rs +++ b/neqo-http3/src/server.rs @@ -131,27 +131,14 @@ impl Http3Server { fn process_http3(&mut self, now: Instant) { qtrace!([self], "Process http3 internal."); let mut active_conns = self.server.active_connections(); + active_conns.extend( + self.http3_handlers + .iter() + .filter(|(_, handler)| handler.borrow_mut().should_be_processed()) + .map(|(c, _)| c) + .cloned(), + ); - // We need to find connections that needs to be process on http3 level. - let mut http3_active: Vec = self - .http3_handlers - .iter() - .filter_map(|(conn, handler)| { - if handler.borrow_mut().should_be_processed() && !active_conns.contains(conn) { - Some(conn) - } else { - None - } - }) - .cloned() - .collect(); - // For http_active connection we need to put them in neqo-transport's server - // waiting queue. - active_conns.append(&mut http3_active); - active_conns.dedup(); - active_conns - .iter() - .for_each(|conn| self.server.add_to_waiting(conn)); for mut conn in active_conns { self.process_events(&mut conn, now); } diff --git a/neqo-transport/src/connection/mod.rs b/neqo-transport/src/connection/mod.rs index 0a49bfae77..3aa6ce8304 100644 --- a/neqo-transport/src/connection/mod.rs +++ b/neqo-transport/src/connection/mod.rs @@ -136,6 +136,17 @@ impl Output { _ => Duration::new(0, 0), } } + + #[must_use] + pub fn or_else(self, f: F) -> Self + where + F: FnOnce() -> Self, + { + match self { + x @ (Self::Datagram(_) | Self::Callback(_)) => x, + Self::None => f(), + } + } } /// Used by inner functions like `Connection::output`. diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 1489b8ebff..71c90f5e04 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -8,9 +8,9 @@ use std::{ cell::RefCell, - collections::{HashMap, HashSet, VecDeque}, + cmp::min, + collections::{HashMap, HashSet}, fs::OpenOptions, - mem, net::SocketAddr, ops::{Deref, DerefMut}, path::PathBuf, @@ -50,20 +50,19 @@ type ConnectionTableRef = Rc>>; pub struct ServerConnectionState { c: Connection, active_attempt: Option, - wake_at: Option, } impl ServerConnectionState { - fn set_wake_at(&mut self, at: Instant) { - self.wake_at = Some(at); - } + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { + qtrace!("Process connection {:?}", self.c); + let out = self.c.process(dgram, now); - fn needs_waking(&self, now: Instant) -> bool { - self.wake_at.map_or(false, |t| t <= now) - } + if *self.c.state() > State::Handshaking { + // Remove any active connection attempt now that this is no longer handshaking. + self.active_attempt.take(); + } - fn woken(&mut self) { - self.wake_at = None; + out } } @@ -167,18 +166,8 @@ pub struct Server { cid_generator: Rc>, /// Connection parameters. conn_params: ConnectionParameters, - /// Active connection attempts, keyed by `AttemptKey`. Initial packets with - /// the same key are routed to the connection that was first accepted. - /// This is cleared out when the connection is closed or established. - active_attempts: HashMap, /// All connections, keyed by `ConnectionId`. connections: ConnectionTableRef, - /// The connections that have new events. - active: HashSet, - /// The set of connections that need immediate processing. - waiting: VecDeque, - /// The latest [`Output::Callback`] returned from [`Server::process`]. - wake_at: Option, /// Address validation logic, which determines whether we send a Retry. address_validation: Rc>, /// Directory to create qlog traces in @@ -218,14 +207,10 @@ impl Server { zero_rtt_checker: ServerZeroRttChecker::new(zero_rtt_checker), cid_generator, conn_params, - active_attempts: HashMap::default(), connections: Rc::default(), - active: HashSet::default(), - waiting: VecDeque::default(), address_validation: Rc::new(RefCell::new(validation)), qlog_dir: None, ech_config: None, - wake_at: None, }) } @@ -263,49 +248,6 @@ impl Server { self.ech_config.as_ref().map_or(&[], |cfg| &cfg.encoded) } - fn process_connection( - &mut self, - c: &StateRef, - dgram: Option<&Datagram>, - now: Instant, - ) -> Option { - qtrace!([self], "Process connection {:?}", c); - let out = c.borrow_mut().process(dgram, now); - match out { - Output::Datagram(_) => { - qtrace!([self], "Sending packet, added to waiting connections"); - self.waiting.push_back(Rc::clone(c)); - } - Output::Callback(delay) => { - let next = now + delay; - c.borrow_mut().set_wake_at(next); - if self.wake_at.map_or(true, |c| c > next) { - self.wake_at = Some(next); - } - } - Output::None => {} - } - if c.borrow().has_events() { - qtrace!([self], "Connection active: {:?}", c); - self.active.insert(ActiveConnectionRef { c: Rc::clone(c) }); - } - - if *c.borrow().state() > State::Handshaking { - // Remove any active connection attempt now that this is no longer handshaking. - if let Some(k) = c.borrow_mut().active_attempt.take() { - self.active_attempts.remove(&k); - } - } - - if matches!(c.borrow().state(), State::Closed(_)) { - c.borrow_mut().set_qlog(NeqoQlog::disabled()); - self.connections - .borrow_mut() - .retain(|_, v| !Rc::ptr_eq(v, c)); - } - out.dgram() - } - fn connection(&self, cid: ConnectionIdRef) -> Option { self.connections.borrow().get(&cid[..]).cloned() } @@ -315,14 +257,14 @@ impl Server { initial: InitialDetails, dgram: &Datagram, now: Instant, - ) -> Option { + ) -> Output { qdebug!([self], "Handle initial"); let res = self .address_validation .borrow() .validate(&initial.token, dgram.source(), now); match res { - AddressValidationResult::Invalid => None, + AddressValidationResult::Invalid => Output::None, AddressValidationResult::Pass => self.connection_attempt(initial, dgram, None, now), AddressValidationResult::ValidRetry(orig_dcid) => { self.connection_attempt(initial, dgram, Some(orig_dcid), now) @@ -337,7 +279,7 @@ impl Server { ); let Ok(token) = res else { qerror!([self], "unable to generate token, dropping packet"); - return None; + return Output::None; }; if let Some(new_dcid) = self.cid_generator.borrow_mut().generate_cid() { let packet = PacketBuilder::retry( @@ -348,16 +290,19 @@ impl Server { &initial.dst_cid, ); if let Ok(p) = packet { - let retry = - Datagram::new(dgram.destination(), dgram.source(), dgram.tos(), p); - Some(retry) + Output::Datagram(Datagram::new( + dgram.destination(), + dgram.source(), + dgram.tos(), + p, + )) } else { qerror!([self], "unable to encode retry, dropping packet"); - None + Output::None } } else { qerror!([self], "no connection ID for retry, dropping packet"); - None + Output::None } } } @@ -369,21 +314,23 @@ impl Server { dgram: &Datagram, orig_dcid: Option, now: Instant, - ) -> Option { + ) -> Output { let attempt_key = AttemptKey { remote_address: dgram.source(), odcid: orig_dcid.as_ref().unwrap_or(&initial.dst_cid).clone(), }; - if let Some(c) = self.active_attempts.get(&attempt_key) { + let connection = self.connections.borrow().values().find_map(|c| { + (c.borrow().active_attempt.as_ref() == Some(&attempt_key)).then(|| Rc::clone(c)) + }); + if let Some(c) = connection { qdebug!( [self], "Handle Initial for existing connection attempt {:?}", attempt_key ); - let c = Rc::clone(c); - self.process_connection(&c, Some(dgram), now) + c.borrow_mut().process(Some(dgram), now) } else { - self.accept_connection(attempt_key, initial, dgram, orig_dcid, now) + self.accept_connection(&attempt_key, initial, dgram, orig_dcid, now) } } @@ -465,12 +412,12 @@ impl Server { fn accept_connection( &mut self, - attempt_key: AttemptKey, + attempt_key: &AttemptKey, initial: InitialDetails, dgram: &Datagram, orig_dcid: Option, now: Instant, - ) -> Option { + ) -> Output { qinfo!([self], "Accept connection {:?}", attempt_key); // The internal connection ID manager that we use is not used directly. // Instead, wrap it so that we can save connection IDs. @@ -493,16 +440,13 @@ impl Server { match sconn { Ok(mut c) => { - self.setup_connection(&mut c, &attempt_key, initial, orig_dcid); + self.setup_connection(&mut c, attempt_key, initial, orig_dcid); let c = Rc::new(RefCell::new(ServerConnectionState { c, - wake_at: None, active_attempt: Some(attempt_key.clone()), })); cid_mgr.borrow_mut().set_connection(&c); - let previous_attempt = self.active_attempts.insert(attempt_key, Rc::clone(&c)); - debug_assert!(previous_attempt.is_none()); - self.process_connection(&c, Some(dgram), now) + return c.borrow_mut().process(Some(dgram), now); } Err(e) => { qwarn!([self], "Unable to create connection"); @@ -513,7 +457,7 @@ impl Server { initial.version.wire_version(), ); } - None + Output::None } } } @@ -521,31 +465,28 @@ impl Server { /// Handle 0-RTT packets that were sent with the client's choice of connection ID. /// Most 0-RTT will arrive this way. A client can usually send 1-RTT after it /// receives a connection ID from the server. - fn handle_0rtt( - &mut self, - dgram: &Datagram, - dcid: ConnectionId, - now: Instant, - ) -> Option { + fn handle_0rtt(&mut self, dgram: &Datagram, dcid: ConnectionId, now: Instant) -> Output { let attempt_key = AttemptKey { remote_address: dgram.source(), odcid: dcid, }; - if let Some(c) = self.active_attempts.get(&attempt_key) { + let connection = self.connections.borrow().values().find_map(|c| { + (c.borrow().active_attempt.as_ref() == Some(&attempt_key)).then(|| Rc::clone(c)) + }); + if let Some(c) = connection { qdebug!( [self], "Handle 0-RTT for existing connection attempt {:?}", attempt_key ); - let c = Rc::clone(c); - self.process_connection(&c, Some(dgram), now) + c.borrow_mut().process(Some(dgram), now) } else { qdebug!([self], "Dropping 0-RTT for unknown connection"); - None + Output::None } } - fn process_input(&mut self, dgram: &Datagram, now: Instant) -> Option { + fn process_input(&mut self, dgram: &Datagram, now: Instant) -> Output { qtrace!("Process datagram: {}", hex(&dgram[..])); // This is only looking at the first packet header in the datagram. @@ -553,18 +494,18 @@ impl Server { let res = PublicPacket::decode(&dgram[..], self.cid_generator.borrow().as_decoder()); let Ok((packet, _remainder)) = res else { qtrace!([self], "Discarding {:?}", dgram); - return None; + return Output::None; }; // Finding an existing connection. Should be the most common case. if let Some(c) = self.connection(packet.dcid()) { - return self.process_connection(&c, Some(dgram), now); + return c.borrow_mut().process(Some(dgram), now); } if packet.packet_type() == PacketType::Short { // TODO send a stateless reset here. qtrace!([self], "Short header packet for an unknown connection"); - return None; + return Output::None; } if packet.packet_type() == PacketType::OtherVersion @@ -577,7 +518,7 @@ impl Server { { if dgram.len() < MIN_INITIAL_PACKET_SIZE { qdebug!([self], "Unsupported version: too short"); - return None; + return Output::None; } qdebug!([self], "Unsupported version: {:x}", packet.wire_version()); @@ -594,7 +535,7 @@ impl Server { packet.wire_version(), ); - return Some(Datagram::new( + return Output::Datagram(Datagram::new( dgram.destination(), dgram.source(), dgram.tos(), @@ -606,7 +547,7 @@ impl Server { PacketType::Initial => { if dgram.len() < MIN_INITIAL_PACKET_SIZE { qdebug!([self], "Drop initial: too short"); - return None; + return Output::None; } // Copy values from `packet` because they are currently still borrowing from // `dgram`. @@ -620,71 +561,62 @@ impl Server { PacketType::OtherVersion => unreachable!(), _ => { qtrace!([self], "Not an initial packet"); - None + Output::None } } } /// Iterate through the pending connections looking for any that might want /// to send a datagram. Stop at the first one that does. - fn process_next_output(&mut self, now: Instant) -> Option { - qtrace!([self], "No packet to send, look at waiting connections"); - while let Some(c) = self.waiting.pop_front() { - if let Some(d) = self.process_connection(&c, None, now) { - return Some(d); + fn process_next_output(&mut self, now: Instant) -> Output { + let mut callback = None; + + for connection in self.connections.borrow().values() { + match connection.borrow_mut().process(None, now) { + Output::None => {} + d @ Output::Datagram(_) => return d, + Output::Callback(next) => match callback { + Some(previous) => callback = Some(min(previous, next)), + None => callback = Some(next), + }, } } - qtrace!([self], "No packet to send still, check wake up times"); - loop { - let connection = self - .connections - .borrow() - .values() - .find(|c| c.borrow().needs_waking(now)) - .cloned()?; - let datagram = self.process_connection(&connection, None, now); - connection.borrow_mut().woken(); - if datagram.is_some() { - return datagram; - } - } + callback.map_or(Output::None, Output::Callback) } pub fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - if self.wake_at.map_or(false, |c| c <= now) { - self.wake_at = None; - } + let out = dgram + .map_or(Output::None, |d| self.process_input(d, now)) + .or_else(|| self.process_next_output(now)); - dgram - .and_then(|d| self.process_input(d, now)) - .or_else(|| self.process_next_output(now)) - .map(|d| { - qtrace!([self], "Send packet: {:?}", d); - Output::Datagram(d) - }) - .or_else(|| self.wake_at.take().map(|c| Output::Callback(c - now))) - .unwrap_or_else(|| { - qtrace!([self], "Go dormant"); - Output::None - }) + // Clean-up closed connections. + self.connections + .borrow_mut() + .retain(|_, c| !matches!(c.borrow().state(), State::Closed(_))); + + out } /// This lists the connections that have received new events /// as a result of calling `process()`. - pub fn active_connections(&mut self) -> Vec { - mem::take(&mut self.active).into_iter().collect() + pub fn active_connections(&mut self) -> HashSet { + self.connections + .borrow() + .values() + .filter(|c| c.borrow().has_events()) + .map(|c| ActiveConnectionRef { c: Rc::clone(c) }) + .collect() } /// Whether any connections have received new events as a result of calling /// `process()`. #[must_use] pub fn has_active_connections(&self) -> bool { - !self.active.is_empty() - } - - pub fn add_to_waiting(&mut self, c: &ActiveConnectionRef) { - self.waiting.push_back(c.connection()); + self.connections + .borrow() + .values() + .any(|c| c.borrow().has_events()) } } diff --git a/neqo-transport/tests/common/mod.rs b/neqo-transport/tests/common/mod.rs index ecbbe1c3ce..2e28fc3070 100644 --- a/neqo-transport/tests/common/mod.rs +++ b/neqo-transport/tests/common/mod.rs @@ -48,7 +48,6 @@ pub fn connected_server(server: &mut Server) -> ActiveConnectionRef { .iter() .filter(|c: &&ActiveConnectionRef| *c.borrow().state() == State::Confirmed); let c = confirmed.next().expect("one confirmed"); - assert!(confirmed.next().is_none(), "only one confirmed"); c.clone() } diff --git a/neqo-transport/tests/server.rs b/neqo-transport/tests/server.rs index 39d04fb681..7b4ceccf6c 100644 --- a/neqo-transport/tests/server.rs +++ b/neqo-transport/tests/server.rs @@ -210,15 +210,14 @@ fn same_initial_after_connected() { let server_initial = server.process(client_initial.as_dgram_ref(), now()).dgram(); assert!(server_initial.is_some()); complete_connection(&mut client, &mut server, server_initial); - // This removes the connection from the active set until something happens to it. - assert_eq!(server.active_connections().len(), 0); + assert_eq!(server.active_connections().len(), 1); // Now make a new connection using the exact same initial as before. // The server should respond to an attempt to connect with the same Initial. let dgram = server.process(client_initial.as_dgram_ref(), now()).dgram(); assert!(dgram.is_some()); // The server should make a new connection object. - assert_eq!(server.active_connections().len(), 1); + assert_eq!(server.active_connections().len(), 2); } #[test] @@ -260,7 +259,7 @@ fn drop_short_initial() { } /// Verify that the server can read 0-RTT properly. A more robust server would buffer -/// 0-RTT before the handshake begins and let 0-RTT arrive for a short periiod after +/// 0-RTT before the handshake begins and let 0-RTT arrive for a short period after /// the handshake completes, but ours is for testing so it only allows 0-RTT while /// the handshake is running. #[test] @@ -273,7 +272,7 @@ fn zero_rtt() { let t = server.process(None, now).callback(); now += t; assert_eq!(server.process(None, now), Output::None); - assert_eq!(server.active_connections().len(), 1); + assert_eq!(server.active_connections().len(), 0); let start_time = now; let mut client = default_client(); @@ -310,7 +309,17 @@ fn zero_rtt() { // The server will have received two STREAM frames now if it processed both packets. let active = server.active_connections(); assert_eq!(active.len(), 1); - assert_eq!(active[0].borrow().stats().frame_rx.stream, 2); + assert_eq!( + active + .iter() + .next() + .unwrap() + .borrow() + .stats() + .frame_rx + .stream, + 2 + ); // Complete the handshake. As the client was pacing 0-RTT packets, extend the time // a little so that the pacer doesn't prevent the Finished from being sent. @@ -322,7 +331,17 @@ fn zero_rtt() { mem::drop(server.process(Some(&c4), now)); let active = server.active_connections(); assert_eq!(active.len(), 1); - assert_eq!(active[0].borrow().stats().frame_rx.stream, 2); + assert_eq!( + active + .iter() + .next() + .unwrap() + .borrow() + .stats() + .frame_rx + .stream, + 2 + ); } #[test]