diff --git a/quinn-udp/Cargo.toml b/quinn-udp/Cargo.toml index f98c8ec73e..d537a9f90e 100644 --- a/quinn-udp/Cargo.toml +++ b/quinn-udp/Cargo.toml @@ -32,7 +32,9 @@ once_cell = { workspace = true } windows-sys = { workspace = true } [dev-dependencies] -criterion = "0.5" +criterion = { version = "0.5", default-features = false, features = ["async_tokio"] } +# TODO: rework features +tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros", "signal", "net"] } [lib] # See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options diff --git a/quinn-udp/benches/throughput.rs b/quinn-udp/benches/throughput.rs index 7231208ee1..e4c825c254 100644 --- a/quinn-udp/benches/throughput.rs +++ b/quinn-udp/benches/throughput.rs @@ -1,45 +1,41 @@ use criterion::{criterion_group, criterion_main, Criterion}; use quinn_udp::{RecvMeta, Transmit, UdpSocketState, BATCH_SIZE}; -use std::cmp::min; -use std::{io::IoSliceMut, net::UdpSocket}; +use tokio::io::Interest; +use std::{cmp::min, io::{ErrorKind, IoSliceMut}, net::UdpSocket}; +use tokio::runtime::Runtime; pub fn criterion_benchmark(c: &mut Criterion) { const TOTAL_BYTES: usize = 10 * 1024 * 1024; - // Maximum GSO buffer size is 64k. - const MAX_BUFFER_SIZE: usize = u16::MAX as usize; const SEGMENT_SIZE: usize = 1280; - let send = UdpSocket::bind("[::1]:0") - .or_else(|_| UdpSocket::bind("127.0.0.1:0")) - .unwrap(); - let recv = UdpSocket::bind("[::1]:0") - .or_else(|_| UdpSocket::bind("127.0.0.1:0")) - .unwrap(); - let max_segments = min( - UdpSocketState::new((&send).into()) - .unwrap() - .max_gso_segments(), - MAX_BUFFER_SIZE / SEGMENT_SIZE, - ); - let dst_addr = recv.local_addr().unwrap(); - let send_state = UdpSocketState::new((&send).into()).unwrap(); - let recv_state = UdpSocketState::new((&recv).into()).unwrap(); - // Reverse non-blocking flag set by `UdpSocketState` to make the test non-racy - recv.set_nonblocking(false).unwrap(); - - let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * recv_state.gro_segments()]; BATCH_SIZE]; - let mut receive_slices = receive_buffers - .iter_mut() - .map(|buf| IoSliceMut::new(buf)) - .collect::>(); - let mut meta = vec![RecvMeta::default(); BATCH_SIZE]; - + let mut permutations = vec![]; for gso_enabled in [false, true] { - let mut group = c.benchmark_group(format!("gso_{}", gso_enabled)); + for gro_enabled in [false, true] { + for recvmmsg_enabled in [false, true] { + permutations.push((gso_enabled, gro_enabled, recvmmsg_enabled)) + } + } + } + + for (gso_enabled, gro_enabled, recvmmsg_enabled) in permutations { + let mut group = c.benchmark_group(format!("gso_{}_gro_{}_recvmmsg_{}", gso_enabled, gro_enabled, recvmmsg_enabled)); group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64)); - let segments = if gso_enabled { max_segments } else { 1 }; - let msg = vec![0xAB; SEGMENT_SIZE * segments]; + + let send = UdpSocket::bind("[::1]:0") + .or_else(|_| UdpSocket::bind("127.0.0.1:0")) + .unwrap(); + let recv = UdpSocket::bind("[::1]:0") + .or_else(|_| UdpSocket::bind("127.0.0.1:0")) + .unwrap(); + let dst_addr = recv.local_addr().unwrap(); + let send_state = UdpSocketState::new((&send).into()).unwrap(); + let recv_state = UdpSocketState::new((&recv).into()).unwrap(); + + // TODO: Min needed? + let gso_segments = min(32, if gso_enabled { send_state.max_gso_segments() } else { 1 }); + // TODO: Min needed? + let msg = vec![0xAB; min(u16::MAX as usize, SEGMENT_SIZE * dbg!(gso_segments))]; let transmit = Transmit { destination: dst_addr, @@ -50,24 +46,48 @@ pub fn criterion_benchmark(c: &mut Criterion) { }; group.bench_function("throughput", |b| { - b.iter(|| { - let mut sent: usize = 0; - while sent < TOTAL_BYTES { - send_state.send((&send).into(), &transmit).unwrap(); - sent += transmit.contents.len(); + b.to_async(Runtime::new().unwrap()).iter( + || async { + let send = tokio::net::UdpSocket::from_std(send.try_clone().unwrap()).unwrap(); + let recv = tokio::net::UdpSocket::from_std(recv.try_clone().unwrap()).unwrap(); + + let gro_segments = min(32, if gro_enabled { recv_state.gro_segments() } else { 1 }); + let batch_size = if recvmmsg_enabled { BATCH_SIZE } else { 1 }; + let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * dbg!(gro_segments) ]; dbg!(batch_size)]; + println!("{}", receive_buffers[0].len()); + let mut receive_slices = receive_buffers + .iter_mut() + .map(|buf| IoSliceMut::new(buf)) + .collect::>(); + let mut meta = vec![RecvMeta::default(); batch_size]; + + let mut sent: usize = 0; + let mut received: usize = 0; + while dbg!(sent) < TOTAL_BYTES { + send.writable().await.unwrap(); + send.try_io(Interest::WRITABLE, || { + send_state.send((&send).into(), &transmit) + }).unwrap(); + sent += transmit.contents.len(); - let mut received_segments = 0; - while received_segments < segments { - let n = recv_state - .recv((&recv).into(), &mut receive_slices, &mut meta) - .unwrap(); - for i in meta.iter().take(n) { - received_segments += i.len / i.stride; + while dbg!(received) < dbg!(sent){ + recv.readable().await.unwrap(); + let n = match recv.try_io(Interest::READABLE, || { + recv_state + .recv((&recv).into(), &mut receive_slices, &mut meta) + }) { + Ok(n) => n, + // false positive. + Err(e) if e.kind() == ErrorKind::WouldBlock => {println!("continue"); continue}, + e => e.unwrap(), + }; + for i in meta.iter().take(dbg!(n)) { + received += i.len; + } } } - assert_eq!(received_segments, segments); - } - }) + }, + ) }); } }