diff --git a/Cargo.toml b/Cargo.toml index be1976e5..b86f4f19 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ keywords = ["async", "fs", "io-uring"] tokio = { version = "1.2", features = ["net", "rt"] } slab = "0.4.2" libc = "0.2.80" -io-uring = { version = "0.5.9", features = ["unstable"] } +io-uring = { version = "0.5.12", features = ["unstable"] } socket2 = { version = "0.4.4", features = ["all"] } bytes = { version = "1.0", optional = true } diff --git a/src/io/mod.rs b/src/io/mod.rs index bff7d92e..fe30e99f 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -30,6 +30,8 @@ mod send_to; mod send_zc; +mod sendmsg_zc; + mod shared_fd; pub(crate) use shared_fd::SharedFd; diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs new file mode 100644 index 00000000..c5b94a4b --- /dev/null +++ b/src/io/sendmsg_zc.rs @@ -0,0 +1,105 @@ +use crate::buf::IoBuf; +use crate::io::SharedFd; +use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; +use crate::runtime::CONTEXT; +use socket2::SockAddr; +use std::io; +use std::io::IoSlice; +use std::net::SocketAddr; + +pub(crate) struct SendMsgZc { + #[allow(dead_code)] + fd: SharedFd, + #[allow(dead_code)] + io_bufs: Vec, + #[allow(dead_code)] + socket_addr: Box, + msg_control: Option, + msghdr: libc::msghdr, + + /// Hold the number of transmitted bytes + bytes: usize, +} + +impl Op, MultiCQEFuture> { + pub(crate) fn sendmsg_zc( + fd: &SharedFd, + io_bufs: Vec, + socket_addr: SocketAddr, + msg_control: Option, + ) -> io::Result { + use io_uring::{opcode, types}; + + let socket_addr = Box::new(SockAddr::from(socket_addr)); + + let mut msghdr: libc::msghdr = unsafe { std::mem::zeroed() }; + + let mut io_slices: Vec = Vec::with_capacity(io_bufs.len()); + + for io_buf in &io_bufs { + io_slices.push(IoSlice::new(unsafe { + std::slice::from_raw_parts(io_buf.stable_ptr(), io_buf.bytes_init()) + })) + } + + msghdr.msg_iov = io_slices.as_ptr() as *mut _; + msghdr.msg_iovlen = io_slices.len() as _; + msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; + msghdr.msg_namelen = socket_addr.len(); + + match msg_control { + Some(ref _msg_control) => { + msghdr.msg_control = _msg_control.stable_ptr() as *mut _; + msghdr.msg_controllen = _msg_control.bytes_init(); + } + None => { + msghdr.msg_control = std::ptr::null_mut(); + msghdr.msg_controllen = 0_usize; + } + } + + CONTEXT.with(|x| { + x.handle().expect("Not in a runtime context").submit_op( + SendMsgZc { + fd: fd.clone(), + io_bufs, + socket_addr, + msg_control, + msghdr, + bytes: 0, + }, + |sendmsg_zc| { + opcode::SendMsgZc::new( + types::Fd(sendmsg_zc.fd.raw_fd()), + &sendmsg_zc.msghdr as *const _, + ) + .build() + }, + ) + }) + } +} + +impl Completable for SendMsgZc { + type Output = (io::Result, Vec, Option); + + fn complete(self, cqe: CqeResult) -> (io::Result, Vec, Option) { + // Convert the operation result to `usize`, and add previous byte count + let res = cqe.result.map(|v| self.bytes + v as usize); + + // Recover the data buffers. + let io_bufs = self.io_bufs; + + // Recover the ancillary data buffer. + let msg_control = self.msg_control; + + (res, io_bufs, msg_control) + } +} + +impl Updateable for SendMsgZc { + fn update(&mut self, cqe: CqeResult) { + // uring send_zc promises there will be no error on CQE's marked more + self.bytes += *cqe.result.as_ref().unwrap() as usize; + } +} diff --git a/src/io/socket.rs b/src/io/socket.rs index 5e6bc8cf..ce192e87 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -147,6 +147,16 @@ impl Socket { op.await } + pub(crate) async fn sendmsg_zc( + &self, + io_slices: Vec, + socket_addr: SocketAddr, + msg_control: Option, + ) -> (io::Result, Vec, Option) { + let op = Op::sendmsg_zc(&self.fd, io_slices, socket_addr, msg_control).unwrap(); + op.await + } + pub(crate) async fn read(&self, buf: T) -> crate::BufResult { let op = Op::read_at(&self.fd, buf, 0).unwrap(); op.await diff --git a/src/net/udp.rs b/src/net/udp.rs index 47d4dee6..816ea512 100644 --- a/src/net/udp.rs +++ b/src/net/udp.rs @@ -1,6 +1,6 @@ use crate::{ buf::fixed::FixedBuf, - buf::{BoundedBuf, BoundedBufMut}, + buf::{BoundedBuf, BoundedBufMut, IoBuf}, io::{SharedFd, Socket}, }; use socket2::SockAddr; @@ -220,6 +220,18 @@ impl UdpSocket { self.inner.send_zc(buf).await } + /// Sends a message on the socket using a msghdr. + pub async fn sendmsg_zc( + &self, + io_slices: Vec, + socket_addr: SocketAddr, + msg_control: Option, + ) -> (io::Result, Vec, Option) { + self.inner + .sendmsg_zc(io_slices, socket_addr, msg_control) + .await + } + /// Receives a single datagram message on the socket. On success, returns /// the number of bytes read and the origin. pub async fn recv_from(