Skip to content

Commit

Permalink
feat: impl Async Read/Write Rent for File (#297)
Browse files Browse the repository at this point in the history
Signed-off-by: lzzzt <[email protected]>
  • Loading branch information
Lzzzzzt authored Sep 14, 2024
1 parent 43fba29 commit c3cb780
Show file tree
Hide file tree
Showing 19 changed files with 1,414 additions and 746 deletions.
60 changes: 60 additions & 0 deletions monoio/src/buf/io_buf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,19 @@ where
}
}

unsafe impl<T> IoBuf for std::mem::ManuallyDrop<T>
where
T: IoBuf,
{
fn read_ptr(&self) -> *const u8 {
<T as IoBuf>::read_ptr(self)
}

fn bytes_init(&self) -> usize {
<T as IoBuf>::bytes_init(self)
}
}

/// A mutable `io_uring` compatible buffer.
///
/// The `IoBufMut` trait is implemented by buffer types that can be passed to
Expand Down Expand Up @@ -359,6 +372,23 @@ unsafe impl IoBufMut for bytes::BytesMut {
}
}

unsafe impl<T> IoBufMut for std::mem::ManuallyDrop<T>
where
T: IoBufMut,
{
fn write_ptr(&mut self) -> *mut u8 {
<T as IoBufMut>::write_ptr(self)
}

fn bytes_total(&mut self) -> usize {
<T as IoBufMut>::bytes_total(self)
}

unsafe fn set_init(&mut self, pos: usize) {
<T as IoBufMut>::set_init(self, pos)
}
}

fn parse_range(range: impl ops::RangeBounds<usize>, end: usize) -> (usize, usize) {
use core::ops::Bound;

Expand All @@ -378,6 +408,8 @@ fn parse_range(range: impl ops::RangeBounds<usize>, end: usize) -> (usize, usize

#[cfg(test)]
mod tests {
use std::mem::ManuallyDrop;

use super::*;

#[test]
Expand Down Expand Up @@ -516,4 +548,32 @@ mod tests {
assert_eq!(slice.bytes_init(), 5);
assert_eq!(slice.into_inner().len(), 6);
}

#[test]
fn io_buf_manually_drop() {
let mut buf = Vec::with_capacity(10);
buf.extend_from_slice(b"0123");
let mut buf = ManuallyDrop::new(buf);
let ptr = buf.as_ptr();

assert_eq!(buf.write_ptr(), ptr as _);
assert_eq!(buf.read_ptr(), ptr);
assert_eq!(buf.bytes_init(), 4);
unsafe { buf.set_init(3) };
assert_eq!(buf.bytes_init(), 3);
assert_eq!(buf.bytes_total(), 10);

let slice = buf.slice(1..3);
assert_eq!((slice.begin(), slice.end()), (1, 3));
assert_eq!(slice.read_ptr(), unsafe { ptr.add(1) });
assert_eq!(slice.bytes_init(), 2);
let buf = ManuallyDrop::into_inner(slice.into_inner());

let mut slice = buf.slice_mut(1..8);
assert_eq!((slice.begin(), slice.end()), (1, 8));
assert_eq!(slice.bytes_total(), 7);
unsafe { slice.set_init(5) };
assert_eq!(slice.bytes_init(), 5);
assert_eq!(slice.into_inner().len(), 6);
}
}
131 changes: 84 additions & 47 deletions monoio/src/driver/op/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ use {crate::syscall_u32, std::os::unix::prelude::AsRawFd};
#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))]
use {
std::ffi::c_void,
windows_sys::Win32::{
Foundation::TRUE,
Networking::WinSock::{WSAGetLastError, WSARecv, WSAESHUTDOWN},
Storage::FileSystem::{ReadFile, SetFilePointer, FILE_BEGIN, INVALID_SET_FILE_POINTER},
},
windows_sys::Win32::{Foundation::TRUE, Storage::FileSystem::ReadFile},
};

use super::{super::shared_fd::SharedFd, Op, OpAble};
Expand Down Expand Up @@ -42,7 +38,18 @@ impl<T: IoBufMut> Op<Read<T>> {
})
}

pub(crate) async fn read(self) -> BufResult<usize, T> {
pub(crate) fn read(fd: SharedFd, buf: T) -> io::Result<Op<Read<T>>> {
Op::submit_with(Read {
fd,
buf,
// Refers to https://docs.rs/io-uring/latest/io_uring/opcode/struct.Write.html.
// If `offset` is set to `-1`, the offset will use (and advance) the file position, like
// the read(2) and write(2) system calls.
offset: -1i64 as u64,
})
}

pub(crate) async fn result(self) -> BufResult<usize, T> {
let complete = self.await;

// Convert the operation result to `usize`
Expand Down Expand Up @@ -83,54 +90,69 @@ impl<T: IoBufMut> OpAble for Read<T> {
#[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))]
fn legacy_call(&mut self) -> io::Result<u32> {
let fd = self.fd.as_raw_fd();
let seek_offset = libc::off_t::try_from(self.offset)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?;
#[cfg(not(target_os = "macos"))]
return syscall_u32!(pread64(
fd,
self.buf.write_ptr() as _,
self.buf.bytes_total(),
seek_offset as _
));

#[cfg(target_os = "macos")]
return syscall_u32!(pread(
fd,
self.buf.write_ptr() as _,
self.buf.bytes_total(),
seek_offset
));
let mut seek_offset = -1;

if -1i64 as u64 != self.offset {
seek_offset = libc::off_t::try_from(self.offset)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?;
}

if seek_offset == -1 {
syscall_u32!(read(fd, self.buf.write_ptr() as _, self.buf.bytes_total()))
} else {
syscall_u32!(pread(
fd,
self.buf.write_ptr() as _,
self.buf.bytes_total(),
seek_offset as _
))
}
}

#[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))]
fn legacy_call(&mut self) -> io::Result<u32> {
use windows_sys::Win32::{
Foundation::{GetLastError, ERROR_HANDLE_EOF},
System::IO::OVERLAPPED,
};

let fd = self.fd.raw_handle() as _;
let seek_offset = libc::off_t::try_from(self.offset)
.map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?;
let seek_offset = self.offset;

let mut bytes_read = 0;
let ret = unsafe {
// see https://learn.microsoft.com/zh-cn/windows/win32/api/fileapi/nf-fileapi-setfilepointer
if seek_offset != 0 {
// We use `FILE_BEGIN` because this behavior should be the same with unix syscall
// `pwrite`, which uses the offset from the begin of the file.
let r = SetFilePointer(fd, seek_offset, std::ptr::null_mut(), FILE_BEGIN);
if INVALID_SET_FILE_POINTER == r {
return Err(io::Error::last_os_error());
}
}
// see https://learn.microsoft.com/zh-cn/windows/win32/api/fileapi/nf-fileapi-readfile
ReadFile(
fd,
self.buf.write_ptr().cast::<c_void>(),
self.buf.bytes_total() as u32,
&mut bytes_read,
std::ptr::null_mut(),
)
if seek_offset as i64 != -1 {
let mut overlapped: OVERLAPPED = std::mem::zeroed();
overlapped.Anonymous.Anonymous.Offset = seek_offset as u32; // Lower 32 bits of the offset
overlapped.Anonymous.Anonymous.OffsetHigh = (seek_offset >> 32) as u32; // Higher 32 bits of the offset

ReadFile(
fd,
self.buf.write_ptr().cast::<c_void>(),
self.buf.bytes_total() as u32,
&mut bytes_read,
&overlapped as *const _ as *mut _,
)
} else {
ReadFile(
fd,
self.buf.write_ptr().cast::<c_void>(),
self.buf.bytes_total() as u32,
&mut bytes_read,
std::ptr::null_mut(),
)
}
};
if TRUE == ret {
Ok(bytes_read)
} else {
Err(io::Error::last_os_error())

if ret == TRUE {
return Ok(bytes_read);
}

match unsafe { GetLastError() } {
ERROR_HANDLE_EOF => Ok(bytes_read),
error => Err(io::Error::from_raw_os_error(error as _)),
}
}
}
Expand All @@ -140,17 +162,25 @@ pub(crate) struct ReadVec<T> {
/// while the operation is in-flight.
#[allow(unused)]
fd: SharedFd,
offset: u64,

/// Reference to the in-flight buffer.
pub(crate) buf_vec: T,
}

impl<T: IoVecBufMut> Op<ReadVec<T>> {
pub(crate) fn readv(fd: SharedFd, buf_vec: T) -> io::Result<Self> {
Op::submit_with(ReadVec { fd, buf_vec })
Op::submit_with(ReadVec {
fd,
// Refers to https://docs.rs/io-uring/latest/io_uring/opcode/struct.Write.html.
// If `offset` is set to `-1`, the offset will use (and advance) the file position, like
// the readv(2) system calls.
offset: -1i64 as u64,
buf_vec,
})
}

pub(crate) async fn read(self) -> BufResult<usize, T> {
pub(crate) async fn result(self) -> BufResult<usize, T> {
let complete = self.await;
let res = complete.meta.result.map(|v| v as _);
let mut buf_vec = complete.data.buf_vec;
Expand All @@ -168,7 +198,9 @@ impl<T: IoVecBufMut> OpAble for ReadVec<T> {
fn uring_op(&mut self) -> io_uring::squeue::Entry {
let ptr = self.buf_vec.write_iovec_ptr() as _;
let len = self.buf_vec.write_iovec_len() as _;
opcode::Readv::new(types::Fd(self.fd.raw_fd()), ptr, len).build()
opcode::Readv::new(types::Fd(self.fd.raw_fd()), ptr, len)
.offset(self.offset)
.build()
}

#[cfg(any(feature = "legacy", feature = "poll-io"))]
Expand All @@ -188,6 +220,11 @@ impl<T: IoVecBufMut> OpAble for ReadVec<T> {

#[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))]
fn legacy_call(&mut self) -> io::Result<u32> {
// There is no `readv` like syscall of file on windows, but this will be used to send
// socket message.

use windows_sys::Win32::Networking::WinSock::{WSAGetLastError, WSARecv, WSAESHUTDOWN};

let mut nread = 0;
let mut flags = 0;
let ret = unsafe {
Expand Down
2 changes: 1 addition & 1 deletion monoio/src/driver/op/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl<T: IoBufMut> Op<Recv<T>> {
}
}

pub(crate) async fn read(self) -> BufResult<usize, T> {
pub(crate) async fn result(self) -> BufResult<usize, T> {
let complete = self.await;
let res = complete.meta.result.map(|v| v as _);
let mut buf = complete.data.buf;
Expand Down
2 changes: 1 addition & 1 deletion monoio/src/driver/op/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl<T: IoBuf> Op<Send<T>> {
}
}

pub(crate) async fn write(self) -> BufResult<usize, T> {
pub(crate) async fn result(self) -> BufResult<usize, T> {
let complete = self.await;
(complete.meta.result.map(|v| v as _), complete.data.buf)
}
Expand Down
16 changes: 8 additions & 8 deletions monoio/src/driver/op/statx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,33 +28,33 @@ type FdStatx = Statx<SharedFd>;
impl Op<FdStatx> {
/// submit a statx operation
#[cfg(target_os = "linux")]
pub(crate) fn statx_using_fd(fd: &SharedFd, flags: i32) -> std::io::Result<Self> {
pub(crate) fn statx_using_fd(fd: SharedFd, flags: i32) -> std::io::Result<Self> {
Op::submit_with(Statx {
inner: fd.clone(),
inner: fd,
flags,
statx_buf: Box::new(MaybeUninit::uninit()),
})
}

#[cfg(target_os = "linux")]
pub(crate) async fn statx_result(self) -> std::io::Result<statx> {
pub(crate) async fn result(self) -> std::io::Result<statx> {
let complete = self.await;
complete.meta.result?;

Ok(unsafe { MaybeUninit::assume_init(*complete.data.statx_buf) })
}

#[cfg(target_os = "macos")]
pub(crate) fn statx_using_fd(fd: &SharedFd, follow_symlinks: bool) -> std::io::Result<Self> {
pub(crate) fn statx_using_fd(fd: SharedFd, follow_symlinks: bool) -> std::io::Result<Self> {
Op::submit_with(Statx {
inner: fd.clone(),
inner: fd,
follow_symlinks,
stat_buf: Box::new(MaybeUninit::uninit()),
})
}

#[cfg(target_os = "macos")]
pub(crate) async fn statx_result(self) -> std::io::Result<libc::stat> {
pub(crate) async fn result(self) -> std::io::Result<libc::stat> {
let complete = self.await;
complete.meta.result?;

Expand Down Expand Up @@ -130,7 +130,7 @@ impl Op<PathStatx> {
}

#[cfg(target_os = "linux")]
pub(crate) async fn statx_result(self) -> std::io::Result<statx> {
pub(crate) async fn result(self) -> std::io::Result<statx> {
let complete = self.await;
complete.meta.result?;

Expand All @@ -151,7 +151,7 @@ impl Op<PathStatx> {
}

#[cfg(target_os = "macos")]
pub(crate) async fn statx_result(self) -> std::io::Result<libc::stat> {
pub(crate) async fn result(self) -> std::io::Result<libc::stat> {
let complete = self.await;
complete.meta.result?;

Expand Down
Loading

0 comments on commit c3cb780

Please sign in to comment.