Skip to content

bench(udp): run GSO, GRO and recvmmsg permutations #2010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ jobs:
$HOME/.cargo/bin/rustc --version
echo "~~~~ freebsd-version ~~~~"
freebsd-version
run: $HOME/.cargo/bin/cargo build --all-targets && $HOME/.cargo/bin/cargo test && $HOME/.cargo/bin/cargo test --manifest-path fuzz/Cargo.toml
run: $HOME/.cargo/bin/cargo build --all-targets && $HOME/.cargo/bin/cargo test && $HOME/.cargo/bin/cargo test --manifest-path fuzz/Cargo.toml && $HOME/.cargo/bin/cargo test -p quinn-udp --benches
test:
strategy:
matrix:
Expand All @@ -54,6 +54,7 @@ jobs:
- run: cargo test
- run: cargo test --manifest-path fuzz/Cargo.toml
if: ${{ matrix.rust }} == "stable"
- run: cargo test -p quinn-udp --benches

test-aws-lc-rs:
runs-on: ubuntu-latest
Expand Down
9 changes: 7 additions & 2 deletions quinn-udp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,13 @@ once_cell = { workspace = true }
windows-sys = { workspace = true }

[dev-dependencies]
criterion = "0.5"
criterion = { version = "0.5", default-features = false, features = ["async_tokio"] }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "net"] }

[target.'cfg(any(target_os = "linux", target_os = "windows"))'.bench]
[lib]
# See https://github.com/bheisler/criterion.rs/blob/master/book/src/faq.md#cargo-bench-gives-unrecognized-option-errors-for-valid-command-line-options
bench = false

[[bench]]
name = "throughput"
harness = false
144 changes: 98 additions & 46 deletions quinn-udp/benches/throughput.rs
Original file line number Diff line number Diff line change
@@ -1,76 +1,128 @@
use std::{
cmp::min,
io::{ErrorKind, IoSliceMut},
net::{Ipv4Addr, Ipv6Addr, UdpSocket},
};

use criterion::{criterion_group, criterion_main, Criterion};
use quinn_udp::{RecvMeta, Transmit, UdpSocketState};
use std::cmp::min;
use std::net::{Ipv4Addr, Ipv6Addr};
use std::{io::IoSliceMut, net::UdpSocket, slice};
use tokio::{io::Interest, runtime::Runtime};

use quinn_udp::{RecvMeta, Transmit, UdpSocketState, BATCH_SIZE};

// NOTE(review): this listing is taken from a rendered diff, so lines that the
// change removed and lines that it added appear interleaved below (e.g. the
// blocking `send`/`recv` socket setup next to the tokio-based `new_socket()`
// setup, and `b.iter` next to `b.to_async(&rt).iter`). Read it as a before/after
// view rather than as one compilable function body.
//
/// Criterion entry point measuring UDP throughput between two loopback sockets.
///
/// One benchmark group is registered per `(gso_enabled, gro_enabled,
/// recvmmsg_enabled)` permutation, named `gso_{}_gro_{}_recvmmsg_{}`. Every
/// iteration sends at least `TOTAL_BYTES` and receives everything that was
/// sent; the group's throughput is declared as `TOTAL_BYTES` bytes.
pub fn criterion_benchmark(c: &mut Criterion) {
// Amount of payload pushed through the socket pair per benchmark iteration.
const TOTAL_BYTES: usize = 10 * 1024 * 1024;
// Maximum GSO buffer size is 64k.
const MAX_BUFFER_SIZE: usize = u16::MAX as usize;
// Size of a single segment; also passed as `Transmit::segment_size` when GSO is on.
const SEGMENT_SIZE: usize = 1280;

// Bind to the IPv6 loopback on an OS-assigned port, falling back to IPv4 loopback.
let send = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0))
.or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
.unwrap();
let recv = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0))
.or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
.unwrap();
// Cap the segment count so that `SEGMENT_SIZE * max_segments` fits in `MAX_BUFFER_SIZE`.
let max_segments = min(
UdpSocketState::new((&send).into())
.unwrap()
.max_gso_segments(),
MAX_BUFFER_SIZE / SEGMENT_SIZE,
);
let dst_addr = recv.local_addr().unwrap();
let send_state = UdpSocketState::new((&send).into()).unwrap();
let recv_state = UdpSocketState::new((&recv).into()).unwrap();
// Reverse non-blocking flag set by `UdpSocketState` to make the test non-racy
recv.set_nonblocking(false).unwrap();

let mut receive_buffer = vec![0; MAX_BUFFER_SIZE];
let mut meta = RecvMeta::default();

for gso_enabled in [false, true] {
let mut group = c.benchmark_group(format!("gso_{}", gso_enabled));
group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64));
// Tokio runtime used by `b.to_async(&rt)`; it is entered here, before the
// tokio sockets are created via `new_socket()`.
let rt = Runtime::new().unwrap();
let _guard = rt.enter();

let (send_state, send_socket) = new_socket();
let (recv_state, recv_socket) = new_socket();
let dst_addr = recv_socket.local_addr().unwrap();

let segments = if gso_enabled { max_segments } else { 1 };
let msg = vec![0xAB; SEGMENT_SIZE * segments];
// Collect every (gso, gro, recvmmsg) combination to benchmark. The `true`
// GSO case is only compiled in on Linux and Windows.
let mut permutations = vec![];
for gso_enabled in [
false,
#[cfg(any(target_os = "linux", target_os = "windows"))]
true,
] {
for gro_enabled in [false, true] {
#[cfg(target_os = "windows")]
if gso_enabled && !gro_enabled {
// Windows requires receive buffer to fit entire datagram on GRO
// enabled socket.
//
// OS error: "A message sent on a datagram socket was larger
// than the internal message buffer or some other network limit,
// or the buffer used to receive a datagram into was smaller
// than the datagram itself."
continue;
}

for recvmmsg_enabled in [false, true] {
permutations.push((gso_enabled, gro_enabled, recvmmsg_enabled));
}
}
}

for (gso_enabled, gro_enabled, recvmmsg_enabled) in permutations {
let mut group = c.benchmark_group(format!(
"gso_{}_gro_{}_recvmmsg_{}",
gso_enabled, gro_enabled, recvmmsg_enabled
));
group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64));

// Number of segments packed into one send: whatever the send socket state
// reports when GSO is benchmarked, otherwise a single segment.
let gso_segments = if gso_enabled {
send_state.max_gso_segments()
} else {
1
};
// Payload for one send call, capped at `MAX_DATAGRAM_SIZE`.
let msg = vec![0xAB; min(MAX_DATAGRAM_SIZE, SEGMENT_SIZE * gso_segments)];
let transmit = Transmit {
destination: dst_addr,
ecn: None,
contents: &msg,
// `Some(SEGMENT_SIZE)` only in the GSO permutations, `None` otherwise.
segment_size: gso_enabled.then_some(SEGMENT_SIZE),
src_ip: None,
};
// Each receive buffer is sized to hold `gro_segments` segments.
let gro_segments = if gro_enabled {
recv_state.gro_segments()
} else {
1
};
// Number of receive buffers (and `RecvMeta` entries) handed to one `recv` call.
let batch_size = if recvmmsg_enabled { BATCH_SIZE } else { 1 };

group.bench_function("throughput", |b| {
b.iter(|| {
b.to_async(&rt).iter(|| async {
// `batch_size` buffers of `SEGMENT_SIZE * gro_segments` bytes each,
// allocated anew on every iteration.
let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * gro_segments]; batch_size];
let mut receive_slices = receive_buffers
.iter_mut()
.map(|buf| IoSliceMut::new(buf))
.collect::<Vec<_>>();
let mut meta = vec![RecvMeta::default(); batch_size];

// Byte counters for this iteration.
let mut sent: usize = 0;
let mut received: usize = 0;
while sent < TOTAL_BYTES {
send_state.send((&send).into(), &transmit).unwrap();
// Wait for write readiness, then send through `try_io`.
send_socket.writable().await.unwrap();
send_socket
.try_io(Interest::WRITABLE, || {
send_state.send((&send_socket).into(), &transmit)
})
.unwrap();
sent += transmit.contents.len();

let mut received_segments = 0;
while received_segments < segments {
let n = recv_state
.recv(
(&recv).into(),
&mut [IoSliceMut::new(&mut receive_buffer)],
slice::from_mut(&mut meta),
)
.unwrap();
assert_eq!(n, 1);
received_segments += meta.len / meta.stride;
// Drain the receive side until every byte sent so far has been counted.
while received < sent {
recv_socket.readable().await.unwrap();
let n = match recv_socket.try_io(Interest::READABLE, || {
recv_state.recv((&recv_socket).into(), &mut receive_slices, &mut meta)
}) {
Ok(n) => n,
// recv.readable() can lead to false positives. Try again.
Err(e) if e.kind() == ErrorKind::WouldBlock => continue,
e => e.unwrap(),
};
// Only the first `n` meta entries are summed for this call.
received += meta.iter().map(|m| m.len).take(n).sum::<usize>();
}
assert_eq!(received_segments, segments);
}
})
});
}
}

/// Creates a loopback UDP socket for the benchmark and returns it, wrapped as a
/// tokio socket, together with its `UdpSocketState`.
///
/// Binding prefers the IPv6 loopback address and falls back to the IPv4
/// loopback address if that fails; port 0 lets the OS pick a free port.
///
/// Panics if both binds fail, if the socket state cannot be created, or if the
/// socket cannot be converted into a tokio socket.
fn new_socket() -> (UdpSocketState, tokio::net::UdpSocket) {
    let std_socket = match UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)) {
        Ok(bound) => bound,
        // IPv6 loopback unavailable: retry on the IPv4 loopback.
        Err(_) => UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)).unwrap(),
    };

    // The state is built from a borrow of the socket, so it must be created
    // before the socket is moved into tokio.
    // NOTE(review): `UdpSocketState::new` presumably also switches the socket to
    // non-blocking mode, which `from_std` expects — confirm.
    let state = UdpSocketState::new((&std_socket).into()).unwrap();
    let tokio_socket = tokio::net::UdpSocket::from_std(std_socket).unwrap();

    (state, tokio_socket)
}

// Register `criterion_benchmark` in the `benches` group and let criterion
// provide the benchmark binary's entry point.
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

// Header allowance subtracted from the 16-bit length limit below.
// NOTE(review): 48 is presumably a 40-byte IPv6 header plus an 8-byte UDP header — confirm.
const MAX_IP_UDP_HEADER_SIZE: usize = 48;
// Upper bound on the benchmark message length: `u16::MAX` minus the header
// allowance. `criterion_benchmark` uses it to cap the GSO send buffer.
const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize - MAX_IP_UDP_HEADER_SIZE;