diff --git a/neqo-client/src/main.rs b/neqo-client/src/main.rs index 868a86ddd7..91593822cf 100644 --- a/neqo-client/src/main.rs +++ b/neqo-client/src/main.rs @@ -826,11 +826,13 @@ impl<'a> ClientRunner<'a> { match ready(self.socket, self.timeout.as_mut()).await? { Ready::Socket => loop { - let dgram = self.socket.recv(&self.local_addr)?; - if dgram.is_none() { + let dgrams = self.socket.recv(&self.local_addr)?; + if dgrams.is_empty() { break; } - self.process(dgram.as_ref()).await?; + for dgram in &dgrams { + self.process(Some(dgram)).await?; + } self.handler.maybe_key_update(&mut self.client)?; }, Ready::Timeout => { @@ -1339,11 +1341,13 @@ mod old { match ready(self.socket, self.timeout.as_mut()).await? { Ready::Socket => loop { - let dgram = self.socket.recv(&self.local_addr)?; - if dgram.is_none() { + let dgrams = self.socket.recv(&self.local_addr)?; + if dgrams.is_empty() { break; } - self.process(dgram.as_ref()).await?; + for dgram in &dgrams { + self.process(Some(dgram)).await?; + } self.handler.maybe_key_update(&mut self.client)?; }, Ready::Timeout => { diff --git a/neqo-common/src/udp.rs b/neqo-common/src/udp.rs index 64fc356760..cee9f7f08d 100644 --- a/neqo-common/src/udp.rs +++ b/neqo-common/src/udp.rs @@ -72,7 +72,7 @@ impl Socket { } /// Receive a UDP datagram on the specified socket. - pub fn recv(&mut self, local_address: &SocketAddr) -> Result, io::Error> { + pub fn recv(&mut self, local_address: &SocketAddr) -> Result, io::Error> { let mut meta = RecvMeta::default(); match self.socket.try_io(Interest::READABLE, || { @@ -89,7 +89,7 @@ impl Socket { if err.kind() == io::ErrorKind::WouldBlock || err.kind() == io::ErrorKind::Interrupted => { - return Ok(None) + return Ok(vec![]) } Err(err) => { return Err(err); @@ -98,9 +98,8 @@ impl Socket { if meta.len == 0 { eprintln!("zero length datagram received?"); - return Ok(None); + return Ok(vec![]); } - if meta.len == self.recv_buf.len() { eprintln!( "Might have received more than {} bytes", @@ -108,13 +107,18 @@ impl Socket { ); } - Ok(Some(Datagram::new( - meta.addr, - *local_address, - meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(), - None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749 - &self.recv_buf[..meta.len], - ))) + Ok(self.recv_buf[0..meta.len] + .chunks(meta.stride.min(self.recv_buf.len())) + .map(|d| { + Datagram::new( + meta.addr, + *local_address, + meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(), + None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749 + d, + ) + }) + .collect()) } } @@ -144,6 +148,8 @@ mod tests { let received_datagram = receiver .recv(&receiver_addr) .expect("receive to succeed") + .into_iter() + .next() .expect("receive to yield datagram"); // Assert that the ECN is correct. @@ -154,4 +160,58 @@ mod tests { Ok(()) } + + /// Expect [`Socket::recv`] to handle multiple [`Datagram`]s on GRO read. + #[tokio::test] + #[cfg_attr(not(any(target_os = "linux", target_os = "windows")), ignore)] + async fn many_datagrams_through_gro() -> Result<(), io::Error> { + const SEGMENT_SIZE: usize = 128; + + let sender = Socket::bind("127.0.0.1:0")?; + let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); + let mut receiver = Socket::bind(receiver_addr)?; + + // `neqo_common::udp::Socket::send` does not yet + // (https://github.com/mozilla/neqo/issues/1693) support GSO. Use + // `quinn_udp` directly. + let max_gso_segments = sender.state.max_gso_segments(); + let msg = vec![0xAB; SEGMENT_SIZE * max_gso_segments]; + let transmit = Transmit { + destination: receiver.local_addr()?, + ecn: EcnCodepoint::from_bits(Into::::into(IpTos::from(( + IpTosDscp::Le, + IpTosEcn::Ect1, + )))), + contents: msg.clone().into(), + segment_size: Some(SEGMENT_SIZE), + src_ip: None, + }; + sender.writable().await?; + let n = sender.socket.try_io(Interest::WRITABLE, || { + sender + .state + .send((&sender.socket).into(), slice::from_ref(&transmit)) + })?; + assert_eq!(n, 1, "only passed one slice"); + + // Allow for one GSO sendmmsg to result in multiple GRO recvmmsg. + let mut num_received = 0; + while num_received < max_gso_segments { + receiver.readable().await?; + receiver + .recv(&receiver_addr) + .expect("receive to succeed") + .into_iter() + .for_each(|d| { + assert_eq!( + SEGMENT_SIZE, + d.len(), + "Expect received datagrams to have same length as sent datagrams." + ); + num_received += 1; + }); + } + + Ok(()) + } } diff --git a/neqo-server/src/main.rs b/neqo-server/src/main.rs index 691b367b73..fdd771fdb3 100644 --- a/neqo-server/src/main.rs +++ b/neqo-server/src/main.rs @@ -688,11 +688,13 @@ impl ServersRunner { match self.ready().await? { Ready::Socket(inx) => loop { let (host, socket) = self.sockets.get_mut(inx).unwrap(); - let dgram = socket.recv(host)?; - if dgram.is_none() { + let dgrams = socket.recv(host)?; + if dgrams.is_empty() { break; } - self.process(dgram.as_ref()).await?; + for dgram in dgrams { + self.process(Some(&dgram)).await?; + } }, Ready::Timeout => { self.timeout = None;