Skip to content

Commit 190f698

Browse files
committed
bench(udp): run GSO, GRO and recvmmsg permutations
1 parent 15a4dce commit 190f698

File tree

3 files changed

+102
-47
lines changed

3 files changed

+102
-47
lines changed

.github/workflows/rust.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ jobs:
3030
$HOME/.cargo/bin/rustc --version
3131
echo "~~~~ freebsd-version ~~~~"
3232
freebsd-version
33-
run: $HOME/.cargo/bin/cargo build --all-targets && $HOME/.cargo/bin/cargo test && $HOME/.cargo/bin/cargo test --manifest-path fuzz/Cargo.toml
33+
run: $HOME/.cargo/bin/cargo build --all-targets && $HOME/.cargo/bin/cargo test && $HOME/.cargo/bin/cargo test --manifest-path fuzz/Cargo.toml && $HOME/.cargo/bin/cargo test --benches
3434
test:
3535
strategy:
3636
matrix:
@@ -54,6 +54,7 @@ jobs:
5454
- run: cargo test
5555
- run: cargo test --manifest-path fuzz/Cargo.toml
5656
if: ${{ matrix.rust }} == "stable"
57+
- run: cargo test --benches
5758

5859
test-aws-lc-rs:
5960
runs-on: ubuntu-latest

quinn-udp/Cargo.toml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,13 @@ once_cell = { workspace = true }
3030
windows-sys = { workspace = true }
3131

3232
[dev-dependencies]
33-
criterion = "0.5"
33+
criterion = { version = "0.5", default-features = false, features = ["async_tokio"] }
34+
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "net"] }
3435

35-
[target.'cfg(any(target_os = "linux", target_os = "windows"))'.bench]
36+
[lib]
37+
# See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options
38+
bench = false
39+
40+
[[bench]]
3641
name = "throughput"
3742
harness = false

quinn-udp/benches/throughput.rs

Lines changed: 93 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,125 @@
1+
use std::{
2+
cmp::min,
3+
io::{ErrorKind, IoSliceMut},
4+
net::{Ipv4Addr, Ipv6Addr, UdpSocket},
5+
};
6+
17
use criterion::{criterion_group, criterion_main, Criterion};
2-
use quinn_udp::{RecvMeta, Transmit, UdpSocketState};
3-
use std::cmp::min;
4-
use std::net::{Ipv4Addr, Ipv6Addr};
5-
use std::{io::IoSliceMut, net::UdpSocket, slice};
8+
use tokio::{io::Interest, runtime::Runtime};
9+
10+
use quinn_udp::{RecvMeta, Transmit, UdpSocketState, BATCH_SIZE};
611

712
pub fn criterion_benchmark(c: &mut Criterion) {
813
const TOTAL_BYTES: usize = 10 * 1024 * 1024;
9-
// Maximum GSO buffer size is 64k.
10-
const MAX_BUFFER_SIZE: usize = u16::MAX as usize;
1114
const SEGMENT_SIZE: usize = 1280;
1215

13-
let send = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0))
14-
.or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
15-
.unwrap();
16-
let recv = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0))
17-
.or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
18-
.unwrap();
19-
let max_segments = min(
20-
UdpSocketState::new((&send).into())
21-
.unwrap()
22-
.max_gso_segments(),
23-
MAX_BUFFER_SIZE / SEGMENT_SIZE,
24-
);
25-
let dst_addr = recv.local_addr().unwrap();
26-
let send_state = UdpSocketState::new((&send).into()).unwrap();
27-
let recv_state = UdpSocketState::new((&recv).into()).unwrap();
28-
// Reverse non-blocking flag set by `UdpSocketState` to make the test non-racy
29-
recv.set_nonblocking(false).unwrap();
16+
let mut rt = Runtime::new().unwrap();
17+
let (send_socket, send_state) = new_socket(&mut rt);
18+
let (recv_socket, recv_state) = new_socket(&mut rt);
19+
let dst_addr = recv_socket.local_addr().unwrap();
3020

31-
let mut receive_buffer = vec![0; MAX_BUFFER_SIZE];
32-
let mut meta = RecvMeta::default();
21+
let mut permutations = vec![];
22+
for gso_enabled in [
23+
false,
24+
#[cfg(any(target_os = "linux", target_os = "windows"))]
25+
true,
26+
] {
27+
for gro_enabled in [false, true] {
28+
#[cfg(target_os = "windows")]
29+
if gso_enabled && !gro_enabled {
30+
// Windows requires receive buffer to fit entire datagram on GRO
31+
// enabled socket.
32+
//
33+
// OS error: "A message sent on a datagram socket was larger
34+
// than the internal message buffer or some other network limit,
35+
// or the buffer used to receive a datagram into was smaller
36+
// than the datagram itself."
37+
continue;
38+
}
3339

34-
for gso_enabled in [false, true] {
35-
let mut group = c.benchmark_group(format!("gso_{}", gso_enabled));
36-
group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64));
40+
for recvmmsg_enabled in [false, true] {
41+
permutations.push((gso_enabled, gro_enabled, recvmmsg_enabled));
42+
}
43+
}
44+
}
3745

38-
let segments = if gso_enabled { max_segments } else { 1 };
39-
let msg = vec![0xAB; SEGMENT_SIZE * segments];
46+
for (gso_enabled, gro_enabled, recvmmsg_enabled) in permutations {
47+
let mut group = c.benchmark_group(format!(
48+
"gso_{}_gro_{}_recvmmsg_{}",
49+
gso_enabled, gro_enabled, recvmmsg_enabled
50+
));
51+
group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64));
4052

53+
let gso_segments = if gso_enabled {
54+
send_state.max_gso_segments()
55+
} else {
56+
1
57+
};
58+
let msg = vec![0xAB; min(MAX_DATAGRAM_SIZE, SEGMENT_SIZE * gso_segments)];
4159
let transmit = Transmit {
4260
destination: dst_addr,
4361
ecn: None,
4462
contents: &msg,
4563
segment_size: gso_enabled.then_some(SEGMENT_SIZE),
4664
src_ip: None,
4765
};
66+
let gro_segments = if gro_enabled {
67+
recv_state.gro_segments()
68+
} else {
69+
1
70+
};
71+
let batch_size = if recvmmsg_enabled { BATCH_SIZE } else { 1 };
4872

4973
group.bench_function("throughput", |b| {
50-
b.iter(|| {
74+
b.to_async(Runtime::new().unwrap()).iter(|| async {
75+
let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * gro_segments]; batch_size];
76+
let mut receive_slices = receive_buffers
77+
.iter_mut()
78+
.map(|buf| IoSliceMut::new(buf))
79+
.collect::<Vec<_>>();
80+
let mut meta = vec![RecvMeta::default(); batch_size];
81+
5182
let mut sent: usize = 0;
83+
let mut received: usize = 0;
5284
while sent < TOTAL_BYTES {
53-
send_state.send((&send).into(), &transmit).unwrap();
85+
send_socket.writable().await.unwrap();
86+
send_socket
87+
.try_io(Interest::WRITABLE, || {
88+
send_state.send((&send_socket).into(), &transmit)
89+
})
90+
.unwrap();
5491
sent += transmit.contents.len();
5592

56-
let mut received_segments = 0;
57-
while received_segments < segments {
58-
let n = recv_state
59-
.recv(
60-
(&recv).into(),
61-
&mut [IoSliceMut::new(&mut receive_buffer)],
62-
slice::from_mut(&mut meta),
63-
)
64-
.unwrap();
65-
assert_eq!(n, 1);
66-
received_segments += meta.len / meta.stride;
93+
while received < sent {
94+
recv_socket.readable().await.unwrap();
95+
let n = match recv_socket.try_io(Interest::READABLE, || {
96+
recv_state.recv((&recv_socket).into(), &mut receive_slices, &mut meta)
97+
}) {
98+
Ok(n) => n,
99+
// recv.readable() can lead to false positives. Try again.
100+
Err(e) if e.kind() == ErrorKind::WouldBlock => continue,
101+
e => e.unwrap(),
102+
};
103+
received += meta.iter().map(|m| m.len).take(n).sum::<usize>();
67104
}
68-
assert_eq!(received_segments, segments);
69105
}
70106
})
71107
});
72108
}
73109
}
74110

111+
fn new_socket(rt: &mut Runtime) -> (tokio::net::UdpSocket, UdpSocketState) {
112+
let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0))
113+
.or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
114+
.unwrap();
115+
116+
let state = UdpSocketState::new((&socket).into()).unwrap();
117+
let socket = rt.block_on(async { tokio::net::UdpSocket::from_std(socket).unwrap() });
118+
(socket, state)
119+
}
120+
75121
criterion_group!(benches, criterion_benchmark);
76122
criterion_main!(benches);
123+
124+
const MAX_IP_UDP_HEADER_SIZE: usize = 48;
125+
const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize - MAX_IP_UDP_HEADER_SIZE;

0 commit comments

Comments
 (0)