diff --git a/quinn-udp/Cargo.toml b/quinn-udp/Cargo.toml index 257ee0015..b7790001a 100644 --- a/quinn-udp/Cargo.toml +++ b/quinn-udp/Cargo.toml @@ -30,8 +30,13 @@ once_cell = { workspace = true } windows-sys = { workspace = true } [dev-dependencies] -criterion = "0.5" +criterion = { version = "0.5", default-features = false, features = ["async_tokio"] } +tokio = { workspace = true, features = ["rt", "rt-multi-thread", "net"] } -[target.'cfg(any(target_os = "linux", target_os = "windows"))'.bench] +[lib] +# See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options +bench = false + +[[bench]] name = "throughput" harness = false diff --git a/quinn-udp/benches/throughput.rs b/quinn-udp/benches/throughput.rs index 4a3a5062b..beef92f05 100644 --- a/quinn-udp/benches/throughput.rs +++ b/quinn-udp/benches/throughput.rs @@ -1,38 +1,56 @@ +use std::{ + cmp::min, + io::{ErrorKind, IoSliceMut}, + net::{Ipv4Addr, Ipv6Addr, UdpSocket}, +}; + use criterion::{criterion_group, criterion_main, Criterion}; +use tokio::{io::Interest, runtime::Runtime}; + use quinn_udp::{RecvMeta, Transmit, UdpSocketState}; -use std::cmp::min; -use std::net::{Ipv4Addr, Ipv6Addr}; -use std::{io::IoSliceMut, net::UdpSocket, slice}; pub fn criterion_benchmark(c: &mut Criterion) { const TOTAL_BYTES: usize = 10 * 1024 * 1024; - // Maximum GSO buffer size is 64k. 
- const MAX_BUFFER_SIZE: usize = u16::MAX as usize; const SEGMENT_SIZE: usize = 1280; - let (send_socket, send_state) = new_socket(); - let (recv_socket, recv_state) = new_socket(); - // Reverse non-blocking flag set by `UdpSocketState` to make the test non-racy - recv_socket.set_nonblocking(false).unwrap(); - - let max_segments = min( - UdpSocketState::new((&send_socket).into()) - .unwrap() - .max_gso_segments(), - MAX_BUFFER_SIZE / SEGMENT_SIZE, - ); + let mut rt = Runtime::new().unwrap(); + let (send_socket, send_state) = new_socket(&mut rt); + let (recv_socket, recv_state) = new_socket(&mut rt); let dst_addr = recv_socket.local_addr().unwrap(); - let mut receive_buffer = vec![0; MAX_BUFFER_SIZE]; - let mut meta = RecvMeta::default(); + let mut permutations = vec![]; + for gso_enabled in [ + false, + #[cfg(any(target_os = "linux", target_os = "windows"))] + true, + ] { + for gro_enabled in [false, true] { + #[cfg(target_os = "windows")] + if gso_enabled && !gro_enabled { + // Windows requires receive buffer to fit entire datagram on GRO + // enabled socket. + // + // OS error: "A message sent on a datagram socket was larger + // than the internal message buffer or some other network limit, + // or the buffer used to receive a datagram into was smaller + // than the datagram itself." 
+ continue; + } - for gso_enabled in [false, true] { - let mut group = c.benchmark_group(format!("gso_{}", gso_enabled)); - group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64)); + permutations.push((gso_enabled, gro_enabled)); + } + } - let segments = if gso_enabled { max_segments } else { 1 }; - let msg = vec![0xAB; SEGMENT_SIZE * segments]; + for (gso_enabled, gro_enabled) in permutations { + let mut group = c.benchmark_group(format!("gso_{}_gro_{}", gso_enabled, gro_enabled)); + group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64)); + let gso_segments = if gso_enabled { + send_state.max_gso_segments() + } else { + 1 + }; + let msg = vec![0xAB; min(MAX_DATAGRAM_SIZE, SEGMENT_SIZE * gso_segments)]; let transmit = Transmit { destination: dst_addr, ecn: None, @@ -40,40 +58,63 @@ pub fn criterion_benchmark(c: &mut Criterion) { segment_size: gso_enabled.then_some(SEGMENT_SIZE), src_ip: None, }; + let gro_segments = if gro_enabled { + recv_state.gro_segments() + } else { + 1 + }; + let batch_size = 1; group.bench_function("throughput", |b| { - b.iter(|| { + b.to_async(Runtime::new().unwrap()).iter(|| async { + let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * gro_segments]; batch_size]; + let mut receive_slices = receive_buffers + .iter_mut() + .map(|buf| IoSliceMut::new(buf)) + .collect::<Vec<_>>(); + let mut meta = vec![RecvMeta::default(); batch_size]; + let mut sent: usize = 0; + let mut received: usize = 0; while sent < TOTAL_BYTES { - send_state.send((&send_socket).into(), &transmit).unwrap(); + send_socket.writable().await.unwrap(); + send_socket + .try_io(Interest::WRITABLE, || { + send_state.send((&send_socket).into(), &transmit) + }) + .unwrap(); sent += transmit.contents.len(); - let mut received_segments = 0; - while received_segments < segments { - let n = recv_state - .recv( - (&recv_socket).into(), - &mut [IoSliceMut::new(&mut receive_buffer)], - slice::from_mut(&mut meta), - ) - .unwrap(); - assert_eq!(n, 1); - received_segments 
+= meta.len / meta.stride; + while received < sent { + recv_socket.readable().await.unwrap(); + let n = match recv_socket.try_io(Interest::READABLE, || { + recv_state.recv((&recv_socket).into(), &mut receive_slices, &mut meta) + }) { + Ok(n) => n, + // recv.readable() can lead to false positives. Try again. + Err(e) if e.kind() == ErrorKind::WouldBlock => continue, + e => e.unwrap(), + }; + received += meta.iter().map(|m| m.len).take(n).sum::<usize>(); } - assert_eq!(received_segments, segments); } }) }); } } -fn new_socket() -> (UdpSocket, UdpSocketState) { +fn new_socket(rt: &mut Runtime) -> (tokio::net::UdpSocket, UdpSocketState) { let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) .or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0))) .unwrap(); let state = UdpSocketState::new((&socket).into()).unwrap(); + let socket = rt.block_on(async { tokio::net::UdpSocket::from_std(socket).unwrap() }); (socket, state) } + criterion_group!(benches, criterion_benchmark); criterion_main!(benches); + +const MAX_IP_UDP_HEADER_SIZE: usize = 48; +const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize - MAX_IP_UDP_HEADER_SIZE;