Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: QUIC #282

Merged
merged 28 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
07124ee
feat(net): add `get_socket_option` on `Socket`
AsakuraMizu Jul 31, 2024
c4e9db5
feat(quic): basic endpoint & connection
AsakuraMizu Jul 31, 2024
110011b
fix(quic): blocking
AsakuraMizu Aug 5, 2024
d84891a
feat(quic): datagram
AsakuraMizu Aug 5, 2024
e466fc0
feat(quic): stream
AsakuraMizu Aug 8, 2024
a24f248
feat(quic): 0-rtt
AsakuraMizu Aug 8, 2024
9a58d2f
feat(quic): redesign builders
AsakuraMizu Aug 8, 2024
29ed9f9
test(quic): port from quinn
AsakuraMizu Aug 8, 2024
efa7323
fix(driver,iocp): fix incorrect opcode impl
AsakuraMizu Aug 8, 2024
7fb5067
fix(quic): windows specific bug
AsakuraMizu Aug 9, 2024
afb0539
chore(quic): interaction with tracing
AsakuraMizu Aug 9, 2024
83b928b
feat(quic): add bench
AsakuraMizu Aug 15, 2024
aae6db4
feat(quic): improved close logic
AsakuraMizu Aug 15, 2024
c3cc570
Merge branch 'master' into quic
AsakuraMizu Aug 17, 2024
13fa978
fix(net): mark get/set_socket_option as unsafe
AsakuraMizu Aug 17, 2024
c329a7c
test(net): remove redundant send_msg test
AsakuraMizu Aug 17, 2024
32e07e0
test(quic): ignore echo_dualstack on unsupported platforms
AsakuraMizu Aug 19, 2024
20a3683
feat(quic): remove event-listener
AsakuraMizu Aug 20, 2024
e99bc8d
feat(quic): http3
AsakuraMizu Aug 20, 2024
56bcf29
fix(quic): apply suggestions from code review
AsakuraMizu Aug 24, 2024
aa0ec20
fix(driver, iocp): remove unnecessary field
AsakuraMizu Aug 24, 2024
514b9b7
chore(quic): rustls provider
AsakuraMizu Aug 24, 2024
6590833
bench(quic): various size
AsakuraMizu Aug 24, 2024
fc23e2a
Merge branch 'compio-rs:master' into quic
AsakuraMizu Aug 26, 2024
77f1e23
chore: extract common deps into workspace
AsakuraMizu Aug 26, 2024
b1ed765
chore(quic): simplify check_0rtt
AsakuraMizu Aug 27, 2024
148f3ac
chore(net): add type constraint for get/set_socket_option
AsakuraMizu Aug 27, 2024
ee92ffd
perf(quic): batch recv on channel
AsakuraMizu Aug 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"compio-tls",
"compio-log",
"compio-process",
"compio-quic",
]
resolver = "2"

Expand All @@ -36,7 +37,9 @@ compio-dispatcher = { path = "./compio-dispatcher", version = "0.3.0" }
compio-log = { path = "./compio-log", version = "0.1.0" }
compio-tls = { path = "./compio-tls", version = "0.2.0", default-features = false }
compio-process = { path = "./compio-process", version = "0.1.0" }
compio-quic = { path = "./compio-quic", version = "0.1.0" }

bytes = "1.7.1"
flume = "0.11.0"
cfg-if = "1.0.0"
criterion = "0.5.1"
Expand All @@ -49,10 +52,13 @@ nix = "0.29.0"
once_cell = "1.18.0"
os_pipe = "1.1.4"
paste = "1.0.14"
rand = "0.8.5"
rustls = { version = "0.23.1", default-features = false }
slab = "0.4.9"
socket2 = "0.5.6"
tempfile = "3.8.1"
tokio = "1.33.0"
tracing-subscriber = "0.3.18"
widestring = "1.0.2"
windows-sys = "0.52.0"

Expand Down
2 changes: 1 addition & 1 deletion compio-buf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
[dependencies]
bumpalo = { version = "3.14.0", optional = true }
arrayvec = { version = "0.7.4", optional = true }
bytes = { version = "1.5.0", optional = true }
bytes = { workspace = true, optional = true }

[target.'cfg(unix)'.dependencies]
libc = { workspace = true }
Expand Down
29 changes: 12 additions & 17 deletions compio-driver/src/iocp/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -781,12 +781,11 @@ static WSA_RECVMSG: OnceLock<LPFN_WSARECVMSG> = OnceLock::new();

/// Receive data and source address with ancillary data into vectored buffer.
pub struct RecvMsg<T: IoVectoredBufMut, C: IoBufMut, S> {
msg: WSAMSG,
addr: SOCKADDR_STORAGE,
addr_len: socklen_t,
fd: SharedFd<S>,
buffer: T,
control: C,
control_len: u32,
_p: PhantomPinned,
}

Expand All @@ -802,12 +801,11 @@ impl<T: IoVectoredBufMut, C: IoBufMut, S> RecvMsg<T, C, S> {
"misaligned control message buffer"
);
Self {
msg: unsafe { std::mem::zeroed() },
addr: unsafe { std::mem::zeroed() },
addr_len: std::mem::size_of::<SOCKADDR_STORAGE>() as _,
fd,
buffer,
control,
control_len: 0,
_p: PhantomPinned,
}
}
Expand All @@ -820,8 +818,8 @@ impl<T: IoVectoredBufMut, C: IoBufMut, S> IntoInner for RecvMsg<T, C, S> {
(
(self.buffer, self.control),
self.addr,
self.addr_len,
self.control_len as _,
self.msg.namelen,
self.msg.Control.len as _,
)
}
}
Expand All @@ -835,26 +833,23 @@ impl<T: IoVectoredBufMut, C: IoBufMut, S: AsRawFd> OpCode for RecvMsg<T, C, S> {
})?;

let this = self.get_unchecked_mut();

let mut slices = this.buffer.io_slices_mut();
let mut msg = WSAMSG {
name: &mut this.addr as *mut _ as _,
namelen: this.addr_len,
lpBuffers: slices.as_mut_ptr() as _,
dwBufferCount: slices.len() as _,
Control: std::mem::transmute::<IoSliceMut, WSABUF>(this.control.as_io_slice_mut()),
dwFlags: 0,
};
this.control_len = 0;
this.msg.name = &mut this.addr as *mut _ as _;
this.msg.namelen = std::mem::size_of::<SOCKADDR_STORAGE>() as _;
this.msg.lpBuffers = slices.as_mut_ptr() as _;
this.msg.dwBufferCount = slices.len() as _;
this.msg.Control =
std::mem::transmute::<IoSliceMut, WSABUF>(this.control.as_io_slice_mut());

let mut received = 0;
let res = recvmsg_fn(
this.fd.as_raw_fd() as _,
&mut msg,
&mut this.msg,
&mut received,
optr,
None,
);
this.control_len = msg.Control.len;
winsock_result(res, received)
}

Expand Down
2 changes: 1 addition & 1 deletion compio-log/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ repository = { workspace = true }
tracing = { version = "0.1", default-features = false }

[dev-dependencies]
tracing-subscriber = "0.3"
tracing-subscriber = { workspace = true }

[features]
enable_log = []
59 changes: 56 additions & 3 deletions compio-net/src/socket.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use std::{future::Future, io, mem::ManuallyDrop};
use std::{
future::Future,
io,
mem::{ManuallyDrop, MaybeUninit},
};

use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
#[cfg(unix)]
Expand Down Expand Up @@ -320,7 +324,51 @@ impl Socket {
}

#[cfg(unix)]
pub fn set_socket_option<T>(&self, level: i32, name: i32, value: &T) -> io::Result<()> {
pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
let mut value: MaybeUninit<T> = MaybeUninit::uninit();
let mut len = size_of::<T>() as libc::socklen_t;
syscall!(libc::getsockopt(
self.socket.as_raw_fd(),
level,
name,
value.as_mut_ptr() as _,
&mut len
))
.map(|_| {
debug_assert_eq!(len as usize, size_of::<T>());
// SAFETY: The value is initialized by `getsockopt`.
value.assume_init()
})
}

#[cfg(windows)]
pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
let mut value: MaybeUninit<T> = MaybeUninit::uninit();
let mut len = size_of::<T>() as i32;
syscall!(
SOCKET,
windows_sys::Win32::Networking::WinSock::getsockopt(
self.socket.as_raw_fd() as _,
level,
name,
value.as_mut_ptr() as _,
&mut len
)
)
.map(|_| {
debug_assert_eq!(len as usize, size_of::<T>());
// SAFETY: The value is initialized by `getsockopt`.
value.assume_init()
})
}

#[cfg(unix)]
pub unsafe fn set_socket_option<T: Copy>(
&self,
level: i32,
name: i32,
value: &T,
) -> io::Result<()> {
syscall!(libc::setsockopt(
self.socket.as_raw_fd(),
level,
Expand All @@ -332,7 +380,12 @@ impl Socket {
}

#[cfg(windows)]
pub fn set_socket_option<T>(&self, level: i32, name: i32, value: &T) -> io::Result<()> {
pub unsafe fn set_socket_option<T: Copy>(
&self,
level: i32,
name: i32,
value: &T,
) -> io::Result<()> {
syscall!(
SOCKET,
windows_sys::Win32::Networking::WinSock::setsockopt(
Expand Down
20 changes: 19 additions & 1 deletion compio-net/src/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,26 @@ impl UdpSocket {
.await
}

/// Gets a socket option.
///
/// # Safety
///
/// The caller must ensure `T` is the correct type for `level` and `name`.
pub unsafe fn get_socket_option<T: Copy>(&self, level: i32, name: i32) -> io::Result<T> {
self.inner.get_socket_option(level, name)
}

/// Sets a socket option.
pub fn set_socket_option<T>(&self, level: i32, name: i32, value: &T) -> io::Result<()> {
///
/// # Safety
///
/// The caller must ensure `T` is the correct type for `level` and `name`.
pub unsafe fn set_socket_option<T: Copy>(
&self,
level: i32,
name: i32,
value: &T,
) -> io::Result<()> {
self.inner.set_socket_option(level, name, value)
}
}
Expand Down
56 changes: 1 addition & 55 deletions compio-net/tests/udp.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use compio_net::{CMsgBuilder, CMsgIter, UdpSocket};
use compio_net::UdpSocket;

#[compio_macros::test]
async fn connect() {
Expand Down Expand Up @@ -64,57 +64,3 @@ async fn send_to() {
active_addr
);
}

#[compio_macros::test]
async fn send_msg_with_ipv6_ecn() {
Berrysoft marked this conversation as resolved.
Show resolved Hide resolved
#[cfg(unix)]
use libc::{IPPROTO_IPV6, IPV6_RECVTCLASS, IPV6_TCLASS};
#[cfg(windows)]
use windows_sys::Win32::Networking::WinSock::{
IPPROTO_IPV6, IPV6_ECN, IPV6_RECVTCLASS, IPV6_TCLASS,
};

const MSG: &str = "foo bar baz";

let passive = UdpSocket::bind("[::1]:0").await.unwrap();
let passive_addr = passive.local_addr().unwrap();

passive
.set_socket_option(IPPROTO_IPV6, IPV6_RECVTCLASS, &1)
.unwrap();

let active = UdpSocket::bind("[::1]:0").await.unwrap();
let active_addr = active.local_addr().unwrap();

let mut control = vec![0u8; 32];
let mut builder = CMsgBuilder::new(&mut control);

const ECN_BITS: i32 = 0b10;

#[cfg(unix)]
builder
.try_push(IPPROTO_IPV6, IPV6_TCLASS, ECN_BITS)
.unwrap();
#[cfg(windows)]
builder.try_push(IPPROTO_IPV6, IPV6_ECN, ECN_BITS).unwrap();

let len = builder.finish();
control.truncate(len);

active.send_msg(MSG, control, passive_addr).await.unwrap();

let ((_, _, addr), (buffer, control)) = passive
.recv_msg(Vec::with_capacity(20), Vec::with_capacity(32))
.await
.unwrap();
assert_eq!(addr, active_addr);
assert_eq!(buffer, MSG.as_bytes());
unsafe {
let mut iter = CMsgIter::new(&control);
let cmsg = iter.next().unwrap();
assert_eq!(cmsg.level(), IPPROTO_IPV6);
assert_eq!(cmsg.ty(), IPV6_TCLASS);
assert_eq!(cmsg.data::<i32>(), &ECN_BITS);
assert!(iter.next().is_none());
}
}
82 changes: 82 additions & 0 deletions compio-quic/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
[package]
name = "compio-quic"
version = "0.1.0"
description = "QUIC for compio"
categories = ["asynchronous", "network-programming"]
keywords = ["async", "net", "quic"]
edition = { workspace = true }
authors = { workspace = true }
readme = { workspace = true }
license = { workspace = true }
repository = { workspace = true }

[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]

[dependencies]
# Workspace dependencies
compio-io = { workspace = true }
compio-buf = { workspace = true }
compio-log = { workspace = true }
compio-net = { workspace = true }
compio-runtime = { workspace = true, features = ["time"] }

quinn-proto = "0.11.3"
rustls = { workspace = true }
rustls-platform-verifier = { version = "0.3.3", optional = true }
rustls-native-certs = { version = "0.7.1", optional = true }
webpki-roots = { version = "0.26.3", optional = true }
h3 = { version = "0.0.6", optional = true }

# Utils
bytes = { workspace = true }
flume = { workspace = true }
futures-util = { workspace = true }
rustc-hash = "2.0.0"
thiserror = "1.0.63"

# Windows specific dependencies
[target.'cfg(windows)'.dependencies]
windows-sys = { workspace = true, features = ["Win32_Networking_WinSock"] }

[target.'cfg(unix)'.dependencies]
libc = { workspace = true }

[dev-dependencies]
compio-buf = { workspace = true, features = ["bytes"] }
compio-dispatcher = { workspace = true }
compio-driver = { workspace = true }
compio-fs = { workspace = true }
compio-macros = { workspace = true }
compio-runtime = { workspace = true, features = ["criterion"] }

criterion = { workspace = true, features = ["async_tokio"] }
http = "1.1.0"
quinn = "0.11.3"
rand = { workspace = true }
rcgen = "0.13.1"
socket2 = { workspace = true, features = ["all"] }
tokio = { workspace = true, features = ["rt", "macros"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }

[features]
default = []
io-compat = ["futures-util/io"]
platform-verifier = ["dep:rustls-platform-verifier"]
native-certs = ["dep:rustls-native-certs"]
webpki-roots = ["dep:webpki-roots"]
h3 = ["dep:h3"]
# FIXME: see https://github.com/quinn-rs/quinn/pull/1962

[[example]]
name = "http3-client"
required-features = ["h3"]

[[example]]
name = "http3-server"
required-features = ["h3"]

[[bench]]
name = "quic"
harness = false
Loading