Skip to content

bench(udp): support all GSO and recvmmsg permutations across platforms #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion quinn-udp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ once_cell = { workspace = true }
windows-sys = { workspace = true }

[dev-dependencies]
criterion = "0.5"
criterion = { version = "0.5", default-features = false, features = ["async_tokio"] }
# TODO: rework features
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros", "signal", "net"] }

[lib]
# See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options
Expand Down
114 changes: 67 additions & 47 deletions quinn-udp/benches/throughput.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,41 @@
use criterion::{criterion_group, criterion_main, Criterion};
use quinn_udp::{RecvMeta, Transmit, UdpSocketState, BATCH_SIZE};
use std::cmp::min;
use std::{io::IoSliceMut, net::UdpSocket};
use tokio::io::Interest;
use std::{cmp::min, io::{ErrorKind, IoSliceMut}, net::UdpSocket};
use tokio::runtime::Runtime;

pub fn criterion_benchmark(c: &mut Criterion) {
const TOTAL_BYTES: usize = 10 * 1024 * 1024;
// Maximum GSO buffer size is 64k.
const MAX_BUFFER_SIZE: usize = u16::MAX as usize;
const SEGMENT_SIZE: usize = 1280;

let send = UdpSocket::bind("[::1]:0")
.or_else(|_| UdpSocket::bind("127.0.0.1:0"))
.unwrap();
let recv = UdpSocket::bind("[::1]:0")
.or_else(|_| UdpSocket::bind("127.0.0.1:0"))
.unwrap();
let max_segments = min(
UdpSocketState::new((&send).into())
.unwrap()
.max_gso_segments(),
MAX_BUFFER_SIZE / SEGMENT_SIZE,
);
let dst_addr = recv.local_addr().unwrap();
let send_state = UdpSocketState::new((&send).into()).unwrap();
let recv_state = UdpSocketState::new((&recv).into()).unwrap();
// Reverse non-blocking flag set by `UdpSocketState` to make the test non-racy
recv.set_nonblocking(false).unwrap();

let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * recv_state.gro_segments()]; BATCH_SIZE];
let mut receive_slices = receive_buffers
.iter_mut()
.map(|buf| IoSliceMut::new(buf))
.collect::<Vec<_>>();
let mut meta = vec![RecvMeta::default(); BATCH_SIZE];

let mut permutations = vec![];
for gso_enabled in [false, true] {
let mut group = c.benchmark_group(format!("gso_{}", gso_enabled));
for gro_enabled in [false, true] {
for recvmmsg_enabled in [false, true] {
permutations.push((gso_enabled, gro_enabled, recvmmsg_enabled))
}
}
}

for (gso_enabled, gro_enabled, recvmmsg_enabled) in permutations {
let mut group = c.benchmark_group(format!("gso_{}_gro_{}_recvmmsg_{}", gso_enabled, gro_enabled, recvmmsg_enabled));
group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64));

let segments = if gso_enabled { max_segments } else { 1 };
let msg = vec![0xAB; SEGMENT_SIZE * segments];

let send = UdpSocket::bind("[::1]:0")
.or_else(|_| UdpSocket::bind("127.0.0.1:0"))
.unwrap();
let recv = UdpSocket::bind("[::1]:0")
.or_else(|_| UdpSocket::bind("127.0.0.1:0"))
.unwrap();
let dst_addr = recv.local_addr().unwrap();
let send_state = UdpSocketState::new((&send).into()).unwrap();
let recv_state = UdpSocketState::new((&recv).into()).unwrap();

// TODO: Min needed?
let gso_segments = min(32, if gso_enabled { send_state.max_gso_segments() } else { 1 });
// TODO: Min needed?
let msg = vec![0xAB; min(u16::MAX as usize, SEGMENT_SIZE * dbg!(gso_segments))];

let transmit = Transmit {
destination: dst_addr,
Expand All @@ -50,24 +46,48 @@ pub fn criterion_benchmark(c: &mut Criterion) {
};

group.bench_function("throughput", |b| {
b.iter(|| {
let mut sent: usize = 0;
while sent < TOTAL_BYTES {
send_state.send((&send).into(), &transmit).unwrap();
sent += transmit.contents.len();
b.to_async(Runtime::new().unwrap()).iter(
|| async {
let send = tokio::net::UdpSocket::from_std(send.try_clone().unwrap()).unwrap();
let recv = tokio::net::UdpSocket::from_std(recv.try_clone().unwrap()).unwrap();

let gro_segments = min(32, if gro_enabled { recv_state.gro_segments() } else { 1 });
let batch_size = if recvmmsg_enabled { BATCH_SIZE } else { 1 };
let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * dbg!(gro_segments) ]; dbg!(batch_size)];
println!("{}", receive_buffers[0].len());
let mut receive_slices = receive_buffers
.iter_mut()
.map(|buf| IoSliceMut::new(buf))
.collect::<Vec<_>>();
let mut meta = vec![RecvMeta::default(); batch_size];

let mut sent: usize = 0;
let mut received: usize = 0;
while dbg!(sent) < TOTAL_BYTES {
send.writable().await.unwrap();
send.try_io(Interest::WRITABLE, || {
send_state.send((&send).into(), &transmit)
}).unwrap();
sent += transmit.contents.len();

let mut received_segments = 0;
while received_segments < segments {
let n = recv_state
.recv((&recv).into(), &mut receive_slices, &mut meta)
.unwrap();
for i in meta.iter().take(n) {
received_segments += i.len / i.stride;
while dbg!(received) < dbg!(sent){
recv.readable().await.unwrap();
let n = match recv.try_io(Interest::READABLE, || {
recv_state
.recv((&recv).into(), &mut receive_slices, &mut meta)
}) {
Ok(n) => n,
// false positive.
Err(e) if e.kind() == ErrorKind::WouldBlock => {println!("continue"); continue},
e => e.unwrap(),
};
for i in meta.iter().take(dbg!(n)) {
received += i.len;
}
}
}
assert_eq!(received_segments, segments);
}
})
},
)
});
}
}
Expand Down