From cf0009842dade14a52416180f94a2dc507cd867a Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 15 Apr 2024 07:38:54 +0200 Subject: [PATCH] fix(bin): don't sleep when events available (#1806) * fix(bin): don't sleep when events available A call to `process_*` can emit events as a side effect, e.g. a `ConnectionEvent::StateChange(State::Connected)`. These are not returned from the function call, but instead internally enqueued to be later on consumed through the various `Provider::next_event` implementations. https://github.com/mozilla/neqo/blob/166b84c5a3307d678f38d9994af9b56b68c6b695/neqo-common/src/event.rs#L9-L15 In the case of `neqo-client` the events are consumed through `self.handler.handle` which internally calls `Provider::next_event`. A client or server should not go to sleep, waiting for either a UDP datagram to arrive or a timeout to fire, when there are events available. This commit ensures `ready().await` is only called when no events are available. * Add test for has_active_connections --- neqo-bin/src/client/http09.rs | 4 ++++ neqo-bin/src/client/http3.rs | 4 ++++ neqo-bin/src/client/mod.rs | 5 +++++ neqo-bin/src/server/mod.rs | 16 +++++++++++++--- neqo-bin/src/server/old_https.rs | 4 ++++ neqo-transport/src/server.rs | 7 +++++++ neqo-transport/tests/server.rs | 13 +++++++++++++ 7 files changed, 50 insertions(+), 3 deletions(-) diff --git a/neqo-bin/src/client/http09.rs b/neqo-bin/src/client/http09.rs index a9ed12b157..4a1be07689 100644 --- a/neqo-bin/src/client/http09.rs +++ b/neqo-bin/src/client/http09.rs @@ -169,6 +169,10 @@ impl super::Client for Connection { fn stats(&self) -> neqo_transport::Stats { self.stats() } + + fn has_events(&self) -> bool { + neqo_common::event::Provider::has_events(self) + } } impl<'b> Handler<'b> { diff --git a/neqo-bin/src/client/http3.rs b/neqo-bin/src/client/http3.rs index b3f577127e..5ba5cc4b20 100644 --- a/neqo-bin/src/client/http3.rs +++ b/neqo-bin/src/client/http3.rs @@ -134,6 +134,10 @@ impl super::Client for Http3Client { fn stats(&self) -> neqo_transport::Stats { self.transport_stats() } + + fn has_events(&self) -> bool { + neqo_common::event::Provider::has_events(self) + } } impl<'a> super::Handler for Handler<'a> { diff --git a/neqo-bin/src/client/mod.rs b/neqo-bin/src/client/mod.rs index 61e43c00d1..3896c7243e 100644 --- a/neqo-bin/src/client/mod.rs +++ b/neqo-bin/src/client/mod.rs @@ -352,6 +352,7 @@ trait Client { fn process_multiple_input<'a, I>(&mut self, dgrams: I, now: Instant) where I: IntoIterator; + fn has_events(&self) -> bool; fn close(&mut self, now: Instant, app_error: AppError, msg: S) where S: AsRef + Display; @@ -405,6 +406,10 @@ impl<'a, H: Handler> Runner<'a, H> { }; } + if self.client.has_events() { + continue; + } + match ready(self.socket, self.timeout.as_mut()).await? { Ready::Socket => self.process_multiple_input().await?, Ready::Timeout => { diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index 3490b3e9b3..5010964aeb 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -197,6 +197,7 @@ fn qns_read_response(filename: &str) -> Result, io::Error> { trait HttpServer: Display { fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; fn process_events(&mut self, args: &Args, now: Instant); + fn has_events(&self) -> bool; fn set_qlog_dir(&mut self, dir: Option); fn set_ciphers(&mut self, ciphers: &[Cipher]); fn validate_address(&mut self, when: ValidateAddress); @@ -421,6 +422,10 @@ impl HttpServer for SimpleServer { .unwrap(); self.server.ech_config() } + + fn has_events(&self) -> bool { + self.server.has_events() + } } struct ServersRunner { @@ -546,6 +551,14 @@ impl ServersRunner { async fn run(&mut self) -> Res<()> { loop { + self.server.process_events(&self.args, self.args.now()); + + self.process(None).await?; + + if self.server.has_events() { + continue; + } + match self.ready().await? { Ready::Socket(inx) => loop { let (host, socket) = self.sockets.get_mut(inx).unwrap(); @@ -562,9 +575,6 @@ impl ServersRunner { self.process(None).await?; } } - - self.server.process_events(&self.args, self.args.now()); - self.process(None).await?; } } } diff --git a/neqo-bin/src/server/old_https.rs b/neqo-bin/src/server/old_https.rs index 38f3fdc3a7..05520e1d3d 100644 --- a/neqo-bin/src/server/old_https.rs +++ b/neqo-bin/src/server/old_https.rs @@ -252,6 +252,10 @@ impl HttpServer for Http09Server { .expect("enable ECH"); self.server.ech_config() } + + fn has_events(&self) -> bool { + self.server.has_active_connections() + } } impl Display for Http09Server { diff --git a/neqo-transport/src/server.rs b/neqo-transport/src/server.rs index 96a6244ef1..60909d71e1 100644 --- a/neqo-transport/src/server.rs +++ b/neqo-transport/src/server.rs @@ -689,6 +689,13 @@ impl Server { mem::take(&mut self.active).into_iter().collect() } + /// Whether any connections have received new events as a result of calling + /// `process()`. + #[must_use] + pub fn has_active_connections(&self) -> bool { + !self.active.is_empty() + } + pub fn add_to_waiting(&mut self, c: &ActiveConnectionRef) { self.waiting.push_back(c.connection()); } diff --git a/neqo-transport/tests/server.rs b/neqo-transport/tests/server.rs index 7388e0fee7..9a225d42a2 100644 --- a/neqo-transport/tests/server.rs +++ b/neqo-transport/tests/server.rs @@ -774,3 +774,16 @@ fn ech() { .ech_accepted() .unwrap()); } + +#[test] +fn has_active_connections() { + let mut server = default_server(); + let mut client = default_client(); + + assert!(!server.has_active_connections()); + + let initial = client.process(None, now()); + let _ = server.process(initial.as_dgram_ref(), now()).dgram(); + + assert!(server.has_active_connections()); +}