Skip to content

Commit

Permalink
Cleanup UdpSocket::recv_inner
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Sep 15, 2024
1 parent 3df6660 commit 52dfa91
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 87 deletions.
2 changes: 1 addition & 1 deletion neqo-bin/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Socket {
) -> Result<Option<Datagram<&'a [u8]>>, io::Error> {
self.inner
.try_io(tokio::io::Interest::READABLE, || {
neqo_udp::recv_inner_2(local_address, &self.state, &self.inner, recv_buf)
neqo_udp::recv_inner(local_address, &self.state, &self.inner, recv_buf)
})
.map(Some)
.or_else(|e| {
Expand Down
1 change: 1 addition & 0 deletions neqo-common/src/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub struct Datagram<D = Vec<u8>> {
dst: SocketAddr,
/// The segment size if this transmission contains multiple datagrams.
/// This is `None` if the [`Datagram`] only contains a single datagram
// TODO: Need to be an option?
segment_size: Option<usize>,
tos: IpTos,
d: D,
Expand Down
109 changes: 23 additions & 86 deletions neqo-udp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
#![allow(clippy::missing_errors_doc)] // Functions simply delegate to tokio and quinn-udp.

use std::{
cell::RefCell,
io::{self, IoSliceMut},
net::SocketAddr,
slice,
Expand All @@ -23,10 +22,6 @@ use quinn_udp::{EcnCodepoint, RecvMeta, Transmit, UdpSocketState};
// TODO: Experiment with different values across platforms.
pub const RECV_BUF_SIZE: usize = u16::MAX as usize;

std::thread_local! {
static RECV_BUF: RefCell<Vec<u8>> = RefCell::new(vec![0; RECV_BUF_SIZE]);
}

pub fn send_inner(
state: &UdpSocketState,
socket: quinn_udp::UdpSockRef<'_>,
Expand Down Expand Up @@ -57,70 +52,10 @@ use std::os::fd::AsFd as SocketRef;
#[cfg(windows)]
use std::os::windows::io::AsSocket as SocketRef;

pub fn recv_inner(
local_address: &SocketAddr,
state: &UdpSocketState,
socket: impl SocketRef,
) -> Result<Vec<Datagram>, io::Error> {
let dgrams = RECV_BUF.with_borrow_mut(|recv_buf| -> Result<Vec<Datagram>, io::Error> {
let mut meta;

loop {
meta = RecvMeta::default();

state.recv(
(&socket).into(),
&mut [IoSliceMut::new(recv_buf)],
slice::from_mut(&mut meta),
)?;

if meta.len == 0 || meta.stride == 0 {
qdebug!(
"ignoring datagram from {} to {} len {} stride {}",
meta.addr,
local_address,
meta.len,
meta.stride
);
continue;
}

break;
}

Ok(recv_buf[0..meta.len]
.chunks(meta.stride)
.map(|d| {
qtrace!(
"received {} bytes from {} to {}",
d.len(),
meta.addr,
local_address,
);
Datagram::<Vec<u8>>::new(
meta.addr,
*local_address,
meta.ecn.map(|n| IpTos::from(n as u8)).unwrap_or_default(),
d,
)
})
.collect())
})?;

qtrace!(
"received {} datagrams ({:?})",
dgrams.len(),
dgrams.iter().map(|d| d.len()).collect::<Vec<_>>(),
);

Ok(dgrams)
}

// TODO: replace recv_inner in favor of this one.
/// # Panics
///
/// TODO
pub fn recv_inner_2<'a>(
pub fn recv_inner<'a>(
local_address: &SocketAddr,
state: &UdpSocketState,
socket: impl SocketRef,
Expand Down Expand Up @@ -199,8 +134,12 @@ impl<S: SocketRef> Socket<S> {

/// Receive a batch of [`Datagram`]s on the given [`Socket`], each
/// set with the provided local address.
pub fn recv(&self, local_address: &SocketAddr) -> Result<Vec<Datagram>, io::Error> {
recv_inner(local_address, &self.state, &self.inner)
pub fn recv<'a>(
&self,
local_address: &SocketAddr,
recv_buf: &'a mut Vec<u8>,
) -> Result<Datagram<&'a [u8]>, io::Error> {
recv_inner(local_address, &self.state, &self.inner, recv_buf)
}
}

Expand Down Expand Up @@ -232,7 +171,8 @@ mod tests {
);

sender.send(datagram)?;
let res = receiver.recv(&receiver_addr);
let mut recv_buf = Vec::with_capacity(RECV_BUF_SIZE);
let res = receiver.recv(&receiver_addr, &mut recv_buf);
assert_eq!(res.unwrap_err().kind(), std::io::ErrorKind::WouldBlock);

Ok(())
Expand All @@ -254,12 +194,10 @@ mod tests {

sender.send(datagram)?;

let mut recv_buf = Vec::with_capacity(RECV_BUF_SIZE);
let received_datagram = receiver
.recv(&receiver_addr)
.expect("receive to succeed")
.into_iter()
.next()
.expect("receive to yield datagram");
.recv(&receiver_addr, &mut recv_buf)
.expect("receive to succeed");

// Assert that the ECN is correct.
assert_eq!(
Expand Down Expand Up @@ -299,19 +237,18 @@ mod tests {

// Allow for one GSO sendmmsg to result in multiple GRO recvmmsg.
let mut num_received = 0;
let mut recv_buf = Vec::with_capacity(RECV_BUF_SIZE);
while num_received < max_gso_segments {
receiver
.recv(&receiver_addr)
.expect("receive to succeed")
.into_iter()
.for_each(|d| {
assert_eq!(
SEGMENT_SIZE,
d.len(),
"Expect received datagrams to have same length as sent datagrams."
);
num_received += 1;
});
recv_buf.clear();
let dgram = receiver
.recv(&receiver_addr, &mut recv_buf)
.expect("receive to succeed");
assert_eq!(
Some(SEGMENT_SIZE),
dgram.segment_size(),
"Expect received datagrams to have same length as sent datagrams."

Check warning on line 249 in neqo-udp/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

neqo-udp/src/lib.rs#L249

Added line #L249 was not covered by tests
);
num_received += dgram.len() / dgram.segment_size().unwrap();
}

Ok(())
Expand Down

0 comments on commit 52dfa91

Please sign in to comment.