Skip to content

Commit

Permalink
refactor(udp/bench): switch to async
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Oct 20, 2024
1 parent c517254 commit 5609a1d
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 40 deletions.
9 changes: 7 additions & 2 deletions quinn-udp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ once_cell = { workspace = true }
windows-sys = { workspace = true }

[dev-dependencies]
criterion = "0.5"
criterion = { version = "0.5", default-features = false, features = ["async_tokio"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "net"] }

[target.'cfg(any(target_os = "linux", target_os = "windows"))'.bench]
[lib]
# See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options
bench = false

[[bench]]
name = "throughput"
harness = false
117 changes: 79 additions & 38 deletions quinn-udp/benches/throughput.rs
Original file line number Diff line number Diff line change
@@ -1,79 +1,120 @@
use std::{
cmp::min,
io::{ErrorKind, IoSliceMut},
net::{Ipv4Addr, Ipv6Addr, UdpSocket},
};

use criterion::{criterion_group, criterion_main, Criterion};
use tokio::{io::Interest, runtime::Runtime};

use quinn_udp::{RecvMeta, Transmit, UdpSocketState};
use std::cmp::min;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::{io::IoSliceMut, net::UdpSocket, slice};

pub fn criterion_benchmark(c: &mut Criterion) {
const TOTAL_BYTES: usize = 10 * 1024 * 1024;
// Maximum GSO buffer size is 64k.
const MAX_BUFFER_SIZE: usize = u16::MAX as usize;
const SEGMENT_SIZE: usize = 1280;

let (send_socket, send_state) = new_socket();
let (recv_socket, recv_state) = new_socket();
// Reverse non-blocking flag set by `UdpSocketState` to make the test non-racy
recv_socket.set_nonblocking(false).unwrap();

let max_segments = min(
UdpSocketState::new((&send_socket).into())
.unwrap()
.max_gso_segments(),
MAX_BUFFER_SIZE / SEGMENT_SIZE,
);
let mut rt = Runtime::new().unwrap();
let (send_socket, send_state) = new_socket(&mut rt);
let (recv_socket, recv_state) = new_socket(&mut rt);
let dst_addr = recv_socket.local_addr().unwrap();

let mut receive_buffer = vec![0; MAX_BUFFER_SIZE];
let mut meta = RecvMeta::default();
let mut permutations = vec![];
for gso_enabled in [
false,
#[cfg(any(target_os = "linux", target_os = "windows"))]
true,
] {
for gro_enabled in [false, true] {
#[cfg(target_os = "windows")]
if gso_enabled && !gro_enabled {
// Windows requires receive buffer to fit entire datagram on GRO
// enabled socket.
//
// OS error: "A message sent on a datagram socket was larger
// than the internal message buffer or some other network limit,
// or the buffer used to receive a datagram into was smaller
// than the datagram itself."
continue;
}

for gso_enabled in [false, true] {
let mut group = c.benchmark_group(format!("gso_{}", gso_enabled));
group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64));
permutations.push((gso_enabled, gro_enabled));
}
}

let segments = if gso_enabled { max_segments } else { 1 };
let msg = vec![0xAB; SEGMENT_SIZE * segments];
for (gso_enabled, gro_enabled) in permutations {
let mut group = c.benchmark_group(format!("gso_{}_gro_{}", gso_enabled, gro_enabled));
group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64));

let gso_segments = if gso_enabled {
send_state.max_gso_segments()
} else {
1
};
let msg = vec![0xAB; min(MAX_DATAGRAM_SIZE, SEGMENT_SIZE * gso_segments)];
let transmit = Transmit {
destination: dst_addr,
ecn: None,
contents: &msg,
segment_size: gso_enabled.then_some(SEGMENT_SIZE),
src_ip: None,
};
let gro_segments = if gro_enabled {
recv_state.gro_segments()
} else {
1
};
let batch_size = 1;

group.bench_function("throughput", |b| {
b.iter(|| {
b.to_async(Runtime::new().unwrap()).iter(|| async {
let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * gro_segments]; batch_size];
let mut receive_slices = receive_buffers
.iter_mut()
.map(|buf| IoSliceMut::new(buf))
.collect::<Vec<_>>();
let mut meta = vec![RecvMeta::default(); batch_size];

let mut sent: usize = 0;
let mut received: usize = 0;
while sent < TOTAL_BYTES {
send_state.send((&send_socket).into(), &transmit).unwrap();
send_socket.writable().await.unwrap();
send_socket
.try_io(Interest::WRITABLE, || {
send_state.send((&send_socket).into(), &transmit)
})
.unwrap();
sent += transmit.contents.len();

let mut received_segments = 0;
while received_segments < segments {
let n = recv_state
.recv(
(&recv_socket).into(),
&mut [IoSliceMut::new(&mut receive_buffer)],
slice::from_mut(&mut meta),
)
.unwrap();
assert_eq!(n, 1);
received_segments += meta.len / meta.stride;
while received < sent {
recv_socket.readable().await.unwrap();
let n = match recv_socket.try_io(Interest::READABLE, || {
recv_state.recv((&recv_socket).into(), &mut receive_slices, &mut meta)
}) {
Ok(n) => n,
// recv.readable() can lead to false positives. Try again.
Err(e) if e.kind() == ErrorKind::WouldBlock => continue,
e => e.unwrap(),
};
received += meta.iter().map(|m| m.len).take(n).sum::<usize>();
}
assert_eq!(received_segments, segments);
}
})
});
}
}

fn new_socket() -> (UdpSocket, UdpSocketState) {
fn new_socket(rt: &mut Runtime) -> (tokio::net::UdpSocket, UdpSocketState) {
let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0))
.or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
.unwrap();

let state = UdpSocketState::new((&socket).into()).unwrap();
let socket = rt.block_on(async { tokio::net::UdpSocket::from_std(socket).unwrap() });
(socket, state)
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

const MAX_IP_UDP_HEADER_SIZE: usize = 48;
const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize - MAX_IP_UDP_HEADER_SIZE;

0 comments on commit 5609a1d

Please sign in to comment.