 use criterion::{criterion_group, criterion_main, Criterion};
-use quinn_udp::{RecvMeta, Transmit, UdpSocketState};
-use std::cmp::min;
-use std::net::{Ipv4Addr, Ipv6Addr};
-use std::{io::IoSliceMut, net::UdpSocket, slice};
+use quinn_udp::{RecvMeta, Transmit, UdpSocketState, BATCH_SIZE};
+use std::{
+    cmp::min,
+    io::{ErrorKind, IoSliceMut},
+    net::{Ipv4Addr, Ipv6Addr, UdpSocket},
+};
+use tokio::io::Interest;
+use tokio::runtime::Runtime;
+
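+// Largest payload that fits in one UDP datagram: the 64 KiB datagram size limit
+// minus a worst-case IPv6 (40 byte) + UDP (8 byte) header.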
+const MAX_IP_UDP_HEADER_SIZE: usize = 48;
+const MAX_DATAGRAM_SIZE: usize = u16::MAX as usize - MAX_IP_UDP_HEADER_SIZE;
 
 pub fn criterion_benchmark(c: &mut Criterion) {
     const TOTAL_BYTES: usize = 10 * 1024 * 1024;
-    // Maximum GSO buffer size is 64k.
-    const MAX_BUFFER_SIZE: usize = u16::MAX as usize;
     const SEGMENT_SIZE: usize = 1280;
 
-    let send = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0))
-        .or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
-        .unwrap();
-    let recv = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0))
-        .or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
-        .unwrap();
-    let max_segments = min(
-        UdpSocketState::new((&send).into())
-            .unwrap()
-            .max_gso_segments(),
-        MAX_BUFFER_SIZE / SEGMENT_SIZE,
-    );
-    let dst_addr = recv.local_addr().unwrap();
-    let send_state = UdpSocketState::new((&send).into()).unwrap();
-    let recv_state = UdpSocketState::new((&recv).into()).unwrap();
-    // Reverse non-blocking flag set by `UdpSocketState` to make the test non-racy
-    recv.set_nonblocking(false).unwrap();
+    let mut rt = Runtime::new().unwrap();
+    let (send_socket, send_state) = new_socket(&mut rt);
+    let (recv_socket, recv_state) = new_socket(&mut rt);
+    let dst_addr = recv_socket.local_addr().unwrap();
 
-    let mut receive_buffer = vec![0; MAX_BUFFER_SIZE];
-    let mut meta = RecvMeta::default();
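+    // Build every (GSO, GRO, recvmmsg) combination supported on this platform.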
+    let mut permutations = vec![];
+    for gso_enabled in [
+        false,
+        #[cfg(any(target_os = "linux", target_os = "windows"))]
+        true,
+    ] {
+        for gro_enabled in [false, true] {
+            #[cfg(target_os = "windows")]
+            if gso_enabled && !gro_enabled {
+                // Windows requires receive buffer to fit entire datagram on GRO
+                // enabled socket.
+                //
+                // OS error: "A message sent on a datagram socket was larger
+                // than the internal message buffer or some other network limit,
+                // or the buffer used to receive a datagram into was smaller
+                // than the datagram itself."
+                continue;
+            }
 
-    for gso_enabled in [false, true] {
-        let mut group = c.benchmark_group(format!("gso_{}", gso_enabled));
-        group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64));
+            for recvmmsg_enabled in [false, true] {
+                permutations.push((gso_enabled, gro_enabled, recvmmsg_enabled));
+            }
+        }
+    }
 
-        let segments = if gso_enabled { max_segments } else { 1 };
-        let msg = vec![0xAB; SEGMENT_SIZE * segments];
+    for (gso_enabled, gro_enabled, recvmmsg_enabled) in permutations {
+        let mut group = c.benchmark_group(format!(
+            "gso_{}_gro_{}_recvmmsg_{}",
+            gso_enabled, gro_enabled, recvmmsg_enabled
+        ));
+        group.throughput(criterion::Throughput::Bytes(TOTAL_BYTES as u64));
 
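+        // With GSO, one Transmit carries up to max_gso_segments() segments of
+        // SEGMENT_SIZE bytes, capped at MAX_DATAGRAM_SIZE; without GSO, one segment.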
+        let gso_segments = gso_enabled
+            .then_some(send_state.max_gso_segments())
+            .unwrap_or(1);
+        let msg = vec![0xAB; min(MAX_DATAGRAM_SIZE, SEGMENT_SIZE * gso_segments)];
         let transmit = Transmit {
             destination: dst_addr,
             ecn: None,
             contents: &msg,
             segment_size: gso_enabled.then_some(SEGMENT_SIZE),
             src_ip: None,
         };
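+        // gro_segments() is the number of segments the kernel may coalesce into one
+        // received datagram; BATCH_SIZE bounds how many datagrams a single recv call
+        // can return when recvmmsg is used.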
+        let gro_segments = gro_enabled
+            .then_some(recv_state.gro_segments())
+            .unwrap_or(1);
+        let batch_size = if recvmmsg_enabled { BATCH_SIZE } else { 1 };
 
         group.bench_function("throughput", |b| {
-            b.iter(|| {
+            b.to_async(Runtime::new().unwrap()).iter(|| async {
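+                // One receive buffer per batch slot, each large enough to hold a
+                // fully GRO-coalesced datagram.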
+                let mut receive_buffers = vec![vec![0; SEGMENT_SIZE * gro_segments]; batch_size];
+                let mut receive_slices = receive_buffers
+                    .iter_mut()
+                    .map(|buf| IoSliceMut::new(buf))
+                    .collect::<Vec<_>>();
+                let mut meta = vec![RecvMeta::default(); batch_size];
+
                 let mut sent: usize = 0;
+                let mut received: usize = 0;
                 while sent < TOTAL_BYTES {
-                    send_state.send((&send).into(), &transmit).unwrap();
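+                    // Wait for writability, then let quinn-udp perform the send on the
+                    // underlying socket; try_io keeps tokio's readiness state in sync.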
+                    send_socket.writable().await.unwrap();
+                    send_socket
+                        .try_io(Interest::WRITABLE, || {
+                            send_state.send((&send_socket).into(), &transmit)
+                        })
+                        .unwrap();
                     sent += transmit.contents.len();
 
-                    let mut received_segments = 0;
-                    while received_segments < segments {
-                        let n = recv_state
-                            .recv(
-                                (&recv).into(),
-                                &mut [IoSliceMut::new(&mut receive_buffer)],
-                                slice::from_mut(&mut meta),
-                            )
-                            .unwrap();
-                        assert_eq!(n, 1);
-                        received_segments += meta.len / meta.stride;
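+                    // Drain everything sent so far before sending more.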
+                    while received < sent {
+                        recv_socket.readable().await.unwrap();
+                        let n = match recv_socket.try_io(Interest::READABLE, || {
+                            recv_state.recv((&recv_socket).into(), &mut receive_slices, &mut meta)
+                        }) {
+                            Ok(n) => n,
+                            // recv.readable() can lead to false positives. Try again.
+                            Err(e) if e.kind() == ErrorKind::WouldBlock => continue,
+                            e => e.unwrap(),
+                        };
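+                        // Each of the n returned metas reports the total length of one
+                        // (possibly GRO-coalesced) datagram.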
+                        received += meta.iter().map(|m| m.len).take(n).sum::<usize>();
                     }
-                    assert_eq!(received_segments, segments);
                 }
             })
         });
     }
 }
 
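+/// Binds a UDP socket to the IPv6 loopback address (falling back to IPv4),
+/// initializes quinn-udp socket state for it (which also makes the socket
+/// non-blocking), and registers it with the tokio runtime.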
+fn new_socket(rt: &mut Runtime) -> (tokio::net::UdpSocket, UdpSocketState) {
+    let socket = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0))
+        .or_else(|_| UdpSocket::bind((Ipv4Addr::LOCALHOST, 0)))
+        .unwrap();
+
+    let state = UdpSocketState::new((&socket).into()).unwrap();
+    let socket = rt.block_on(async { tokio::net::UdpSocket::from_std(socket).unwrap() });
+    (socket, state)
+}
+
 criterion_group!(benches, criterion_benchmark);
 criterion_main!(benches);