Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(udp): handle multiple datagrams on GRO #1708

Merged
merged 4 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions neqo-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,11 +826,13 @@ impl<'a> ClientRunner<'a> {

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgram = self.socket.recv(&self.local_addr)?;
if dgram.is_none() {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
}
self.process(dgram.as_ref()).await?;
for dgram in &dgrams {
self.process(Some(dgram)).await?;
}
self.handler.maybe_key_update(&mut self.client)?;
},
Ready::Timeout => {
Expand Down Expand Up @@ -1339,11 +1341,13 @@ mod old {

match ready(self.socket, self.timeout.as_mut()).await? {
Ready::Socket => loop {
let dgram = self.socket.recv(&self.local_addr)?;
if dgram.is_none() {
let dgrams = self.socket.recv(&self.local_addr)?;
if dgrams.is_empty() {
break;
}
self.process(dgram.as_ref()).await?;
for dgram in &dgrams {
self.process(Some(dgram)).await?;
}
self.handler.maybe_key_update(&mut self.client)?;
},
Ready::Timeout => {
Expand Down
82 changes: 71 additions & 11 deletions neqo-common/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Socket {
}

/// Receive a UDP datagram on the specified socket.
pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Option<Datagram>, io::Error> {
pub fn recv(&mut self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
let mut meta = RecvMeta::default();

match self.socket.try_io(Interest::READABLE, || {
Expand All @@ -89,7 +89,7 @@ impl Socket {
if err.kind() == io::ErrorKind::WouldBlock
|| err.kind() == io::ErrorKind::Interrupted =>
{
return Ok(None)
return Ok(vec![])
}
Err(err) => {
return Err(err);
Expand All @@ -98,23 +98,27 @@ impl Socket {

if meta.len == 0 {
eprintln!("zero length datagram received?");
return Ok(None);
return Ok(vec![]);
}

if meta.len == self.recv_buf.len() {
eprintln!(
"Might have received more than {} bytes",
self.recv_buf.len()
);
}

Ok(Some(Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
&self.recv_buf[..meta.len],
)))
Ok(self.recv_buf[0..meta.len]
.chunks(meta.stride.min(self.recv_buf.len()))
.map(|d| {
Datagram::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
None, // TODO: get the real TTL https://github.com/quinn-rs/quinn/issues/1749
d,
)
})
.collect())
}
}

Expand Down Expand Up @@ -144,6 +148,8 @@ mod tests {
let received_datagram = receiver
.recv(&receiver_addr)
.expect("receive to succeed")
.into_iter()
.next()
.expect("receive to yield datagram");

// Assert that the ECN is correct.
Expand All @@ -154,4 +160,58 @@ mod tests {

Ok(())
}

/// Expect [`Socket::recv`] to handle multiple [`Datagram`]s on GRO read.
#[tokio::test]
#[cfg_attr(not(any(target_os = "linux", target_os = "windows")), ignore)]
async fn many_datagrams_through_gro() -> Result<(), io::Error> {
const SEGMENT_SIZE: usize = 128;

let sender = Socket::bind("127.0.0.1:0")?;
let receiver_addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let mut receiver = Socket::bind(receiver_addr)?;

// `neqo_common::udp::Socket::send` does not yet
// (https://github.com/mozilla/neqo/issues/1693) support GSO. Use
// `quinn_udp` directly.
let max_gso_segments = sender.state.max_gso_segments();
let msg = vec![0xAB; SEGMENT_SIZE * max_gso_segments];
let transmit = Transmit {
destination: receiver.local_addr()?,
ecn: EcnCodepoint::from_bits(Into::<u8>::into(IpTos::from((
IpTosDscp::Le,
IpTosEcn::Ect1,
)))),
contents: msg.clone().into(),
segment_size: Some(SEGMENT_SIZE),
src_ip: None,
};
sender.writable().await?;
let n = sender.socket.try_io(Interest::WRITABLE, || {
sender
.state
.send((&sender.socket).into(), slice::from_ref(&transmit))
})?;
assert_eq!(n, 1, "only passed one slice");

// Allow for one GSO sendmmsg to result in multiple GRO recvmmsg.
let mut num_received = 0;
while num_received < max_gso_segments {
receiver.readable().await?;
receiver
.recv(&receiver_addr)
.expect("receive to succeed")
.into_iter()
.for_each(|d| {
assert_eq!(
SEGMENT_SIZE,
d.len(),
"Expect received datagrams to have same length as sent datagrams."
);
num_received += 1;
});
}

Ok(())
}
}
8 changes: 5 additions & 3 deletions neqo-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,11 +688,13 @@ impl ServersRunner {
match self.ready().await? {
Ready::Socket(inx) => loop {
let (host, socket) = self.sockets.get_mut(inx).unwrap();
let dgram = socket.recv(host)?;
if dgram.is_none() {
let dgrams = socket.recv(host)?;
if dgrams.is_empty() {
break;
}
self.process(dgram.as_ref()).await?;
for dgram in dgrams {
self.process(Some(&dgram)).await?;
}
},
Ready::Timeout => {
self.timeout = None;
Expand Down
Loading