diff --git a/Cargo.toml b/Cargo.toml index 85ce27dd..183d8ecc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,10 +17,11 @@ keywords = ["async", "fs", "io-uring"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -tokio = { version = "1.2", features = ["net", "rt", "sync"] } -slab = "0.4.2" +tokio = { version = "1.12", features = ["net", "rt", "sync"] } +slab = "0.4.4" libc = "0.2.80" -io-uring = "0.6.0" +rustix = { version = "0.38.42", features = ["net", "io_uring"] } +rustix-uring = "0.2.0" socket2 = { version = "0.4.4", features = ["all"] } bytes = { version = "1.0", optional = true } futures-util = { version = "0.3.26", default-features = false, features = ["std"] } diff --git a/examples/test_create_dir_all.rs b/examples/test_create_dir_all.rs index 3404de8e..690dd2d3 100644 --- a/examples/test_create_dir_all.rs +++ b/examples/test_create_dir_all.rs @@ -1,3 +1,4 @@ +use rustix::fs::StatxFlags; use std::io; use std::path::Path; use tokio_uring::fs; @@ -200,7 +201,7 @@ async fn statx_builder2>(dir_path: P, rel_path: P) -> io::Result< async fn matches_mode>(path: P, want_mode: u16) -> io::Result<()> { let statx = tokio_uring::fs::StatxBuilder::new() - .mask(libc::STATX_MODE) + .mask(StatxFlags::MODE) .pathname(path)? .statx() .await?; diff --git a/src/buf/fixed/buffers.rs b/src/buf/fixed/buffers.rs index bff73d20..1040b77d 100644 --- a/src/buf/fixed/buffers.rs +++ b/src/buf/fixed/buffers.rs @@ -1,4 +1,4 @@ -use libc::iovec; +use rustix::io_uring::iovec; // Abstracts management of fixed buffers in a buffer registry. pub(crate) trait FixedBuffers { diff --git a/src/buf/fixed/handle.rs b/src/buf/fixed/handle.rs index 5e77c125..ed49eed0 100644 --- a/src/buf/fixed/handle.rs +++ b/src/buf/fixed/handle.rs @@ -1,7 +1,7 @@ use super::FixedBuffers; use crate::buf::{IoBuf, IoBufMut}; -use libc::iovec; +use rustix::io_uring::iovec; use std::cell::RefCell; use std::fmt::{self, Debug}; use std::ops::{Deref, DerefMut}; diff --git a/src/buf/fixed/plumbing/pool.rs b/src/buf/fixed/plumbing/pool.rs index 327acbc6..d49d728a 100644 --- a/src/buf/fixed/plumbing/pool.rs +++ b/src/buf/fixed/plumbing/pool.rs @@ -1,7 +1,8 @@ use crate::buf::fixed::{handle::CheckedOutBuf, FixedBuffers}; use crate::buf::IoBufMut; -use libc::{iovec, UIO_MAXIOV}; +use libc::UIO_MAXIOV; +use rustix::io_uring::iovec; use tokio::sync::Notify; use std::cmp; diff --git a/src/buf/fixed/plumbing/registry.rs b/src/buf/fixed/plumbing/registry.rs index 4f88746e..ed6060bd 100644 --- a/src/buf/fixed/plumbing/registry.rs +++ b/src/buf/fixed/plumbing/registry.rs @@ -1,7 +1,8 @@ use crate::buf::fixed::{handle::CheckedOutBuf, FixedBuffers}; use crate::buf::IoBufMut; -use libc::{iovec, UIO_MAXIOV}; +use libc::UIO_MAXIOV; +use rustix::io_uring::iovec; use std::cmp; use std::mem; use std::ptr; diff --git a/src/fs/create_dir_all.rs b/src/fs/create_dir_all.rs index 413ef83d..8d2e0552 100644 --- a/src/fs/create_dir_all.rs +++ b/src/fs/create_dir_all.rs @@ -1,4 +1,5 @@ use futures_util::future::LocalBoxFuture; +use rustix::fs::StatxFlags; use std::io; use std::path::Path; @@ -165,17 +166,17 @@ impl DirBuilder { mod fs_imp { use crate::runtime::driver::op::Op; - use libc::mode_t; + use rustix::fs::Mode; use std::path::Path; #[derive(Debug)] pub struct DirBuilder { - mode: mode_t, + mode: Mode, } impl DirBuilder { pub fn new() -> DirBuilder { - DirBuilder { mode: 0o777 } + DirBuilder { mode: 0o777.into() } } pub async fn mkdir(&self, p: &Path) -> std::io::Result<()> { @@ -183,7 +184,7 @@ mod fs_imp { } pub fn set_mode(&mut self, mode: u32) { - self.mode = mode as mode_t; + self.mode = mode.into(); } } } @@ -193,7 +194,7 @@ mod fs_imp { // Uses one asynchronous uring call to determine this. async fn is_dir>(path: P) -> bool { let mut builder = crate::fs::StatxBuilder::new(); - if builder.mask(libc::STATX_TYPE).pathname(path).is_err() { + if builder.mask(StatxFlags::TYPE).pathname(path).is_err() { return false; } diff --git a/src/fs/directory.rs b/src/fs/directory.rs index 0b7554fc..b81c3c1f 100644 --- a/src/fs/directory.rs +++ b/src/fs/directory.rs @@ -31,7 +31,7 @@ use std::path::Path; /// } /// ``` pub async fn create_dir>(path: P) -> io::Result<()> { - Op::make_dir(path.as_ref(), 0o777)?.await + Op::make_dir(path.as_ref(), 0o777.into())?.await } /// Removes a directory on the local filesystem. diff --git a/src/fs/file.rs b/src/fs/file.rs index 9cd47f21..51d74b5a 100644 --- a/src/fs/file.rs +++ b/src/fs/file.rs @@ -1,3 +1,5 @@ +use rustix_uring::types; + use crate::buf::fixed::FixedBuf; use crate::buf::{BoundedBuf, BoundedBufMut, IoBuf, IoBufMut, Slice}; use crate::fs::OpenOptions; @@ -960,5 +962,5 @@ pub async fn remove_file>(path: P) -> io::Result<()> { /// } /// ``` pub async fn rename(from: impl AsRef, to: impl AsRef) -> io::Result<()> { - Op::rename_at(from.as_ref(), to.as_ref(), 0)?.await + Op::rename_at(from.as_ref(), to.as_ref(), types::RenameFlags::empty())?.await } diff --git a/src/fs/open_options.rs b/src/fs/open_options.rs index 21474593..fc9e7e2d 100644 --- a/src/fs/open_options.rs +++ b/src/fs/open_options.rs @@ -1,3 +1,6 @@ +use rustix::fs::Mode; +use rustix_uring::types; + use crate::fs::File; use crate::runtime::driver::op::Op; @@ -62,8 +65,8 @@ pub struct OpenOptions { truncate: bool, create: bool, create_new: bool, - pub(crate) mode: libc::mode_t, - pub(crate) custom_flags: libc::c_int, + pub(crate) mode: Mode, + pub(crate) custom_flags: types::OFlags, } impl OpenOptions { @@ -95,8 +98,8 @@ impl OpenOptions { truncate: false, create: false, create_new: false, - mode: 0o666, - custom_flags: 0, + mode: 0o666.into(), + custom_flags: types::OFlags::empty(), } } @@ -336,18 +339,18 @@ impl OpenOptions { Op::open(path.as_ref(), self)?.await } - pub(crate) fn access_mode(&self) -> io::Result { + pub(crate) fn access_mode(&self) -> io::Result { match (self.read, self.write, self.append) { - (true, false, false) => Ok(libc::O_RDONLY), - (false, true, false) => Ok(libc::O_WRONLY), - (true, true, false) => Ok(libc::O_RDWR), - (false, _, true) => Ok(libc::O_WRONLY | libc::O_APPEND), - (true, _, true) => Ok(libc::O_RDWR | libc::O_APPEND), + (true, false, false) => Ok(types::OFlags::RDONLY), + (false, true, false) => Ok(types::OFlags::WRONLY), + (true, true, false) => Ok(types::OFlags::RDWR), + (false, _, true) => Ok(types::OFlags::WRONLY | types::OFlags::APPEND), + (true, _, true) => Ok(types::OFlags::RDWR | types::OFlags::APPEND), (false, false, false) => Err(io::Error::from_raw_os_error(libc::EINVAL)), } } - pub(crate) fn creation_mode(&self) -> io::Result { + pub(crate) fn creation_mode(&self) -> io::Result { match (self.write, self.append) { (true, false) => {} (false, false) => { @@ -363,11 +366,11 @@ impl OpenOptions { } Ok(match (self.create, self.truncate, self.create_new) { - (false, false, false) => 0, - (true, false, false) => libc::O_CREAT, - (false, true, false) => libc::O_TRUNC, - (true, true, false) => libc::O_CREAT | libc::O_TRUNC, - (_, _, true) => libc::O_CREAT | libc::O_EXCL, + (false, false, false) => types::OFlags::empty(), + (true, false, false) => types::OFlags::CREATE, + (false, true, false) => types::OFlags::TRUNC, + (true, true, false) => types::OFlags::CREATE | types::OFlags::TRUNC, + (_, _, true) => types::OFlags::CREATE | types::OFlags::EXCL, }) } } @@ -380,12 +383,12 @@ impl Default for OpenOptions { impl OpenOptionsExt for OpenOptions { fn mode(&mut self, mode: u32) -> &mut OpenOptions { - self.mode = mode; + self.mode = mode.into(); self } fn custom_flags(&mut self, flags: i32) -> &mut OpenOptions { - self.custom_flags = flags; + self.custom_flags = types::OFlags::from_bits_truncate(flags as u32); self } } diff --git a/src/fs/statx.rs b/src/fs/statx.rs index 5a3ccb1b..3947fdac 100644 --- a/src/fs/statx.rs +++ b/src/fs/statx.rs @@ -1,3 +1,6 @@ +use rustix::fs::StatxFlags; +use rustix_uring::types; + use super::File; use crate::io::{cstr, SharedFd}; use crate::runtime::driver::op::Op; @@ -28,9 +31,9 @@ impl File { /// f.close().await.unwrap(); /// }) /// ``` - pub async fn statx(&self) -> io::Result { - let flags = libc::AT_EMPTY_PATH; - let mask = libc::STATX_ALL; + pub async fn statx(&self) -> io::Result { + let flags = types::AtFlags::EMPTY_PATH; + let mask = StatxFlags::ALL; Op::statx(Some(self.fd.clone()), None, flags, mask)?.await } @@ -72,8 +75,8 @@ impl File { StatxBuilder { file: Some(self.fd.clone()), path: None, - flags: libc::AT_EMPTY_PATH, - mask: libc::STATX_ALL, + flags: types::AtFlags::EMPTY_PATH, + mask: StatxFlags::ALL, } } } @@ -102,7 +105,7 @@ impl File { /// let statx = tokio_uring::fs::statx("foo.txt").await.unwrap(); /// }) /// ``` -pub async fn statx>(path: P) -> io::Result { +pub async fn statx>(path: P) -> io::Result { StatxBuilder::new().pathname(path).unwrap().statx().await } @@ -115,8 +118,8 @@ pub async fn statx>(path: P) -> io::Result { pub struct StatxBuilder { file: Option, path: Option, - flags: i32, - mask: u32, + flags: types::AtFlags, + mask: StatxFlags, } impl Default for StatxBuilder { @@ -161,8 +164,8 @@ impl StatxBuilder { StatxBuilder { file: None, path: None, - flags: libc::AT_EMPTY_PATH, - mask: libc::STATX_ALL, + flags: types::AtFlags::EMPTY_PATH, + mask: StatxFlags::ALL, } } @@ -241,7 +244,7 @@ impl StatxBuilder { /// }) /// ``` #[must_use] - pub fn flags(&mut self, flags: i32) -> &mut Self { + pub fn flags(&mut self, flags: types::AtFlags) -> &mut Self { self.flags = flags; self } @@ -254,13 +257,13 @@ impl StatxBuilder { /// tokio_uring::start(async { /// // Fetch file metadata /// let statx = tokio_uring::fs::StatxBuilder::new() - /// .mask(libc::STATX_BASIC_STATS) + /// .mask(rustix::fs::StatxFlags::ALL) /// .pathname("foo.txt").unwrap() /// .statx().await.unwrap(); /// }) /// ``` #[must_use] - pub fn mask(&mut self, mask: u32) -> &mut Self { + pub fn mask(&mut self, mask: StatxFlags) -> &mut Self { self.mask = mask; self } @@ -288,7 +291,7 @@ impl StatxBuilder { /// dir.close().await.unwrap(); /// }) /// ``` - pub async fn statx(&mut self) -> io::Result { + pub async fn statx(&mut self) -> io::Result { // TODO should the statx() terminator be renamed to something like submit()? let fd = self.file.take(); let path = self.path.take(); @@ -303,7 +306,7 @@ impl StatxBuilder { #[allow(dead_code)] pub async fn is_dir_regfile>(path: P) -> (bool, bool) { let mut builder = crate::fs::StatxBuilder::new(); - if builder.mask(libc::STATX_TYPE).pathname(path).is_err() { + if builder.mask(StatxFlags::TYPE).pathname(path).is_err() { return (false, false); } diff --git a/src/io/accept.rs b/src/io/accept.rs index d4e19d58..0abadbf9 100644 --- a/src/io/accept.rs +++ b/src/io/accept.rs @@ -1,3 +1,5 @@ +use rustix::net::SocketFlags; + use crate::io::{SharedFd, Socket}; use crate::runtime::driver::op; use crate::runtime::driver::op::{Completable, Op}; @@ -12,7 +14,7 @@ pub(crate) struct Accept { impl Op { pub(crate) fn accept(fd: &SharedFd) -> io::Result> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; let socketaddr = Box::new(( unsafe { std::mem::zeroed() }, @@ -30,7 +32,7 @@ impl Op { &mut accept.socketaddr.0 as *mut _ as *mut _, &mut accept.socketaddr.1, ) - .flags(libc::O_CLOEXEC) + .flags(SocketFlags::CLOEXEC) .build() }, ) diff --git a/src/io/close.rs b/src/io/close.rs index b871d83b..e1b2153d 100644 --- a/src/io/close.rs +++ b/src/io/close.rs @@ -10,7 +10,7 @@ pub(crate) struct Close { impl Op { pub(crate) fn close(fd: RawFd) -> io::Result> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; CONTEXT.with(|x| { x.handle() diff --git a/src/io/connect.rs b/src/io/connect.rs index 242aa981..c5ed8acd 100644 --- a/src/io/connect.rs +++ b/src/io/connect.rs @@ -15,7 +15,7 @@ pub(crate) struct Connect { impl Op { /// Submit a request to connect. pub(crate) fn connect(fd: &SharedFd, socket_addr: SockAddr) -> io::Result> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; CONTEXT.with(|x| { x.handle().expect("Not in a runtime context").submit_op( @@ -26,7 +26,7 @@ impl Op { |connect| { opcode::Connect::new( types::Fd(connect.fd.raw_fd()), - connect.socket_addr.as_ptr(), + connect.socket_addr.as_ptr() as *const _, connect.socket_addr.len(), ) .build() diff --git a/src/io/fallocate.rs b/src/io/fallocate.rs index 382710d0..b02b4a0c 100644 --- a/src/io/fallocate.rs +++ b/src/io/fallocate.rs @@ -1,6 +1,6 @@ use std::io; -use io_uring::{opcode, types}; +use rustix_uring::{opcode, types}; use crate::{ io::SharedFd, diff --git a/src/io/fsync.rs b/src/io/fsync.rs index b34ff554..c35b01c2 100644 --- a/src/io/fsync.rs +++ b/src/io/fsync.rs @@ -3,7 +3,7 @@ use std::io; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use io_uring::{opcode, types}; +use rustix_uring::{opcode, types}; pub(crate) struct Fsync { fd: SharedFd, diff --git a/src/io/mkdir_at.rs b/src/io/mkdir_at.rs index 4c4f1368..907a09b3 100644 --- a/src/io/mkdir_at.rs +++ b/src/io/mkdir_at.rs @@ -15,8 +15,8 @@ pub(crate) struct Mkdir { impl Op { /// Submit a request to create a directory - pub(crate) fn make_dir(path: &Path, mode: u32) -> io::Result> { - use io_uring::{opcode, types}; + pub(crate) fn make_dir(path: &Path, mode: rustix::fs::Mode) -> io::Result> { + use rustix_uring::{opcode, types}; let _path = cstr(path)?; diff --git a/src/io/noop.rs b/src/io/noop.rs index 8ad28e7e..4c364bfc 100644 --- a/src/io/noop.rs +++ b/src/io/noop.rs @@ -9,7 +9,7 @@ pub struct NoOp {} impl Op { pub fn no_op() -> io::Result> { - use io_uring::opcode; + use rustix_uring::opcode; CONTEXT.with(|x| { x.handle() diff --git a/src/io/open.rs b/src/io/open.rs index 27393131..5e7077c6 100644 --- a/src/io/open.rs +++ b/src/io/open.rs @@ -1,3 +1,5 @@ +use rustix_uring::types::OFlags; + use crate::fs::{File, OpenOptions}; use crate::io::SharedFd; @@ -11,18 +13,18 @@ use std::path::Path; #[allow(dead_code)] pub(crate) struct Open { pub(crate) path: CString, - pub(crate) flags: libc::c_int, + pub(crate) flags: OFlags, } impl Op { /// Submit a request to open a file. pub(crate) fn open(path: &Path, options: &OpenOptions) -> io::Result> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; let path = super::util::cstr(path)?; - let flags = libc::O_CLOEXEC + let flags = OFlags::CLOEXEC | options.access_mode()? | options.creation_mode()? - | (options.custom_flags & !libc::O_ACCMODE); + | (options.custom_flags & !OFlags::ACCMODE); CONTEXT.with(|x| { x.handle() diff --git a/src/io/read.rs b/src/io/read.rs index c3395b40..0252926c 100644 --- a/src/io/read.rs +++ b/src/io/read.rs @@ -18,7 +18,7 @@ pub(crate) struct Read { impl Op> { pub(crate) fn read_at(fd: &SharedFd, buf: T, offset: u64) -> io::Result>> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; CONTEXT.with(|x| { x.handle().expect("Not in a runtime context").submit_op( diff --git a/src/io/read_fixed.rs b/src/io/read_fixed.rs index 3cb96cdb..6ede6bd0 100644 --- a/src/io/read_fixed.rs +++ b/src/io/read_fixed.rs @@ -26,7 +26,7 @@ where buf: T, offset: u64, ) -> io::Result>> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; CONTEXT.with(|x| { x.handle().expect("Not in a runtime context").submit_op( diff --git a/src/io/readv.rs b/src/io/readv.rs index ff71dc79..9da90a88 100644 --- a/src/io/readv.rs +++ b/src/io/readv.rs @@ -4,7 +4,7 @@ use crate::BufResult; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; -use libc::iovec; +use rustix_uring::types::iovec; use std::io; pub(crate) struct Readv { @@ -25,7 +25,7 @@ impl Op> { mut bufs: Vec, offset: u64, ) -> io::Result>> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; // Build `iovec` objects referring the provided `bufs` for `io_uring::opcode::Readv`. let iovs: Vec = bufs diff --git a/src/io/recv_from.rs b/src/io/recv_from.rs index e9b360ca..d81126f9 100644 --- a/src/io/recv_from.rs +++ b/src/io/recv_from.rs @@ -1,6 +1,7 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; use crate::{buf::BoundedBufMut, io::SharedFd, BufResult}; +use rustix::io_uring::msghdr; use socket2::SockAddr; use std::{ io::IoSliceMut, @@ -13,12 +14,12 @@ pub(crate) struct RecvFrom { pub(crate) buf: T, io_slices: Vec>, pub(crate) socket_addr: Box, - pub(crate) msghdr: Box, + pub(crate) msghdr: Box, } impl Op> { pub(crate) fn recv_from(fd: &SharedFd, mut buf: T) -> io::Result>> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; let mut io_slices = vec![IoSliceMut::new(unsafe { std::slice::from_raw_parts_mut(buf.stable_mut_ptr(), buf.bytes_total()) @@ -26,11 +27,11 @@ impl Op> { let socket_addr = Box::new(unsafe { SockAddr::init(|_, _| Ok(()))?.1 }); - let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); + let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); msghdr.msg_iov = io_slices.as_mut_ptr().cast(); msghdr.msg_iovlen = io_slices.len() as _; msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; - msghdr.msg_namelen = socket_addr.len(); + msghdr.msg_namelen = socket_addr.len() as _; CONTEXT.with(|x| { x.handle().expect("Not in a runtime context").submit_op( diff --git a/src/io/recvmsg.rs b/src/io/recvmsg.rs index 3cae2e50..7ed679ba 100644 --- a/src/io/recvmsg.rs +++ b/src/io/recvmsg.rs @@ -1,6 +1,7 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; use crate::{buf::BoundedBufMut, io::SharedFd, BufResult}; +use rustix::io_uring::msghdr; use socket2::SockAddr; use std::{ io::IoSliceMut, @@ -14,12 +15,12 @@ pub(crate) struct RecvMsg { #[allow(dead_code)] io_slices: Vec>, pub(crate) socket_addr: Box, - pub(crate) msghdr: Box, + pub(crate) msghdr: Box, } impl Op> { pub(crate) fn recvmsg(fd: &SharedFd, mut bufs: Vec) -> io::Result>> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; let mut io_slices = Vec::with_capacity(bufs.len()); for buf in &mut bufs { @@ -30,11 +31,11 @@ impl Op> { let socket_addr = Box::new(unsafe { SockAddr::init(|_, _| Ok(()))?.1 }); - let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); + let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); msghdr.msg_iov = io_slices.as_mut_ptr().cast(); msghdr.msg_iovlen = io_slices.len() as _; msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; - msghdr.msg_namelen = socket_addr.len(); + msghdr.msg_namelen = socket_addr.len() as _; CONTEXT.with(|x| { x.handle().expect("Not in a runtime context").submit_op( diff --git a/src/io/rename_at.rs b/src/io/rename_at.rs index 3186824e..1e0e4fbd 100644 --- a/src/io/rename_at.rs +++ b/src/io/rename_at.rs @@ -1,3 +1,5 @@ +use rustix_uring::types::RenameFlags; + use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; use std::ffi::CString; @@ -16,8 +18,12 @@ pub(crate) struct RenameAt { impl Op { /// Submit a request to rename a specified path to a new name with /// the provided flags. - pub(crate) fn rename_at(from: &Path, to: &Path, flags: u32) -> io::Result> { - use io_uring::{opcode, types}; + pub(crate) fn rename_at( + from: &Path, + to: &Path, + flags: RenameFlags, + ) -> io::Result> { + use rustix_uring::{opcode, types}; let from = super::util::cstr(from)?; let to = super::util::cstr(to)?; diff --git a/src/io/send_to.rs b/src/io/send_to.rs index 8895f5fa..bb9f01ad 100644 --- a/src/io/send_to.rs +++ b/src/io/send_to.rs @@ -3,6 +3,7 @@ use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; use crate::BufResult; +use rustix::io_uring::msghdr; use socket2::SockAddr; use std::io::IoSlice; use std::{boxed::Box, io, net::SocketAddr}; @@ -15,7 +16,7 @@ pub(crate) struct SendTo { io_slices: Vec>, #[allow(dead_code)] socket_addr: Option>, - pub(crate) msghdr: Box, + pub(crate) msghdr: Box, } impl Op> { @@ -24,13 +25,13 @@ impl Op> { buf: T, socket_addr: Option, ) -> io::Result>> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; let io_slices = vec![IoSlice::new(unsafe { std::slice::from_raw_parts(buf.stable_ptr(), buf.bytes_init()) })]; - let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); + let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); msghdr.msg_iov = io_slices.as_ptr() as *mut _; msghdr.msg_iovlen = io_slices.len() as _; @@ -38,7 +39,7 @@ impl Op> { Some(_socket_addr) => { let socket_addr = Box::new(SockAddr::from(_socket_addr)); msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; - msghdr.msg_namelen = socket_addr.len(); + msghdr.msg_namelen = socket_addr.len() as _; Some(socket_addr) } None => { diff --git a/src/io/send_zc.rs b/src/io/send_zc.rs index df37722b..81176b32 100644 --- a/src/io/send_zc.rs +++ b/src/io/send_zc.rs @@ -17,7 +17,7 @@ pub(crate) struct SendZc { impl Op, MultiCQEFuture> { pub(crate) fn send_zc(fd: &SharedFd, buf: T) -> io::Result { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; CONTEXT.with(|x| { x.handle().expect("Not in a runtime context").submit_op( diff --git a/src/io/sendmsg.rs b/src/io/sendmsg.rs index 5b02288c..5b5bbd0e 100644 --- a/src/io/sendmsg.rs +++ b/src/io/sendmsg.rs @@ -2,6 +2,7 @@ use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; +use rustix::io_uring::msghdr; use socket2::SockAddr; use std::io; use std::io::IoSlice; @@ -13,7 +14,7 @@ pub(crate) struct SendMsg { _io_slices: Vec>, _socket_addr: Option>, msg_control: Option, - msghdr: Box, + msghdr: Box, } impl Op> { @@ -23,9 +24,9 @@ impl Op> { socket_addr: Option, msg_control: Option, ) -> io::Result { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; - let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); + let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); let mut io_slices: Vec> = Vec::with_capacity(io_bufs.len()); @@ -42,7 +43,7 @@ impl Op> { Some(_socket_addr) => { let socket_addr = Box::new(SockAddr::from(_socket_addr)); msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; - msghdr.msg_namelen = socket_addr.len(); + msghdr.msg_namelen = socket_addr.len() as _; Some(socket_addr) } None => { diff --git a/src/io/sendmsg_zc.rs b/src/io/sendmsg_zc.rs index e7da9fe5..de5fe606 100644 --- a/src/io/sendmsg_zc.rs +++ b/src/io/sendmsg_zc.rs @@ -2,6 +2,7 @@ use crate::buf::BoundedBuf; use crate::io::SharedFd; use crate::runtime::driver::op::{Completable, CqeResult, MultiCQEFuture, Op, Updateable}; use crate::runtime::CONTEXT; +use rustix::io_uring::msghdr; use socket2::SockAddr; use std::io; use std::io::IoSlice; @@ -17,7 +18,7 @@ pub(crate) struct SendMsgZc { #[allow(dead_code)] socket_addr: Option>, msg_control: Option, - msghdr: Box, + msghdr: Box, /// Hold the number of transmitted bytes bytes: usize, @@ -30,9 +31,9 @@ impl Op, MultiCQEFuture> { socket_addr: Option, msg_control: Option, ) -> io::Result { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; - let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); + let mut msghdr: Box = Box::new(unsafe { std::mem::zeroed() }); let mut io_slices: Vec> = Vec::with_capacity(io_bufs.len()); @@ -49,7 +50,7 @@ impl Op, MultiCQEFuture> { Some(_socket_addr) => { let socket_addr = Box::new(SockAddr::from(_socket_addr)); msghdr.msg_name = socket_addr.as_ptr() as *mut libc::c_void; - msghdr.msg_namelen = socket_addr.len(); + msghdr.msg_namelen = socket_addr.len() as _; Some(socket_addr) } None => { diff --git a/src/io/socket.rs b/src/io/socket.rs index dda1bb36..989c21ce 100644 --- a/src/io/socket.rs +++ b/src/io/socket.rs @@ -6,6 +6,7 @@ use crate::{ io::SharedFd, UnsubmittedOneshot, }; +use rustix::fd::BorrowedFd; use std::{ io, net::SocketAddr, @@ -254,7 +255,7 @@ impl Socket { } pub(crate) fn listen(&self, backlog: libc::c_int) -> io::Result<()> { - syscall!(listen(self.as_raw_fd(), backlog))?; + rustix::net::listen(unsafe { BorrowedFd::borrow_raw(self.as_raw_fd()) }, backlog)?; Ok(()) } diff --git a/src/io/statx.rs b/src/io/statx.rs index e7796b0e..c9ca2b39 100644 --- a/src/io/statx.rs +++ b/src/io/statx.rs @@ -1,7 +1,8 @@ use std::ffi::CString; use std::{ffi::CStr, io}; -use io_uring::{opcode, types}; +use rustix::fs::StatxFlags; +use rustix_uring::{opcode, types}; use crate::runtime::{ driver::op::{Completable, CqeResult, Op}, @@ -18,7 +19,7 @@ pub(crate) struct Statx { // TODO consider returning this type when the operation is complete so the caller has the boxed value. // The builder could even recycle an old boxed value and pass it in here. - statx: Box, + statx: Box, } impl Op { @@ -28,8 +29,8 @@ impl Op { pub(crate) fn statx( fd: Option, path: Option, - flags: i32, - mask: u32, + flags: types::AtFlags, + mask: StatxFlags, ) -> io::Result> { let raw = fd.as_ref().map_or(libc::AT_FDCWD, |fd| fd.raw_fd()); let mut flags = flags; @@ -37,7 +38,7 @@ impl Op { Some(path) => path, None => { // If there is no path, add appropriate bit to flags. - flags |= libc::AT_EMPTY_PATH; + flags |= types::AtFlags::EMPTY_PATH; CStr::from_bytes_with_nul(b"\0").unwrap().into() // TODO Is there a constant CString we // could use here. } @@ -53,7 +54,7 @@ impl Op { opcode::Statx::new( types::Fd(raw), statx.path.as_ptr(), - &mut *statx.statx as *mut libc::statx as *mut types::statx, + &mut *statx.statx as *mut types::Statx, ) .flags(flags) .mask(mask) @@ -65,7 +66,7 @@ impl Op { } impl Completable for Statx { - type Output = io::Result; + type Output = io::Result; fn complete(self, cqe: CqeResult) -> Self::Output { cqe.result?; diff --git a/src/io/symlink.rs b/src/io/symlink.rs index 8509dddc..dccfe8f8 100644 --- a/src/io/symlink.rs +++ b/src/io/symlink.rs @@ -17,7 +17,7 @@ impl Op { from: P, to: Q, ) -> io::Result> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; let _from = cstr(from.as_ref())?; let _to = cstr(to.as_ref())?; diff --git a/src/io/unlink_at.rs b/src/io/unlink_at.rs index 412ca022..3102a449 100644 --- a/src/io/unlink_at.rs +++ b/src/io/unlink_at.rs @@ -1,3 +1,5 @@ +use rustix_uring::types::AtFlags; + use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; use std::ffi::CString; @@ -12,17 +14,17 @@ pub(crate) struct Unlink { impl Op { /// Submit a request to unlink a directory with provided flags. pub(crate) fn unlink_dir(path: &Path) -> io::Result> { - Self::unlink(path, libc::AT_REMOVEDIR) + Self::unlink(path, AtFlags::REMOVEDIR) } /// Submit a request to unlink a file with provided flags. pub(crate) fn unlink_file(path: &Path) -> io::Result> { - Self::unlink(path, 0) + Self::unlink(path, AtFlags::empty()) } /// Submit a request to unlink a specified path with provided flags. - pub(crate) fn unlink(path: &Path, flags: i32) -> io::Result> { - use io_uring::{opcode, types}; + pub(crate) fn unlink(path: &Path, flags: AtFlags) -> io::Result> { + use rustix_uring::{opcode, types}; let path = super::util::cstr(path)?; diff --git a/src/io/write.rs b/src/io/write.rs index 6c607f75..5e01ebd3 100644 --- a/src/io/write.rs +++ b/src/io/write.rs @@ -1,5 +1,5 @@ use crate::{buf::BoundedBuf, io::SharedFd, BufResult, OneshotOutputTransform, UnsubmittedOneshot}; -use io_uring::cqueue::Entry; +use rustix_uring::cqueue::Entry; use std::io; use std::marker::PhantomData; @@ -37,7 +37,7 @@ impl OneshotOutputTransform for WriteTransform { impl UnsubmittedWrite { pub(crate) fn write_at(fd: &SharedFd, buf: T, offset: u64) -> Self { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; // Get raw buffer info let ptr = buf.stable_ptr(); diff --git a/src/io/write_fixed.rs b/src/io/write_fixed.rs index 1d2c3e38..8138ae36 100644 --- a/src/io/write_fixed.rs +++ b/src/io/write_fixed.rs @@ -25,7 +25,7 @@ where buf: T, offset: u64, ) -> io::Result>> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; CONTEXT.with(|x| { x.handle().expect("Not in a runtime context").submit_op( diff --git a/src/io/writev.rs b/src/io/writev.rs index 86236ebc..c86aa669 100644 --- a/src/io/writev.rs +++ b/src/io/writev.rs @@ -1,7 +1,8 @@ +use rustix::io_uring::iovec; + use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; use crate::{buf::BoundedBuf, io::SharedFd, BufResult}; -use libc::iovec; use std::io; pub(crate) struct Writev { @@ -22,7 +23,7 @@ impl Op> { mut bufs: Vec, offset: u64, ) -> io::Result>> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; // Build `iovec` objects referring the provided `bufs` for `io_uring::opcode::Readv`. let iovs: Vec = bufs diff --git a/src/io/writev_all.rs b/src/io/writev_all.rs index ef5b9d40..82b08956 100644 --- a/src/io/writev_all.rs +++ b/src/io/writev_all.rs @@ -1,7 +1,7 @@ use crate::runtime::driver::op::{Completable, CqeResult, Op}; use crate::runtime::CONTEXT; use crate::{buf::BoundedBuf, io::SharedFd}; -use libc::iovec; +use rustix::io_uring::iovec; use std::io; // This provides a common write-all implementation for writev and is fairly efficient by allocating @@ -126,7 +126,7 @@ impl Op> { iovs_len: u32, offset: u64, ) -> io::Result>> { - use io_uring::{opcode, types}; + use rustix_uring::{opcode, types}; CONTEXT.with(|x| { x.handle().expect("Not in a runtime context").submit_op( diff --git a/src/lib.rs b/src/lib.rs index 819eebf7..187293be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,17 +59,6 @@ #![warn(missing_docs)] #![allow(clippy::thread_local_initializer_can_be_made_const)] -macro_rules! syscall { - ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ - let res = unsafe { libc::$fn($($arg, )*) }; - if res == -1 { - Err(std::io::Error::last_os_error()) - } else { - Ok(res) - } - }}; -} - #[macro_use] mod future; mod io; @@ -155,8 +144,8 @@ pub fn start(future: F) -> F::Output { /// This function is provided to avoid requiring the user of this crate from /// having to use the io_uring crate as well. Refer to Builder::start example /// for its intended usage. -pub fn uring_builder() -> io_uring::Builder { - io_uring::IoUring::builder() +pub fn uring_builder() -> rustix_uring::Builder { + rustix_uring::IoUring::builder() } /// Builder API that can create and start the `io_uring` runtime with non-default parameters, @@ -164,7 +153,7 @@ pub fn uring_builder() -> io_uring::Builder { // #[derive(Clone, Default)] pub struct Builder { entries: u32, - urb: io_uring::Builder, + urb: rustix_uring::Builder, } /// Constructs a [`Builder`] with default settings. @@ -176,7 +165,7 @@ pub struct Builder { pub fn builder() -> Builder { Builder { entries: 256, - urb: io_uring::IoUring::builder(), + urb: rustix_uring::IoUring::builder(), } } @@ -196,7 +185,7 @@ impl Builder { /// inner `io_uring` API. /// /// Refer to the [`io_uring::Builder`] documentation for all the supported methods. - pub fn uring_builder(&mut self, b: &io_uring::Builder) -> &mut Self { + pub fn uring_builder(&mut self, b: &rustix_uring::Builder) -> &mut Self { self.urb = b.clone(); self } diff --git a/src/runtime/driver/handle.rs b/src/runtime/driver/handle.rs index 115f780d..4a8b0e12 100644 --- a/src/runtime/driver/handle.rs +++ b/src/runtime/driver/handle.rs @@ -12,7 +12,7 @@ //! The weak handle should be used by anything which is stored in the driver or does not need to //! keep the driver alive for it's duration. -use io_uring::{cqueue, squeue}; +use rustix_uring::{cqueue, squeue}; use std::cell::RefCell; use std::io; use std::ops::Deref; @@ -45,7 +45,7 @@ impl Handle { self.inner.borrow_mut().dispatch_completions() } - pub(crate) fn flush(&self) -> io::Result { + pub(crate) fn flush(&self) -> rustix_uring::Result { self.inner.borrow_mut().uring.submit() } diff --git a/src/runtime/driver/mod.rs b/src/runtime/driver/mod.rs index f57605d6..90c36c3b 100644 --- a/src/runtime/driver/mod.rs +++ b/src/runtime/driver/mod.rs @@ -1,7 +1,7 @@ use crate::buf::fixed::FixedBuffers; use crate::runtime::driver::op::{Completable, Lifecycle, MultiCQEFuture, Op, Updateable}; -use io_uring::opcode::AsyncCancel; -use io_uring::{cqueue, squeue, IoUring}; +use rustix_uring::opcode::AsyncCancel; +use rustix_uring::{cqueue, squeue, IoUring}; use slab::Slab; use std::cell::RefCell; use std::os::unix::io::{AsRawFd, RawFd}; @@ -47,7 +47,7 @@ impl Driver { }) } - fn wait(&self) -> io::Result { + fn wait(&self) -> rustix_uring::Result { self.uring.submit_and_wait(1) } @@ -57,17 +57,17 @@ impl Driver { self.ops.lifecycle.len() } - pub(crate) fn submit(&mut self) -> io::Result<()> { + pub(crate) fn submit(&mut self) -> rustix_uring::Result<()> { loop { match self.uring.submit() { Ok(_) => { self.uring.submission().sync(); return Ok(()); } - Err(ref e) if e.raw_os_error() == Some(libc::EBUSY) => { + Err(ref e) if e.raw_os_error() == libc::EBUSY => { self.dispatch_completions(); } - Err(e) if e.raw_os_error() != Some(libc::EINTR) => { + Err(e) if e.raw_os_error() != libc::EINTR => { return Err(e); } _ => continue, @@ -393,7 +393,7 @@ impl Drop for Driver { Lifecycle::CompletionList(indices) => { let mut list = indices.clone().into_list(&mut self.ops.completions); - if !io_uring::cqueue::more(list.peek_end().unwrap().flags) { + if !rustix_uring::cqueue::more(list.peek_end().unwrap().flags) { // This op is complete. Replace with a null Completed entry // safety: zeroed memory is entirely valid with this underlying // representation @@ -516,7 +516,7 @@ mod test { #[derive(Debug)] pub(crate) struct Completion { result: io::Result, - flags: u32, + flags: cqueue::Flags, data: Rc<()>, } @@ -562,7 +562,7 @@ mod test { } = assert_ready!(op.poll()); assert_eq!(2, Rc::strong_count(&data)); assert_eq!(0, result.unwrap()); - assert_eq!(0, flags); + assert_eq!(0, flags.bits()); drop(d); assert_eq!(1, Rc::strong_count(&data)); @@ -586,7 +586,7 @@ mod test { assert!(op.is_woken()); let Completion { result, flags, .. } = assert_ready!(op.poll()); assert_eq!(0, result.unwrap()); - assert_eq!(0, flags); + assert_eq!(0, flags.bits()); } release(); @@ -608,7 +608,7 @@ mod test { assert!(op.is_woken()); let Completion { result, flags, .. } = assert_ready!(op.poll()); assert_eq!(0, result.unwrap()); - assert_eq!(0, flags); + assert_eq!(0, flags.bits()); } release(); @@ -624,7 +624,7 @@ mod test { let Completion { result, flags, .. } = assert_ready!(op.poll()); assert_eq!(0, result.unwrap()); - assert_eq!(0, flags); + assert_eq!(0, flags.bits()); drop(op); assert_eq!(0, num_operations()); diff --git a/src/runtime/driver/op/mod.rs b/src/runtime/driver/op/mod.rs index 5d90daaf..12b68bfb 100644 --- a/src/runtime/driver/op/mod.rs +++ b/src/runtime/driver/op/mod.rs @@ -4,7 +4,7 @@ use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll, Waker}; -use io_uring::{cqueue, squeue}; +use rustix_uring::{cqueue, squeue}; mod slab_list; @@ -179,7 +179,7 @@ pub(crate) enum Lifecycle { /// A single CQE entry pub(crate) struct CqeResult { pub(crate) result: io::Result, - pub(crate) flags: u32, + pub(crate) flags: cqueue::Flags, } impl From for CqeResult { @@ -270,7 +270,7 @@ impl Lifecycle { match mem::replace(self, Lifecycle::Submitted) { x @ Lifecycle::Submitted | x @ Lifecycle::Waiting(..) => { - if io_uring::cqueue::more(cqe.flags()) { + if rustix_uring::cqueue::more(cqe.flags()) { let mut list = SlabListIndices::new().into_list(completions); list.push(cqe.into()); *self = Lifecycle::CompletionList(list.into_indices()); @@ -286,7 +286,7 @@ impl Lifecycle { } lifecycle @ Lifecycle::Ignored(..) => { - if io_uring::cqueue::more(cqe.flags()) { + if rustix_uring::cqueue::more(cqe.flags()) { // Not yet complete. The Op has been dropped, so we can drop the CQE // but we must keep the lifecycle alive until no more CQE's expected *self = lifecycle;