diff --git a/monoio/src/buf/io_buf.rs b/monoio/src/buf/io_buf.rs index 8ba91475..90c06be4 100644 --- a/monoio/src/buf/io_buf.rs +++ b/monoio/src/buf/io_buf.rs @@ -204,6 +204,19 @@ where } } +unsafe impl IoBuf for std::mem::ManuallyDrop +where + T: IoBuf, +{ + fn read_ptr(&self) -> *const u8 { + ::read_ptr(self) + } + + fn bytes_init(&self) -> usize { + ::bytes_init(self) + } +} + /// A mutable `io_uring` compatible buffer. /// /// The `IoBufMut` trait is implemented by buffer types that can be passed to @@ -359,6 +372,23 @@ unsafe impl IoBufMut for bytes::BytesMut { } } +unsafe impl IoBufMut for std::mem::ManuallyDrop +where + T: IoBufMut, +{ + fn write_ptr(&mut self) -> *mut u8 { + ::write_ptr(self) + } + + fn bytes_total(&mut self) -> usize { + ::bytes_total(self) + } + + unsafe fn set_init(&mut self, pos: usize) { + ::set_init(self, pos) + } +} + fn parse_range(range: impl ops::RangeBounds, end: usize) -> (usize, usize) { use core::ops::Bound; @@ -378,6 +408,8 @@ fn parse_range(range: impl ops::RangeBounds, end: usize) -> (usize, usize #[cfg(test)] mod tests { + use std::mem::ManuallyDrop; + use super::*; #[test] @@ -516,4 +548,32 @@ mod tests { assert_eq!(slice.bytes_init(), 5); assert_eq!(slice.into_inner().len(), 6); } + + #[test] + fn io_buf_manually_drop() { + let mut buf = Vec::with_capacity(10); + buf.extend_from_slice(b"0123"); + let mut buf = ManuallyDrop::new(buf); + let ptr = buf.as_ptr(); + + assert_eq!(buf.write_ptr(), ptr as _); + assert_eq!(buf.read_ptr(), ptr); + assert_eq!(buf.bytes_init(), 4); + unsafe { buf.set_init(3) }; + assert_eq!(buf.bytes_init(), 3); + assert_eq!(buf.bytes_total(), 10); + + let slice = buf.slice(1..3); + assert_eq!((slice.begin(), slice.end()), (1, 3)); + assert_eq!(slice.read_ptr(), unsafe { ptr.add(1) }); + assert_eq!(slice.bytes_init(), 2); + let buf = ManuallyDrop::into_inner(slice.into_inner()); + + let mut slice = buf.slice_mut(1..8); + assert_eq!((slice.begin(), slice.end()), (1, 8)); + assert_eq!(slice.bytes_total(), 7); + unsafe { slice.set_init(5) }; + assert_eq!(slice.bytes_init(), 5); + assert_eq!(slice.into_inner().len(), 6); + } } diff --git a/monoio/src/driver/op/read.rs b/monoio/src/driver/op/read.rs index 2e4805c9..27e26169 100644 --- a/monoio/src/driver/op/read.rs +++ b/monoio/src/driver/op/read.rs @@ -7,11 +7,7 @@ use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; #[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use { std::ffi::c_void, - windows_sys::Win32::{ - Foundation::TRUE, - Networking::WinSock::{WSAGetLastError, WSARecv, WSAESHUTDOWN}, - Storage::FileSystem::{ReadFile, SetFilePointer, FILE_BEGIN, INVALID_SET_FILE_POINTER}, - }, + windows_sys::Win32::{Foundation::TRUE, Storage::FileSystem::ReadFile}, }; use super::{super::shared_fd::SharedFd, Op, OpAble}; @@ -42,7 +38,18 @@ impl Op> { }) } - pub(crate) async fn read(self) -> BufResult { + pub(crate) fn read(fd: SharedFd, buf: T) -> io::Result>> { + Op::submit_with(Read { + fd, + buf, + // Refers to https://docs.rs/io-uring/latest/io_uring/opcode/struct.Write.html. + // If `offset` is set to `-1`, the offset will use (and advance) the file position, like + // the read(2) and write(2) system calls. + offset: -1i64 as u64, + }) + } + + pub(crate) async fn result(self) -> BufResult { let complete = self.await; // Convert the operation result to `usize` @@ -83,54 +90,69 @@ impl OpAble for Read { #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); - let seek_offset = libc::off_t::try_from(self.offset) - .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; - #[cfg(not(target_os = "macos"))] - return syscall_u32!(pread64( - fd, - self.buf.write_ptr() as _, - self.buf.bytes_total(), - seek_offset as _ - )); - #[cfg(target_os = "macos")] - return syscall_u32!(pread( - fd, - self.buf.write_ptr() as _, - self.buf.bytes_total(), - seek_offset - )); + let mut seek_offset = -1; + + if -1i64 as u64 != self.offset { + seek_offset = libc::off_t::try_from(self.offset) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; + } + + if seek_offset == -1 { + syscall_u32!(read(fd, self.buf.write_ptr() as _, self.buf.bytes_total())) + } else { + syscall_u32!(pread( + fd, + self.buf.write_ptr() as _, + self.buf.bytes_total(), + seek_offset as _ + )) + } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { + use windows_sys::Win32::{ + Foundation::{GetLastError, ERROR_HANDLE_EOF}, + System::IO::OVERLAPPED, + }; + let fd = self.fd.raw_handle() as _; - let seek_offset = libc::off_t::try_from(self.offset) - .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; + let seek_offset = self.offset; + let mut bytes_read = 0; let ret = unsafe { - // see https://learn.microsoft.com/zh-cn/windows/win32/api/fileapi/nf-fileapi-setfilepointer - if seek_offset != 0 { - // We use `FILE_BEGIN` because this behavior should be the same with unix syscall - // `pwrite`, which uses the offset from the begin of the file. - let r = SetFilePointer(fd, seek_offset, std::ptr::null_mut(), FILE_BEGIN); - if INVALID_SET_FILE_POINTER == r { - return Err(io::Error::last_os_error()); - } - } // see https://learn.microsoft.com/zh-cn/windows/win32/api/fileapi/nf-fileapi-readfile - ReadFile( - fd, - self.buf.write_ptr().cast::(), - self.buf.bytes_total() as u32, - &mut bytes_read, - std::ptr::null_mut(), - ) + if seek_offset as i64 != -1 { + let mut overlapped: OVERLAPPED = std::mem::zeroed(); + overlapped.Anonymous.Anonymous.Offset = seek_offset as u32; // Lower 32 bits of the offset + overlapped.Anonymous.Anonymous.OffsetHigh = (seek_offset >> 32) as u32; // Higher 32 bits of the offset + + ReadFile( + fd, + self.buf.write_ptr().cast::(), + self.buf.bytes_total() as u32, + &mut bytes_read, + &overlapped as *const _ as *mut _, + ) + } else { + ReadFile( + fd, + self.buf.write_ptr().cast::(), + self.buf.bytes_total() as u32, + &mut bytes_read, + std::ptr::null_mut(), + ) + } }; - if TRUE == ret { - Ok(bytes_read) - } else { - Err(io::Error::last_os_error()) + + if ret == TRUE { + return Ok(bytes_read); + } + + match unsafe { GetLastError() } { + ERROR_HANDLE_EOF => Ok(bytes_read), + error => Err(io::Error::from_raw_os_error(error as _)), } } } @@ -140,6 +162,7 @@ pub(crate) struct ReadVec { /// while the operation is in-flight. #[allow(unused)] fd: SharedFd, + offset: u64, /// Reference to the in-flight buffer. pub(crate) buf_vec: T, @@ -147,10 +170,17 @@ pub(crate) struct ReadVec { impl Op> { pub(crate) fn readv(fd: SharedFd, buf_vec: T) -> io::Result { - Op::submit_with(ReadVec { fd, buf_vec }) + Op::submit_with(ReadVec { + fd, + // Refers to https://docs.rs/io-uring/latest/io_uring/opcode/struct.Write.html. + // If `offset` is set to `-1`, the offset will use (and advance) the file position, like + // the readv(2) system calls. + offset: -1i64 as u64, + buf_vec, + }) } - pub(crate) async fn read(self) -> BufResult { + pub(crate) async fn result(self) -> BufResult { let complete = self.await; let res = complete.meta.result.map(|v| v as _); let mut buf_vec = complete.data.buf_vec; @@ -168,7 +198,9 @@ impl OpAble for ReadVec { fn uring_op(&mut self) -> io_uring::squeue::Entry { let ptr = self.buf_vec.write_iovec_ptr() as _; let len = self.buf_vec.write_iovec_len() as _; - opcode::Readv::new(types::Fd(self.fd.raw_fd()), ptr, len).build() + opcode::Readv::new(types::Fd(self.fd.raw_fd()), ptr, len) + .offset(self.offset) + .build() } #[cfg(any(feature = "legacy", feature = "poll-io"))] @@ -188,6 +220,11 @@ impl OpAble for ReadVec { #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { + // There is no `readv` like syscall of file on windows, but this will be used to send + // socket message. + + use windows_sys::Win32::Networking::WinSock::{WSAGetLastError, WSARecv, WSAESHUTDOWN}; + let mut nread = 0; let mut flags = 0; let ret = unsafe { diff --git a/monoio/src/driver/op/recv.rs b/monoio/src/driver/op/recv.rs index a995695d..ae9b2571 100644 --- a/monoio/src/driver/op/recv.rs +++ b/monoio/src/driver/op/recv.rs @@ -64,7 +64,7 @@ impl Op> { } } - pub(crate) async fn read(self) -> BufResult { + pub(crate) async fn result(self) -> BufResult { let complete = self.await; let res = complete.meta.result.map(|v| v as _); let mut buf = complete.data.buf; diff --git a/monoio/src/driver/op/send.rs b/monoio/src/driver/op/send.rs index 3c5f99bc..d49e13b9 100644 --- a/monoio/src/driver/op/send.rs +++ b/monoio/src/driver/op/send.rs @@ -44,7 +44,7 @@ impl Op> { } } - pub(crate) async fn write(self) -> BufResult { + pub(crate) async fn result(self) -> BufResult { let complete = self.await; (complete.meta.result.map(|v| v as _), complete.data.buf) } diff --git a/monoio/src/driver/op/statx.rs b/monoio/src/driver/op/statx.rs index 97a365fc..971ee3c0 100644 --- a/monoio/src/driver/op/statx.rs +++ b/monoio/src/driver/op/statx.rs @@ -28,16 +28,16 @@ type FdStatx = Statx; impl Op { /// submit a statx operation #[cfg(target_os = "linux")] - pub(crate) fn statx_using_fd(fd: &SharedFd, flags: i32) -> std::io::Result { + pub(crate) fn statx_using_fd(fd: SharedFd, flags: i32) -> std::io::Result { Op::submit_with(Statx { - inner: fd.clone(), + inner: fd, flags, statx_buf: Box::new(MaybeUninit::uninit()), }) } #[cfg(target_os = "linux")] - pub(crate) async fn statx_result(self) -> std::io::Result { + pub(crate) async fn result(self) -> std::io::Result { let complete = self.await; complete.meta.result?; @@ -45,16 +45,16 @@ impl Op { } #[cfg(target_os = "macos")] - pub(crate) fn statx_using_fd(fd: &SharedFd, follow_symlinks: bool) -> std::io::Result { + pub(crate) fn statx_using_fd(fd: SharedFd, follow_symlinks: bool) -> std::io::Result { Op::submit_with(Statx { - inner: fd.clone(), + inner: fd, follow_symlinks, stat_buf: Box::new(MaybeUninit::uninit()), }) } #[cfg(target_os = "macos")] - pub(crate) async fn statx_result(self) -> std::io::Result { + pub(crate) async fn result(self) -> std::io::Result { let complete = self.await; complete.meta.result?; @@ -130,7 +130,7 @@ impl Op { } #[cfg(target_os = "linux")] - pub(crate) async fn statx_result(self) -> std::io::Result { + pub(crate) async fn result(self) -> std::io::Result { let complete = self.await; complete.meta.result?; @@ -151,7 +151,7 @@ impl Op { } #[cfg(target_os = "macos")] - pub(crate) async fn statx_result(self) -> std::io::Result { + pub(crate) async fn result(self) -> std::io::Result { let complete = self.await; complete.meta.result?; diff --git a/monoio/src/driver/op/write.rs b/monoio/src/driver/op/write.rs index 03665f77..699f0027 100644 --- a/monoio/src/driver/op/write.rs +++ b/monoio/src/driver/op/write.rs @@ -5,18 +5,14 @@ use std::os::unix::prelude::AsRawFd; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; #[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] -use windows_sys::Win32::{ - Foundation::TRUE, - Networking::WinSock::WSASend, - Storage::FileSystem::{SetFilePointer, WriteFile, FILE_BEGIN, INVALID_SET_FILE_POINTER}, -}; +use windows_sys::Win32::{Foundation::TRUE, Storage::FileSystem::WriteFile}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::driver::ready::Direction; use crate::{ buf::{IoBuf, IoVecBuf}, - syscall_u32, BufResult, + BufResult, }; pub(crate) struct Write { @@ -24,8 +20,11 @@ pub(crate) struct Write { /// while the operation is in-flight. #[allow(unused)] fd: SharedFd, + /// Refers to https://docs.rs/io-uring/latest/io_uring/opcode/struct.Write.html. + /// + /// If `offset` is set to `-1`, the offset will use (and advance) the file position, like + /// the write(2) system calls. offset: u64, - pub(crate) buf: T, } @@ -38,7 +37,15 @@ impl Op> { }) } - pub(crate) async fn write(self) -> BufResult { + pub(crate) fn write(fd: SharedFd, buf: T) -> io::Result>> { + Op::submit_with(Write { + fd, + offset: -1i64 as u64, + buf, + }) + } + + pub(crate) async fn result(self) -> BufResult { let complete = self.await; (complete.meta.result.map(|v| v as _), complete.data.buf) } @@ -66,55 +73,72 @@ impl OpAble for Write { #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { + use crate::syscall_u32; + let fd = self.fd.as_raw_fd(); - let seek_offset = libc::off_t::try_from(self.offset) - .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; - #[cfg(not(target_os = "macos"))] - return syscall_u32!(pwrite64( - fd, - self.buf.read_ptr() as _, - self.buf.bytes_init(), - seek_offset as _ - )); - #[cfg(target_os = "macos")] - return syscall_u32!(pwrite( - fd, - self.buf.read_ptr() as _, - self.buf.bytes_init(), - seek_offset - )); + let mut seek_offset = -1; + + if -1i64 as u64 != self.offset { + seek_offset = libc::off_t::try_from(self.offset) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; + } + + if seek_offset == -1 { + syscall_u32!(write(fd, self.buf.read_ptr() as _, self.buf.bytes_init())) + } else { + syscall_u32!(pwrite( + fd, + self.buf.read_ptr() as _, + self.buf.bytes_init(), + seek_offset as _ + )) + } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { + use windows_sys::Win32::{ + Foundation::{GetLastError, ERROR_HANDLE_EOF}, + System::IO::OVERLAPPED, + }; + let fd = self.fd.raw_handle() as _; - let seek_offset = libc::off_t::try_from(self.offset) - .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; + let seek_offset = self.offset; let mut bytes_write = 0; + let ret = unsafe { - // see https://learn.microsoft.com/zh-cn/windows/win32/api/fileapi/nf-fileapi-setfilepointer - if seek_offset != 0 { - // We use `FILE_BEGIN` because this behavior should be the same with unix syscall - // `pwrite`, which uses the offset from the begin of the file. - let r = SetFilePointer(fd, seek_offset, std::ptr::null_mut(), FILE_BEGIN); - if INVALID_SET_FILE_POINTER == r { - return Err(io::Error::last_os_error()); - } + // see https://learn.microsoft.com/zh-cn/windows/win32/api/fileapi/nf-fileapi-readfile + if seek_offset as i64 != -1 { + let mut overlapped: OVERLAPPED = std::mem::zeroed(); + overlapped.Anonymous.Anonymous.Offset = seek_offset as u32; // Lower 32 bits of the offset + overlapped.Anonymous.Anonymous.OffsetHigh = (seek_offset >> 32) as u32; // Higher 32 bits of the offset + + WriteFile( + fd, + self.buf.read_ptr(), + self.buf.bytes_init() as u32, + &mut bytes_write, + &overlapped as *const _ as *mut _, + ) + } else { + WriteFile( + fd, + self.buf.read_ptr(), + self.buf.bytes_init() as u32, + &mut bytes_write, + std::ptr::null_mut(), + ) } - // see https://learn.microsoft.com/zh-cn/windows/win32/api/fileapi/nf-fileapi-writefile - WriteFile( - fd, - self.buf.read_ptr(), - self.buf.bytes_init() as u32, - &mut bytes_write, - std::ptr::null_mut(), - ) }; - if TRUE == ret { - Ok(bytes_write) - } else { - Err(io::Error::last_os_error()) + + if ret == TRUE { + return Ok(bytes_write); + } + + match unsafe { GetLastError() } { + ERROR_HANDLE_EOF => Ok(bytes_write), + error => Err(io::Error::from_raw_os_error(error as _)), } } } @@ -122,29 +146,33 @@ impl OpAble for Write { pub(crate) struct WriteVec { /// Holds a strong ref to the FD, preventing the file from being closed /// while the operation is in-flight. - #[allow(unused)] fd: SharedFd, - + /// Refers to https://docs.rs/io-uring/latest/io_uring/opcode/struct.Write.html. + /// + /// If `offset` is set to `-1`, the offset will use (and advance) the file position, like + /// the writev(2) system calls. + offset: u64, pub(crate) buf_vec: T, } impl Op> { - pub(crate) fn writev(fd: &SharedFd, buf_vec: T) -> io::Result { + pub(crate) fn writev(fd: SharedFd, buf_vec: T) -> io::Result { Op::submit_with(WriteVec { - fd: fd.clone(), + fd, + offset: -1i64 as u64, buf_vec, }) } - #[allow(unused)] pub(crate) fn writev_raw(fd: &SharedFd, buf_vec: T) -> WriteVec { WriteVec { fd: fd.clone(), + offset: -1i64 as u64, buf_vec, } } - pub(crate) async fn write(self) -> BufResult { + pub(crate) async fn result(self) -> BufResult { let complete = self.await; (complete.meta.result.map(|v| v as _), complete.data.buf_vec) } @@ -155,7 +183,9 @@ impl OpAble for WriteVec { fn uring_op(&mut self) -> io_uring::squeue::Entry { let ptr = self.buf_vec.read_iovec_ptr() as *const _; let len = self.buf_vec.read_iovec_len() as _; - opcode::Writev::new(types::Fd(self.fd.raw_fd()), ptr, len).build() + opcode::Writev::new(types::Fd(self.fd.raw_fd()), ptr, len) + .offset(self.offset) + .build() } #[cfg(any(feature = "legacy", feature = "poll-io"))] @@ -168,15 +198,41 @@ impl OpAble for WriteVec { #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { - syscall_u32!(writev( - self.fd.raw_fd(), - self.buf_vec.read_iovec_ptr(), - self.buf_vec.read_iovec_len().min(i32::MAX as usize) as _ - )) + use crate::syscall_u32; + + let fd = self.fd.raw_fd(); + let mut seek_offset = -1; + + if -1i64 as u64 != self.offset { + seek_offset = libc::off_t::try_from(self.offset) + .map_err(|_| io::Error::new(io::ErrorKind::Other, "offset too big"))?; + } + + if seek_offset == -1 { + syscall_u32!(writev( + fd, + self.buf_vec.read_iovec_ptr(), + self.buf_vec.read_iovec_len().min(i32::MAX as usize) as _ + )) + } else { + syscall_u32!(pwritev( + fd, + self.buf_vec.read_iovec_ptr(), + self.buf_vec.read_iovec_len().min(i32::MAX as usize) as _, + seek_offset + )) + } } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { + // There is no `writev` like syscall of file on windows, but this will be used to send + // socket message. + + use windows_sys::Win32::Networking::WinSock::WSASend; + + use crate::syscall_u32; + let mut bytes_sent = 0; syscall_u32!(WSASend( self.fd.raw_socket() as _, diff --git a/monoio/src/fs/file.rs b/monoio/src/fs/file/mod.rs similarity index 62% rename from monoio/src/fs/file.rs rename to monoio/src/fs/file/mod.rs index 70b1ab3f..d21b78eb 100644 --- a/monoio/src/fs/file.rs +++ b/monoio/src/fs/file/mod.rs @@ -1,543 +1,708 @@ -#[cfg(windows)] -use std::os::windows::io::{AsRawHandle, RawHandle}; -#[cfg(unix)] -use std::{ - fs::File as StdFile, - os::{ - fd::IntoRawFd, - unix::io::{AsRawFd, RawFd}, - }, -}; -use std::{io, path::Path}; - -#[cfg(unix)] -use super::{metadata::FileAttr, Metadata}; -use crate::{ - buf::{IoBuf, IoBufMut}, - driver::{op::Op, shared_fd::SharedFd}, - fs::OpenOptions, -}; - -/// A reference to an open file on the filesystem. -/// -/// An instance of a `File` can be read and/or written depending on what options -/// it was opened with. The `File` type provides **positional** read and write -/// operations. The file does not maintain an internal cursor. The caller is -/// required to specify an offset when issuing an operation. -/// -/// While files are automatically closed when they go out of scope, the -/// operation happens asynchronously in the background. It is recommended to -/// call the `close()` function in order to guarantee that the file successfully -/// closed before exiting the scope. Closing a file does not guarantee writes -/// have persisted to disk. Use [`sync_all`] to ensure all writes have reached -/// the filesystem. -/// -/// [`sync_all`]: File::sync_all -/// -/// # Examples -/// -/// Creates a new file and write data to it: -/// -/// ```no_run -/// use monoio::fs::File; -/// -/// #[monoio::main] -/// async fn main() -> Result<(), Box> { -/// // Open a file -/// let file = File::create("hello.txt").await?; -/// -/// // Write some data -/// let (res, buf) = file.write_at(&b"hello world"[..], 0).await; -/// let n = res?; -/// -/// println!("wrote {} bytes", n); -/// -/// // Sync data to the file system. -/// file.sync_all().await?; -/// -/// // Close the file -/// file.close().await?; -/// -/// Ok(()) -/// } -/// ``` -#[derive(Debug)] -pub struct File { - /// Open file descriptor - fd: SharedFd, -} - -impl File { - /// Attempts to open a file in read-only mode. - /// - /// See the [`OpenOptions::open`] method for more details. - /// - /// # Errors - /// - /// This function will return an error if `path` does not already exist. - /// Other errors may also be returned according to [`OpenOptions::open`]. - /// - /// # Examples - /// - /// ```no_run - /// use monoio::fs::File; - /// - /// #[monoio::main] - /// async fn main() -> Result<(), Box> { - /// let f = File::open("foo.txt").await?; - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// } - /// ``` - pub async fn open(path: impl AsRef) -> io::Result { - OpenOptions::new().read(true).open(path).await - } - - /// Opens a file in write-only mode. - /// - /// This function will create a file if it does not exist, - /// and will truncate it if it does. - /// - /// See the [`OpenOptions::open`] function for more details. - /// - /// # Examples - /// - /// ```no_run - /// use monoio::fs::File; - /// - /// #[monoio::main] - /// async fn main() -> Result<(), Box> { - /// let f = File::create("foo.txt").await?; - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// } - /// ``` - pub async fn create(path: impl AsRef) -> io::Result { - OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(path) - .await - } - - pub(crate) fn from_shared_fd(fd: SharedFd) -> File { - File { fd } - } - - /// Converts a [`std::fs::File`] to a [`monoio::fs::File`](File). - /// - /// # Examples - /// - /// ```no_run - /// // This line could block. It is not recommended to do this on the monoio - /// // runtime. - /// let std_file = std::fs::File::open("foo.txt").unwrap(); - /// let file = monoio::fs::File::from_std(std_file); - /// ``` - #[cfg(unix)] - pub fn from_std(std: StdFile) -> io::Result { - Ok(File { - fd: SharedFd::new_without_register(std.into_raw_fd()), - }) - } - - /// Read some bytes at the specified offset from the file into the specified - /// buffer, returning how many bytes were read. - /// - /// # Return - /// - /// The method returns the operation result and the same buffer value passed - /// as an argument. - /// - /// If the method returns [`Ok(n)`], then the read was successful. A nonzero - /// `n` value indicates that the buffer has been filled with `n` bytes of - /// data from the file. If `n` is `0`, then one of the following happened: - /// - /// 1. The specified offset is the end of the file. - /// 2. The buffer specified was 0 bytes in length. - /// - /// It is not an error if the returned value `n` is smaller than the buffer - /// size, even when the file contains enough data to fill the buffer. - /// - /// # Errors - /// - /// If this function encounters any form of I/O or other error, an error - /// variant will be returned. The buffer is returned on error. - /// - /// # Examples - /// - /// ```no_run - /// use monoio::fs::File; - /// - /// #[monoio::main] - /// async fn main() -> Result<(), Box> { - /// let f = File::open("foo.txt").await?; - /// let buffer = vec![0; 10]; - /// - /// // Read up to 10 bytes - /// let (res, buffer) = f.read_at(buffer, 0).await; - /// let n = res?; - /// - /// println!("The bytes: {:?}", &buffer[..n]); - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// } - /// ``` - pub async fn read_at(&self, buf: T, pos: u64) -> crate::BufResult { - // Submit the read operation - let op = Op::read_at(&self.fd, buf, pos).unwrap(); - op.read().await - } - - /// Read the exact number of bytes required to fill `buf` at the specified - /// offset from the file. - /// - /// This function reads as many as bytes as necessary to completely fill the - /// specified buffer `buf`. - /// - /// # Return - /// - /// The method returns the operation result and the same buffer value passed - /// as an argument. - /// - /// If the method returns [`Ok(())`], then the read was successful. - /// - /// # Errors - /// - /// If this function encounters an error of the kind - /// [`ErrorKind::Interrupted`] then the error is ignored and the - /// operation will continue. - /// - /// If this function encounters an "end of file" before completely filling - /// the buffer, it returns an error of the kind - /// [`ErrorKind::UnexpectedEof`]. The buffer is returned on error. - /// - /// If this function encounters any form of I/O or other error, an error - /// variant will be returned. The buffer is returned on error. - /// - /// # Examples - /// - /// ```no_run - /// use monoio::fs::File; - /// - /// #[monoio::main] - /// async fn main() -> Result<(), Box> { - /// let f = File::open("foo.txt").await?; - /// let buffer = vec![0; 10]; - /// - /// // Read up to 10 bytes - /// let (res, buffer) = f.read_exact_at(buffer, 0).await; - /// res?; - /// - /// println!("The bytes: {:?}", buffer); - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// } - /// ``` - /// - /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted - /// [`ErrorKind::UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof - pub async fn read_exact_at( - &self, - mut buf: T, - pos: u64, - ) -> crate::BufResult<(), T> { - let len = buf.bytes_total(); - let mut read = 0; - while read < len { - let slice = unsafe { buf.slice_mut_unchecked(read..len) }; - let (res, slice) = self.read_at(slice, pos + read as u64).await; - buf = slice.into_inner(); - match res { - Ok(0) => { - return ( - Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "failed to fill whole buffer", - )), - buf, - ) - } - Ok(n) => { - read += n; - } - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - Err(e) => return (Err(e), buf), - }; - } - - (Ok(()), buf) - } - - /// Write a buffer into this file at the specified offset, returning how - /// many bytes were written. - /// - /// This function will attempt to write the entire contents of `buf`, but - /// the entire write may not succeed, or the write may also generate an - /// error. The bytes will be written starting at the specified offset. - /// - /// # Return - /// - /// The method returns the operation result and the same buffer value passed - /// in as an argument. A return value of `0` typically means that the - /// underlying file is no longer able to accept bytes and will likely not be - /// able to in the future as well, or that the buffer provided is empty. - /// - /// # Errors - /// - /// Each call to `write` may generate an I/O error indicating that the - /// operation could not be completed. If an error is returned then no bytes - /// in the buffer were written to this writer. - /// - /// It is **not** considered an error if the entire buffer could not be - /// written to this writer. - /// - /// # Examples - /// - /// ```no_run - /// use monoio::fs::File; - /// - /// #[monoio::main] - /// async fn main() -> Result<(), Box> { - /// let file = File::create("foo.txt").await?; - /// - /// // Writes some prefix of the byte string, not necessarily all of it. - /// let (res, _) = file.write_at(&b"some bytes"[..], 0).await; - /// let n = res?; - /// - /// println!("wrote {} bytes", n); - /// - /// // Close the file - /// file.close().await?; - /// Ok(()) - /// } - /// ``` - /// - /// [`Ok(n)`]: Ok - pub async fn write_at(&self, buf: T, pos: u64) -> crate::BufResult { - let op = Op::write_at(&self.fd, buf, pos).unwrap(); - op.write().await - } - - /// Attempts to write an entire buffer into this file at the specified - /// offset. - /// - /// This method will continuously call [`write_at`] until there is no more - /// data to be written or an error of non-[`ErrorKind::Interrupted`] - /// kind is returned. This method will not return until the entire - /// buffer has been successfully written or such an error occurs. - /// - /// If the buffer contains no data, this will never call [`write_at`]. - /// - /// # Return - /// - /// The method returns the operation result and the same buffer value passed - /// in as an argument. - /// - /// # Errors - /// - /// This function will return the first error of - /// non-[`ErrorKind::Interrupted`] kind that [`write_at`] returns. - /// - /// # Examples - /// - /// ```no_run - /// use monoio::fs::File; - /// - /// #[monoio::main] - /// async fn main() -> Result<(), Box> { - /// let file = File::create("foo.txt").await?; - /// - /// // Writes some prefix of the byte string, not necessarily all of it. - /// let (res, _) = file.write_all_at(&b"some bytes"[..], 0).await; - /// res?; - /// - /// println!("wrote all bytes"); - /// - /// // Close the file - /// file.close().await?; - /// Ok(()) - /// } - /// ``` - /// - /// [`write_at`]: File::write_at - /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted - pub async fn write_all_at(&self, mut buf: T, pos: u64) -> crate::BufResult<(), T> { - let len = buf.bytes_init(); - let mut written = 0; - while written < len { - let slice = unsafe { buf.slice_unchecked(written..len) }; - let (res, slice) = self.write_at(slice, pos + written as u64).await; - buf = slice.into_inner(); - match res { - Ok(0) => { - return ( - Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write whole buffer", - )), - buf, - ) - } - Ok(n) => written += n, - Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} - Err(e) => return (Err(e), buf), - }; - } - - (Ok(()), buf) - } - - /// Attempts to sync all OS-internal metadata to disk. - /// - /// This function will attempt to ensure that all in-memory data reaches the - /// filesystem before completing. - /// - /// This can be used to handle errors that would otherwise only be caught - /// when the `File` is closed. Dropping a file will ignore errors in - /// synchronizing this in-memory data. - /// - /// # Examples - /// - /// ```no_run - /// use monoio::fs::File; - /// - /// #[monoio::main] - /// async fn main() -> Result<(), Box> { - /// let f = File::create("foo.txt").await?; - /// let (res, buf) = f.write_at(&b"Hello, world!"[..], 0).await; - /// let n = res?; - /// - /// f.sync_all().await?; - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// } - /// ``` - pub async fn sync_all(&self) -> io::Result<()> { - let op = Op::fsync(&self.fd).unwrap(); - let completion = op.await; - - completion.meta.result?; - Ok(()) - } - - /// Attempts to sync file data to disk. - /// - /// This method is similar to [`sync_all`], except that it may not - /// synchronize file metadata to the filesystem. - /// - /// This is intended for use cases that must synchronize content, but don't - /// need the metadata on disk. The goal of this method is to reduce disk - /// operations. - /// - /// Note that some platforms may simply implement this in terms of - /// [`sync_all`]. - /// - /// [`sync_all`]: File::sync_all - /// - /// # Examples - /// - /// ```no_run - /// use monoio::fs::File; - /// - /// #[monoio::main] - /// async fn main() -> Result<(), Box> { - /// let f = File::create("foo.txt").await?; - /// let (res, buf) = f.write_at(&b"Hello, world!"[..], 0).await; - /// let n = res?; - /// - /// f.sync_data().await?; - /// - /// // Close the file - /// f.close().await?; - /// Ok(()) - /// } - /// ``` - pub async fn sync_data(&self) -> io::Result<()> { - let op = Op::datasync(&self.fd).unwrap(); - let completion = op.await; - - completion.meta.result?; - Ok(()) - } - - /// Closes the file. - /// - /// The method completes once the close operation has completed, - /// guaranteeing that resources associated with the file have been released. - /// - /// If `close` is not called before dropping the file, the file is closed in - /// the background, but there is no guarantee as to **when** the close - /// operation will complete. - /// - /// # Examples - /// - /// ```no_run - /// use monoio::fs::File; - /// - /// #[monoio::main] - /// async fn main() -> Result<(), Box> { - /// // Open the file - /// let f = File::open("foo.txt").await?; - /// // Close the file - /// f.close().await?; - /// - /// Ok(()) - /// } - /// ``` - pub async fn close(self) -> io::Result<()> { - self.fd.close().await; - Ok(()) - } - - /// Queries metadata about the underlying file. - /// - /// # Examples - /// - /// ```no_run - /// use monoio::fs::File; - /// - /// #[monoio::main] - /// async fn main() -> std::io::Result<()> { - /// let mut f = File::open("foo.txt").await?; - /// let metadata = f.metadata().await?; - /// Ok(()) - /// } - /// ``` - #[cfg(unix)] - pub async fn metadata(&self) -> io::Result { - #[cfg(target_os = "linux")] - let flags = libc::AT_STATX_SYNC_AS_STAT | libc::AT_EMPTY_PATH; - #[cfg(target_os = "linux")] - let op = Op::statx_using_fd(&self.fd, flags)?; - #[cfg(target_os = "macos")] - let op = Op::statx_using_fd(&self.fd, true)?; - - op.statx_result().await.map(FileAttr::from).map(Metadata) - } -} - -#[cfg(unix)] -impl AsRawFd for File { - fn as_raw_fd(&self) -> RawFd { - self.fd.raw_fd() - } -} - -#[cfg(windows)] -impl AsRawHandle for File { - fn as_raw_handle(&self) -> RawHandle { - self.fd.raw_handle() - } -} +use std::{io, path::Path}; + +use crate::{ + buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut}, + driver::{op::Op, shared_fd::SharedFd}, + fs::OpenOptions, + io::{AsyncReadRent, AsyncWriteRent}, +}; + +#[cfg(unix)] +mod unix; +#[cfg(unix)] +use unix as file_impl; +#[cfg(windows)] +mod windows; +#[cfg(windows)] +use windows as file_impl; + +/// A reference to an open file on the filesystem. +/// +/// An instance of a `File` can be read and/or written depending on what options +/// it was opened with. The `File` type provides **positional** read and write +/// operations. The file does not maintain an internal cursor. The caller is +/// required to specify an offset when issuing an operation. +/// +/// While files are automatically closed when they go out of scope, the +/// operation happens asynchronously in the background. It is recommended to +/// call the `close()` function in order to guarantee that the file successfully +/// closed before exiting the scope. Closing a file does not guarantee writes +/// have persisted to disk. Use [`sync_all`] to ensure all writes have reached +/// the filesystem. +/// +/// [`sync_all`]: File::sync_all +/// +/// # Examples +/// +/// Creates a new file and write data to it: +/// +/// ```no_run +/// use monoio::fs::File; +/// +/// #[monoio::main] +/// async fn main() -> Result<(), Box> { +/// // Open a file +/// let file = File::create("hello.txt").await?; +/// +/// // Write some data +/// let (res, buf) = file.write_at(&b"hello world"[..], 0).await; +/// let n = res?; +/// +/// println!("wrote {} bytes", n); +/// +/// // Sync data to the file system. +/// file.sync_all().await?; +/// +/// // Close the file +/// file.close().await?; +/// +/// Ok(()) +/// } +/// ``` +#[derive(Debug)] +pub struct File { + /// Open file descriptor + fd: SharedFd, +} + +impl File { + /// Attempts to open a file in read-only mode. + /// + /// See the [`OpenOptions::open`] method for more details. + /// + /// # Errors + /// + /// This function will return an error if `path` does not already exist. + /// Other errors may also be returned according to [`OpenOptions::open`]. + /// + /// # Examples + /// + /// ```no_run + /// use monoio::fs::File; + /// + /// #[monoio::main] + /// async fn main() -> Result<(), Box> { + /// let f = File::open("foo.txt").await?; + /// + /// // Close the file + /// f.close().await?; + /// Ok(()) + /// } + /// ``` + pub async fn open(path: impl AsRef) -> io::Result { + OpenOptions::new().read(true).open(path).await + } + + /// Opens a file in write-only mode. + /// + /// This function will create a file if it does not exist, + /// and will truncate it if it does. + /// + /// See the [`OpenOptions::open`] function for more details. + /// + /// # Examples + /// + /// ```no_run + /// use monoio::fs::File; + /// + /// #[monoio::main] + /// async fn main() -> Result<(), Box> { + /// let f = File::create("foo.txt").await?; + /// + /// // Close the file + /// f.close().await?; + /// Ok(()) + /// } + /// ``` + pub async fn create(path: impl AsRef) -> io::Result { + OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(path) + .await + } + + pub(crate) fn from_shared_fd(fd: SharedFd) -> File { + File { fd } + } + + async fn read(&mut self, buf: T) -> crate::BufResult { + let op = Op::read(self.fd.clone(), buf).unwrap(); + op.result().await + } + + async fn read_vectored(&mut self, buf_vec: T) -> crate::BufResult { + file_impl::read_vectored(self.fd.clone(), buf_vec).await + } + + /// Read some bytes at the specified offset from the file into the specified + /// buffer, returning how many bytes were read. + /// + /// # Return + /// + /// The method returns the operation result and the same buffer value passed + /// as an argument. + /// + /// If the method returns [`Ok(n)`], then the read was successful. A nonzero + /// `n` value indicates that the buffer has been filled with `n` bytes of + /// data from the file. If `n` is `0`, then one of the following happened: + /// + /// 1. The specified offset is the end of the file. + /// 2. The buffer specified was 0 bytes in length. + /// + /// It is not an error if the returned value `n` is smaller than the buffer + /// size, even when the file contains enough data to fill the buffer. + /// + /// # Platform-specific behavior + /// + /// - On unix-like platform + /// - this function will **not** change the file pointer, and the `pos` always start from + /// the begin of file. + /// - On windows + /// - this function will change the file pointer, but the `pos` always start from the begin + /// of file. + /// + /// # Errors + /// + /// If this function encounters any form of I/O or other error, an error + /// variant will be returned. The buffer is returned on error. + /// + /// # Examples + /// + /// ```no_run + /// use monoio::fs::File; + /// + /// #[monoio::main] + /// async fn main() -> Result<(), Box> { + /// let f = File::open("foo.txt").await?; + /// let buffer = vec![0; 10]; + /// + /// // Read up to 10 bytes + /// let (res, buffer) = f.read_at(buffer, 0).await; + /// let n = res?; + /// + /// println!("The bytes: {:?}", &buffer[..n]); + /// + /// // Close the file + /// f.close().await?; + /// Ok(()) + /// } + /// ``` + pub async fn read_at(&self, buf: T, pos: u64) -> crate::BufResult { + // Submit the read operation + let op = Op::read_at(&self.fd, buf, pos).unwrap(); + op.result().await + } + + /// Read the exact number of bytes required to fill `buf` at the specified + /// offset from the file. + /// + /// This function reads as many as bytes as necessary to completely fill the + /// specified buffer `buf`. + /// + /// # Return + /// + /// The method returns the operation result and the same buffer value passed + /// as an argument. + /// + /// If the method returns [`Ok(())`], then the read was successful. + /// + /// # Errors + /// + /// If this function encounters an error of the kind + /// [`ErrorKind::Interrupted`] then the error is ignored and the + /// operation will continue. + /// + /// If this function encounters an "end of file" before completely filling + /// the buffer, it returns an error of the kind + /// [`ErrorKind::UnexpectedEof`]. The buffer is returned on error. + /// + /// If this function encounters any form of I/O or other error, an error + /// variant will be returned. The buffer is returned on error. + /// + /// # Examples + /// + /// ```no_run + /// use monoio::fs::File; + /// + /// #[monoio::main] + /// async fn main() -> Result<(), Box> { + /// let f = File::open("foo.txt").await?; + /// let buffer = vec![0; 10]; + /// + /// // Read up to 10 bytes + /// let (res, buffer) = f.read_exact_at(buffer, 0).await; + /// res?; + /// + /// println!("The bytes: {:?}", buffer); + /// + /// // Close the file + /// f.close().await?; + /// Ok(()) + /// } + /// ``` + /// + /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted + /// [`ErrorKind::UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof + pub async fn read_exact_at( + &self, + mut buf: T, + pos: u64, + ) -> crate::BufResult<(), T> { + let len = buf.bytes_total(); + let mut read = 0; + while read < len { + let slice = unsafe { buf.slice_mut_unchecked(read..len) }; + let (res, slice) = self.read_at(slice, pos + read as u64).await; + buf = slice.into_inner(); + match res { + Ok(0) => { + return ( + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )), + buf, + ) + } + Ok(n) => { + read += n; + } + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return (Err(e), buf), + }; + } + + (Ok(()), buf) + } + + async fn write(&mut self, buf: T) -> crate::BufResult { + let op = Op::write(self.fd.clone(), buf).unwrap(); + op.result().await + } + + async fn write_vectored(&mut self, buf_vec: T) -> crate::BufResult { + file_impl::write_vectored(self.fd.clone(), buf_vec).await + } + + /// Write a buffer into this file at the specified offset, returning how + /// many bytes were written. + /// + /// This function will attempt to write the entire contents of `buf`, but + /// the entire write may not succeed, or the write may also generate an + /// error. The bytes will be written starting at the specified offset. + /// + /// # Return + /// + /// The method returns the operation result and the same buffer value passed + /// in as an argument. A return value of `0` typically means that the + /// underlying file is no longer able to accept bytes and will likely not be + /// able to in the future as well, or that the buffer provided is empty. + /// + /// # Errors + /// + /// Each call to `write` may generate an I/O error indicating that the + /// operation could not be completed. If an error is returned then no bytes + /// in the buffer were written to this writer. + /// + /// It is **not** considered an error if the entire buffer could not be + /// written to this writer. + /// + /// # Examples + /// + /// ```no_run + /// use monoio::fs::File; + /// + /// #[monoio::main] + /// async fn main() -> Result<(), Box> { + /// let file = File::create("foo.txt").await?; + /// + /// // Writes some prefix of the byte string, not necessarily all of it. + /// let (res, _) = file.write_at(&b"some bytes"[..], 0).await; + /// let n = res?; + /// + /// println!("wrote {} bytes", n); + /// + /// // Close the file + /// file.close().await?; + /// Ok(()) + /// } + /// ``` + /// + /// [`Ok(n)`]: Ok + pub async fn write_at(&self, buf: T, pos: u64) -> crate::BufResult { + let op = Op::write_at(&self.fd, buf, pos).unwrap(); + op.result().await + } + + /// Attempts to write an entire buffer into this file at the specified + /// offset. + /// + /// This method will continuously call [`write_at`] until there is no more + /// data to be written or an error of non-[`ErrorKind::Interrupted`] + /// kind is returned. This method will not return until the entire + /// buffer has been successfully written or such an error occurs. + /// + /// If the buffer contains no data, this will never call [`write_at`]. + /// + /// # Return + /// + /// The method returns the operation result and the same buffer value passed + /// in as an argument. + /// + /// # Errors + /// + /// This function will return the first error of + /// non-[`ErrorKind::Interrupted`] kind that [`write_at`] returns. + /// + /// # Examples + /// + /// ```no_run + /// use monoio::fs::File; + /// + /// #[monoio::main] + /// async fn main() -> Result<(), Box> { + /// let file = File::create("foo.txt").await?; + /// + /// // Writes some prefix of the byte string, not necessarily all of it. + /// let (res, _) = file.write_all_at(&b"some bytes"[..], 0).await; + /// res?; + /// + /// println!("wrote all bytes"); + /// + /// // Close the file + /// file.close().await?; + /// Ok(()) + /// } + /// ``` + /// + /// [`write_at`]: File::write_at + /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted + pub async fn write_all_at(&self, mut buf: T, pos: u64) -> crate::BufResult<(), T> { + let len = buf.bytes_init(); + let mut written = 0; + while written < len { + let slice = unsafe { buf.slice_unchecked(written..len) }; + let (res, slice) = self.write_at(slice, pos + written as u64).await; + buf = slice.into_inner(); + match res { + Ok(0) => { + return ( + Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write whole buffer", + )), + buf, + ) + } + Ok(n) => written += n, + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {} + Err(e) => return (Err(e), buf), + }; + } + + (Ok(()), buf) + } + + /// Attempts to sync all OS-internal metadata to disk. + /// + /// This function will attempt to ensure that all in-memory data reaches the + /// filesystem before completing. + /// + /// This can be used to handle errors that would otherwise only be caught + /// when the `File` is closed. Dropping a file will ignore errors in + /// synchronizing this in-memory data. + /// + /// # Examples + /// + /// ```no_run + /// use monoio::fs::File; + /// + /// #[monoio::main] + /// async fn main() -> Result<(), Box> { + /// let f = File::create("foo.txt").await?; + /// let (res, buf) = f.write_at(&b"Hello, world!"[..], 0).await; + /// let n = res?; + /// + /// f.sync_all().await?; + /// + /// // Close the file + /// f.close().await?; + /// Ok(()) + /// } + /// ``` + pub async fn sync_all(&self) -> io::Result<()> { + let op = Op::fsync(&self.fd).unwrap(); + let completion = op.await; + + completion.meta.result?; + Ok(()) + } + + /// Attempts to sync file data to disk. + /// + /// This method is similar to [`sync_all`], except that it may not + /// synchronize file metadata to the filesystem. + /// + /// This is intended for use cases that must synchronize content, but don't + /// need the metadata on disk. The goal of this method is to reduce disk + /// operations. + /// + /// Note that some platforms may simply implement this in terms of + /// [`sync_all`]. + /// + /// [`sync_all`]: File::sync_all + /// + /// # Examples + /// + /// ```no_run + /// use monoio::fs::File; + /// + /// #[monoio::main] + /// async fn main() -> Result<(), Box> { + /// let f = File::create("foo.txt").await?; + /// let (res, buf) = f.write_at(&b"Hello, world!"[..], 0).await; + /// let n = res?; + /// + /// f.sync_data().await?; + /// + /// // Close the file + /// f.close().await?; + /// Ok(()) + /// } + /// ``` + pub async fn sync_data(&self) -> io::Result<()> { + let op = Op::datasync(&self.fd).unwrap(); + let completion = op.await; + + completion.meta.result?; + Ok(()) + } + + async fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + + /// Closes the file. + /// + /// The method completes once the close operation has completed, + /// guaranteeing that resources associated with the file have been released. + /// + /// If `close` is not called before dropping the file, the file is closed in + /// the background, but there is no guarantee as to **when** the close + /// operation will complete. + /// + /// # Examples + /// + /// ```no_run + /// use monoio::fs::File; + /// + /// #[monoio::main] + /// async fn main() -> Result<(), Box> { + /// // Open the file + /// let f = File::open("foo.txt").await?; + /// // Close the file + /// f.close().await?; + /// + /// Ok(()) + /// } + /// ``` + pub async fn close(self) -> io::Result<()> { + self.fd.close().await; + Ok(()) + } +} + +impl AsyncWriteRent for File { + /// Writes the contents of a buffer to a file, returning the number of bytes written. + /// + /// This function attempts to write the entire buffer `buf`, but the write may not fully + /// succeed, and it might also result in an error. A call to `write` represents *at most one* + /// attempt to write to the underlying object. + /// + /// # Return + /// + /// If the return value is `(Ok(n), buf)`, it guarantees that `n <= buf.len()`. A return value + /// of `0` typically indicates that the underlying object can no longer accept bytes and likely + /// won't be able to in the future, or that the provided buffer is empty. + /// + /// # Errors + /// + /// Each `write` call may result in an I/O error, indicating that the operation couldn't be + /// completed. If an error occurs, no bytes from the buffer were written to the file. + /// + /// It is **not** considered an error if the entire buffer could not be written to the file. + /// + /// # Example + /// + /// ```no_run + /// use monoio::io::AsyncWriteRent; + /// + /// #[monoio::main] + /// async fn main() -> std::io::Result<()> { + /// let mut file = monoio::fs::File::create("example.txt").await?; + /// let (res, buf) = file.write("Hello, world").await; + /// res?; + /// Ok(()) + /// } + /// ``` + async fn write(&mut self, buf: T) -> crate::BufResult { + self.write(buf).await + } + + /// This function attempts to write the entire contents of `buf_vec`, but the write may not + /// fully succeed, and it might also result in an error. The bytes will be written starting at + /// the current file pointer. + /// + /// # Return + /// + /// The method returns the result of the operation along with the same array of buffers passed + /// as an argument. A return value of `0` typically indicates that the underlying file can no + /// longer accept bytes and likely won't be able to in the future, or that the provided buffer + /// is empty. + /// + /// # Platform-specific behavior + /// + /// - On windows + /// - due to windows does not have syscall like `writev`, so the implement of this function + /// on windows is by internally calling the `WriteFile` syscall to write each buffer into + /// file. + /// + /// # Errors + /// + /// Each `write` call may result in an I/O error, indicating the operation couldn't be + /// completed. If an error occurs, no bytes from the buffer were written to the file. + /// + /// It is **not** considered an error if the entire buffer could not be written to the file. + /// + /// # Example + /// + /// ```no_run + /// use monoio::io::AsyncWriteRent; + /// + /// #[monoio::main] + /// async fn main() -> std::io::Result<()> { + /// let buf_vec = monoio::buf::VecBuf::from(vec![ + /// "Hello".to_owned().into_bytes(), + /// "World".to_owned().into_bytes(), + /// ]); + /// let mut file = monoio::fs::File::create("example.txt").await?; + /// let (res, buf_vec) = file.writev(buf_vec).await; + /// res?; + /// Ok(()) + /// } + /// ``` + async fn writev(&mut self, buf_vec: T) -> crate::BufResult { + self.write_vectored(buf_vec).await + } + + /// Flushes the file, ensuring that all buffered contents are written to their destination. + /// + /// # Platform-specific behavior + /// + /// Since the `File` structure doesn't contain any internal buffers, this function is currently + /// a no-op. + async fn flush(&mut self) -> std::io::Result<()> { + self.flush().await + } + + /// This function will call [`flush`] inside. + async fn shutdown(&mut self) -> std::io::Result<()> { + self.flush().await + } +} + +impl AsyncReadRent for File { + /// Reads bytes from the file at the current file pointer into the specified buffer, returning + /// the number of bytes read. + /// + /// # Return + /// + /// The method returns a tuple with the result of the operation and the same buffer passed as an + /// argument. + /// + /// If the method returns [`(Ok(n), buf)`], a non-zero `n` means the buffer has been filled with + /// `n` bytes of data from the file. If `n` is `0`, it indicates one of the following: + /// + /// 1. The current file pointer is at the end of the file. + /// 2. The provided buffer was 0 bytes in length. + /// + /// It is not an error if `n` is smaller than the buffer size, even if there is enough data in + /// the file to fill the buffer. + /// + /// # Errors + /// + /// If an I/O or other error occurs, an error variant will be returned, and the buffer will also + /// be returned. + /// + /// # Example + /// + /// ```no_run + /// use monoio::io::AsyncReadRent; + /// + /// #[monoio::main] + /// async fn main() -> std::io::Result<()> { + /// let buf = Vec::with_capacity(1024); + /// let mut file = monoio::fs::File::open("example.txt").await?; + /// let (res, buf) = file.read(buf).await; + /// println!("bytes read: {}", res?); + /// Ok(()) + /// } + /// ``` + async fn read(&mut self, buf: T) -> crate::BufResult { + self.read(buf).await + } + + /// Read some bytes at the specified offset from the file into the specified + /// array of buffers, returning how many bytes were read. + /// + /// # Return + /// + /// The method returns the operation result and the same array of buffers + /// passed as an argument. + /// + /// If the method returns [`Ok(n)`], then the read was successful. A nonzero + /// `n` value indicates that the buffers have been filled with `n` bytes of + /// data from the file. If `n` is `0`, then one of the following happened: + /// + /// 1. The specified offset is the end of the file. + /// 2. The buffers specified were 0 bytes in length. + /// + /// It is not an error if the returned value `n` is smaller than the buffer + /// size, even when the file contains enough data to fill the buffer. + /// + /// # Platform-specific behavior + /// + /// - On windows + /// - due to windows does not have syscall like `readv`, so the implement of this function + /// on windows is by internally calling the `ReadFile` syscall to fill each buffer. + /// + /// # Errors + /// + /// If this function encounters any form of I/O or other error, an error + /// variant will be returned. The buffer is returned on error. + /// + /// # Example + /// + /// ```no_run + /// use monoio::io::AsyncReadRent; + /// + /// #[monoio::main] + /// async fn main() -> std::io::Result<()> { + /// let mut file = monoio::fs::File::open("example.txt").await?; + /// let buffers = monoio::buf::VecBuf::from(vec![ + /// Vec::::with_capacity(10), + /// Vec::::with_capacity(10), + /// ]); + /// + /// let (res, buffer) = file.readv(buffers).await; + /// + /// println!("bytes read: {}", res?); + /// Ok(()) + /// } + /// ``` + async fn readv(&mut self, buf: T) -> crate::BufResult { + self.read_vectored(buf).await + } +} diff --git a/monoio/src/fs/file/unix.rs b/monoio/src/fs/file/unix.rs new file mode 100644 index 00000000..22ee44e1 --- /dev/null +++ b/monoio/src/fs/file/unix.rs @@ -0,0 +1,81 @@ +use std::{ + fs::File as StdFile, + io, + os::fd::{AsRawFd, IntoRawFd, RawFd}, +}; + +use super::File; +use crate::{ + buf::{IoVecBuf, IoVecBufMut}, + driver::{op::Op, shared_fd::SharedFd}, + fs::{metadata::FileAttr, Metadata}, +}; + +impl File { + /// Converts a [`std::fs::File`] to a [`monoio::fs::File`](File). + /// + /// # Examples + /// + /// ```no_run + /// // This line could block. It is not recommended to do this on the monoio + /// // runtime. + /// let std_file = std::fs::File::open("foo.txt").unwrap(); + /// let file = monoio::fs::File::from_std(std_file); + /// ``` + pub fn from_std(std: StdFile) -> std::io::Result { + Ok(File { + fd: SharedFd::new_without_register(std.into_raw_fd()), + }) + } + + /// Queries metadata about the underlying file. + /// + /// # Examples + /// + /// ```no_run + /// use monoio::fs::File; + /// + /// #[monoio::main] + /// async fn main() -> std::io::Result<()> { + /// let mut f = File::open("foo.txt").await?; + /// let metadata = f.metadata().await?; + /// Ok(()) + /// } + /// ``` + pub async fn metadata(&self) -> io::Result { + metadata(self.fd.clone()).await + } +} + +impl AsRawFd for File { + fn as_raw_fd(&self) -> RawFd { + self.fd.raw_fd() + } +} + +pub(crate) async fn read_vectored( + fd: SharedFd, + buf_vec: T, +) -> crate::BufResult { + let op = Op::readv(fd, buf_vec).unwrap(); + op.result().await +} + +pub(crate) async fn write_vectored( + fd: SharedFd, + buf_vec: T, +) -> crate::BufResult { + let op = Op::writev(fd, buf_vec).unwrap(); + op.result().await +} + +pub(crate) async fn metadata(fd: SharedFd) -> std::io::Result { + #[cfg(target_os = "linux")] + let flags = libc::AT_STATX_SYNC_AS_STAT | libc::AT_EMPTY_PATH; + #[cfg(target_os = "linux")] + let op = Op::statx_using_fd(fd, flags)?; + #[cfg(target_os = "macos")] + let op = Op::statx_using_fd(fd, true)?; + + op.result().await.map(FileAttr::from).map(Metadata) +} diff --git a/monoio/src/fs/file/windows.rs b/monoio/src/fs/file/windows.rs new file mode 100644 index 00000000..02de15b3 --- /dev/null +++ b/monoio/src/fs/file/windows.rs @@ -0,0 +1,126 @@ +use std::{ + mem::ManuallyDrop, + os::windows::io::{AsRawHandle, RawHandle}, +}; + +use windows_sys::Win32::Networking::WinSock::WSABUF; + +use super::File; +use crate::{ + buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut}, + driver::{op::Op, shared_fd::SharedFd}, +}; + +impl AsRawHandle for File { + fn as_raw_handle(&self) -> RawHandle { + self.fd.raw_handle() + } +} + +pub(crate) async fn read(fd: SharedFd, buf: T) -> crate::BufResult { + let op = Op::read(fd, buf).unwrap(); + op.result().await +} + +/// The `readv` implement on windows. +/// +/// Due to windows does not have syscall like `readv`, so we need to simulate it by ourself. +/// +/// This function is just to fill each buffer by calling the `read` function. +pub(crate) async fn read_vectored( + fd: SharedFd, + mut buf_vec: T, +) -> crate::BufResult { + // Convert the mutable buffer vector into raw pointers that can be used in unsafe operations + let raw_bufs = buf_vec.write_wsabuf_ptr(); + let len = buf_vec.write_wsabuf_len(); + + // Safely wrap the raw pointers into a Vec, but prevent automatic cleanup with ManuallyDrop + let wasbufs = ManuallyDrop::new(unsafe { Vec::from_raw_parts(raw_bufs, len, len) }); + + let mut total_bytes_read = 0; + + // Iterate through each WSABUF structure and read data into it + for wsabuf in wasbufs.iter() { + // Safely create a Vec from the WSABUF pointer, then pass it to the read function + let (res, _) = read( + fd.clone(), + ManuallyDrop::new(unsafe { + Vec::from_raw_parts(wsabuf.buf, wsabuf.len as usize, wsabuf.len as usize) + }), + ) + .await; + + // Handle the result of the read operation + match res { + Ok(bytes_read) => { + total_bytes_read += bytes_read; + // If fewer bytes were read than requested, stop further reads + if bytes_read < wsabuf.len as usize { + break; + } + } + Err(e) => { + // If an error occurs, return it along with the original buffer vector + return (Err(e), buf_vec); + } + } + } + + // Due to `read` will init each buffer, so we do need to set buffer len here. + // Return the total bytes read and the buffer vector + (Ok(total_bytes_read), buf_vec) +} + +pub(crate) async fn write(fd: SharedFd, buf: T) -> crate::BufResult { + let op = Op::write(fd, buf).unwrap(); + op.result().await +} + +/// The `writev` implement on windows +/// +/// Due to windows does not have syscall like `writev`, so we need to simulate it by ourself. +/// +/// This function is just to write each buffer into file by calling the `write` function. +pub(crate) async fn write_vectored( + fd: SharedFd, + buf_vec: T, +) -> crate::BufResult { + // Convert the buffer vector into raw pointers that can be used in unsafe operations + let raw_bufs = buf_vec.read_wsabuf_ptr() as *mut WSABUF; + let len = buf_vec.read_wsabuf_len(); + + // Safely wrap the raw pointers into a Vec, but prevent automatic cleanup with ManuallyDrop + let wsabufs = ManuallyDrop::new(unsafe { Vec::from_raw_parts(raw_bufs, len, len) }); + let mut total_bytes_write = 0; + + // Iterate through each WSABUF structure and write data from it + for wsabuf in wsabufs.iter() { + // Safely create a Vec from the WSABUF pointer, then pass it to the write function + let (res, _) = write( + fd.clone(), + ManuallyDrop::new(unsafe { + Vec::from_raw_parts(wsabuf.buf, wsabuf.len as usize, wsabuf.len as usize) + }), + ) + .await; + + // Handle the result of the write operation + match res { + Ok(bytes_write) => { + total_bytes_write += bytes_write; + // If fewer bytes were written than requested, stop further writes + if bytes_write < wsabuf.len as usize { + break; + } + } + Err(e) => { + // If an error occurs, return it along with the original buffer vector + return (Err(e), buf_vec); + } + } + } + + // Return the total bytes written and the buffer vector + (Ok(total_bytes_write), buf_vec) +} diff --git a/monoio/src/fs/metadata/mod.rs b/monoio/src/fs/metadata/mod.rs index 55359bf0..34b100e5 100644 --- a/monoio/src/fs/metadata/mod.rs +++ b/monoio/src/fs/metadata/mod.rs @@ -48,7 +48,7 @@ pub async fn metadata>(path: P) -> std::io::Result { #[cfg(target_os = "macos")] let op = Op::statx_using_path(path, true)?; - op.statx_result().await.map(FileAttr::from).map(Metadata) + op.result().await.map(FileAttr::from).map(Metadata) } /// Query the metadata about a file without following symlinks. @@ -88,7 +88,7 @@ pub async fn symlink_metadata>(path: P) -> std::io::Result>(path: P) -> io::Result> { /// Write a buffer as the entire contents of a file. pub async fn write, C: IoBuf>(path: P, contents: C) -> (io::Result<()>, C) { - let file = match File::create(path).await { - Ok(f) => f, - Err(e) => return (Err(e), contents), - }; - file.write_all_at(contents, 0).await + match File::create(path).await { + Ok(f) => f.write_all_at(contents, 0).await, + Err(e) => (Err(e), contents), + } } /// Removes a file from the filesystem. diff --git a/monoio/src/io/async_read_rent.rs b/monoio/src/io/async_read_rent.rs index e61fd628..72843cff 100644 --- a/monoio/src/io/async_read_rent.rs +++ b/monoio/src/io/async_read_rent.rs @@ -5,11 +5,37 @@ use crate::{ BufResult, }; -/// AsyncReadRent: async read with a ownership of a buffer +/// The `AsyncReadRent` trait defines asynchronous reading operations for objects that +/// implement it. +/// +/// It provides a way to read bytes from a source into a buffer asynchronously, +/// which could be a file, socket, or any other byte-oriented stream. +/// +/// Types that implement this trait are expected to manage asynchronous read operations, +/// allowing them to interact with other asynchronous tasks without blocking the executor. pub trait AsyncReadRent { - /// Same as read(2) + /// Reads bytes from this source into the provided buffer, returning the number of bytes read. + /// + /// # Return + /// + /// When this method returns `(Ok(n), buf)`, it guarantees that `0 <= n <= buf.len()`. A + /// non-zero `n` means the buffer `buf` has been filled with `n` bytes of data from this source. + /// If `n` is `0`, it can indicate one of two possibilities: + /// + /// 1. The reader has likely reached the end of the file and may not produce more bytes, though + /// it is not certain that no more bytes will ever be produced. + /// 2. The provided buffer was 0 bytes in length. + /// + /// # Errors + /// + /// If an I/O or other error occurs, an error variant will be returned, ensuring that no bytes + /// were read. fn read(&mut self, buf: T) -> impl Future>; - /// Same as readv(2) + /// Similar to `read`, but reads data into a slice of buffers. + /// + /// Data is copied sequentially into each buffer, with the last buffer potentially being only + /// partially filled. This method should behave equivalently to a single call to `read` with the + /// buffers concatenated. fn readv(&mut self, buf: T) -> impl Future>; } diff --git a/monoio/src/io/async_write_rent.rs b/monoio/src/io/async_write_rent.rs index c471296d..482f1945 100644 --- a/monoio/src/io/async_write_rent.rs +++ b/monoio/src/io/async_write_rent.rs @@ -5,18 +5,71 @@ use crate::{ BufResult, }; -/// AsyncWriteRent: async write with a ownership of a buffer +/// The `AsyncWriteRent` trait provides asynchronous writing capabilities for structs +/// that implement it. +/// +/// It abstracts over the concept of writing bytes asynchronously +/// to an underlying I/O object, which could be a file, socket, or any other +/// byte-oriented stream. The trait also encompasses the ability to flush buffered +/// data and to shut down the output stream cleanly. +/// +/// Types implementing this trait are required to manage asynchronous I/O operations, +/// allowing for non-blocking writes. This is particularly useful in scenarios where +/// the object might need to interact with other asynchronous tasks without blocking +/// the executor. pub trait AsyncWriteRent { - /// Same as write(2) + /// Writes the contents of a buffer into this writer, returning the number of bytes written. + /// + /// This function attempts to write the entire buffer `buf`, but the write may not fully + /// succeed, and it might also result in an error. A call to `write` represents *at most one* + /// attempt to write to the underlying object. + /// + /// # Return + /// + /// When this method returns `(Ok(n), buf)`, it guarantees that `n <= buf.len()`. A return value + /// of `0` typically indicates that the underlying object can no longer accept bytes and likely + /// won't be able to in the future, or that the provided buffer is empty. + /// + /// # Errors + /// + /// Each `write` call may result in an I/O error, indicating the operation couldn't be + /// completed. If an error occurs, no bytes from the buffer were written to the writer. + /// + /// It is **not** an error if the entire buffer could not be written to this writer. fn write(&mut self, buf: T) -> impl Future>; - /// Same as writev(2) + /// This function attempts to write the entire contents of `buf_vec`, but the write may not + /// fully succeed, and it might also result in an error. The bytes will be written starting at + /// the specified offset. + /// + /// # Return + /// + /// The method returns the result of the operation along with the same array of buffers passed + /// as an argument. A return value of `0` typically indicates that the underlying file can no + /// longer accept bytes and likely won't be able to in the future, or that the provided buffer + /// is empty. + /// + /// # Errors + /// + /// Each `write` call may result in an I/O error, indicating the operation couldn't be + /// completed. If an error occurs, no bytes from the buffer were written to the writer. + /// + /// It is **not** considered an error if the entire buffer could not be written to this writer. fn writev(&mut self, buf_vec: T) -> impl Future>; - /// Flush buffered data if needed + /// Flushes this output stream, ensuring that all buffered content is successfully written to + /// its destination. + /// + /// # Errors + /// + /// An error occurs if not all bytes can be written due to I/O issues or if the end of the file + /// (EOF) is reached. fn flush(&mut self) -> impl Future>; - /// Same as shutdown + /// Shuts down the output stream, ensuring that the value can be cleanly dropped. + /// + /// Similar to [`flush`], all buffered data is written to the underlying stream. After this + /// operation completes, the caller should no longer attempt to write to the stream. fn shutdown(&mut self) -> impl Future>; } diff --git a/monoio/src/net/tcp/stream.rs b/monoio/src/net/tcp/stream.rs index bc01ec78..36eb1ae3 100644 --- a/monoio/src/net/tcp/stream.rs +++ b/monoio/src/net/tcp/stream.rs @@ -321,13 +321,13 @@ impl AsyncWriteRent for TcpStream { fn write(&mut self, buf: T) -> impl Future> { // Submit the write operation let op = Op::send(self.fd.clone(), buf).unwrap(); - op.write() + op.result() } #[inline] fn writev(&mut self, buf_vec: T) -> impl Future> { - let op = Op::writev(&self.fd, buf_vec).unwrap(); - op.write() + let op = Op::writev(self.fd.clone(), buf_vec).unwrap(); + op.result() } #[inline] @@ -366,7 +366,7 @@ impl CancelableAsyncWriteRent for TcpStream { let op = Op::send(fd, buf).unwrap(); let _guard = c.associate_op(op.op_canceller()); - op.write().await + op.result().await } #[inline] @@ -381,9 +381,9 @@ impl CancelableAsyncWriteRent for TcpStream { return (Err(operation_canceled()), buf_vec); } - let op = Op::writev(&fd, buf_vec).unwrap(); + let op = Op::writev(fd.clone(), buf_vec).unwrap(); let _guard = c.associate_op(op.op_canceller()); - op.write().await + op.result().await } #[inline] @@ -412,14 +412,14 @@ impl AsyncReadRent for TcpStream { fn read(&mut self, buf: T) -> impl Future> { // Submit the read operation let op = Op::recv(self.fd.clone(), buf).unwrap(); - op.read() + op.result() } #[inline] fn readv(&mut self, buf: T) -> impl Future> { // Submit the read operation let op = Op::readv(self.fd.clone(), buf).unwrap(); - op.read() + op.result() } } @@ -438,7 +438,7 @@ impl CancelableAsyncReadRent for TcpStream { let op = Op::recv(fd, buf).unwrap(); let _guard = c.associate_op(op.op_canceller()); - op.read().await + op.result().await } #[inline] @@ -455,7 +455,7 @@ impl CancelableAsyncReadRent for TcpStream { let op = Op::readv(fd, buf).unwrap(); let _guard = c.associate_op(op.op_canceller()); - op.read().await + op.result().await } } diff --git a/monoio/src/net/udp.rs b/monoio/src/net/udp.rs index 96b8f9d7..62019380 100644 --- a/monoio/src/net/udp.rs +++ b/monoio/src/net/udp.rs @@ -144,7 +144,7 @@ impl UdpSocket { /// which it is connected. On success, returns the number of bytes read. pub async fn recv(&self, buf: T) -> crate::BufResult { let op = Op::recv(self.fd.clone(), buf).unwrap(); - op.read().await + op.result().await } /// Creates new `UdpSocket` from a `std::net::UdpSocket`. @@ -311,6 +311,6 @@ impl UdpSocket { let op = Op::recv(self.fd.clone(), buf).unwrap(); let _guard = c.associate_op(op.op_canceller()); - op.read().await + op.result().await } } diff --git a/monoio/src/net/unix/datagram/mod.rs b/monoio/src/net/unix/datagram/mod.rs index c0e27685..f67e3fe8 100644 --- a/monoio/src/net/unix/datagram/mod.rs +++ b/monoio/src/net/unix/datagram/mod.rs @@ -158,7 +158,7 @@ impl UnixDatagram { /// which it is connected. On success, returns the number of bytes read. pub async fn recv(&self, buf: T) -> crate::BufResult { let op = Op::recv(self.fd.clone(), buf).unwrap(); - op.read().await + op.result().await } } diff --git a/monoio/src/net/unix/seq_packet/mod.rs b/monoio/src/net/unix/seq_packet/mod.rs index 74f23669..099ebf53 100644 --- a/monoio/src/net/unix/seq_packet/mod.rs +++ b/monoio/src/net/unix/seq_packet/mod.rs @@ -141,7 +141,7 @@ impl UnixSeqpacket { /// which it is connected. On success, returns the number of bytes read. pub async fn recv(&self, buf: T) -> crate::BufResult { let op = Op::recv(self.fd.clone(), buf).unwrap(); - op.read().await + op.result().await } } diff --git a/monoio/src/net/unix/stream.rs b/monoio/src/net/unix/stream.rs index 74f07ef2..7705b747 100644 --- a/monoio/src/net/unix/stream.rs +++ b/monoio/src/net/unix/stream.rs @@ -177,13 +177,13 @@ impl AsyncWriteRent for UnixStream { fn write(&mut self, buf: T) -> impl Future> { // Submit the write operation let op = Op::send(self.fd.clone(), buf).unwrap(); - op.write() + op.result() } #[inline] fn writev(&mut self, buf_vec: T) -> impl Future> { - let op = Op::writev(&self.fd, buf_vec).unwrap(); - op.write() + let op = Op::writev(self.fd.clone(), buf_vec).unwrap(); + op.result() } #[inline] @@ -220,7 +220,7 @@ impl CancelableAsyncWriteRent for UnixStream { let op = Op::send(fd, buf).unwrap(); let _guard = c.associate_op(op.op_canceller()); - op.write().await + op.result().await } #[inline] @@ -235,9 +235,9 @@ impl CancelableAsyncWriteRent for UnixStream { return (Err(operation_canceled()), buf_vec); } - let op = Op::writev(&fd, buf_vec).unwrap(); + let op = Op::writev(fd.clone(), buf_vec).unwrap(); let _guard = c.associate_op(op.op_canceller()); - op.write().await + op.result().await } #[inline] @@ -262,14 +262,14 @@ impl AsyncReadRent for UnixStream { fn read(&mut self, buf: T) -> impl Future> { // Submit the read operation let op = Op::recv(self.fd.clone(), buf).unwrap(); - op.read() + op.result() } #[inline] fn readv(&mut self, buf: T) -> impl Future> { // Submit the read operation let op = Op::readv(self.fd.clone(), buf).unwrap(); - op.read() + op.result() } } @@ -288,7 +288,7 @@ impl CancelableAsyncReadRent for UnixStream { let op = Op::recv(fd, buf).unwrap(); let _guard = c.associate_op(op.op_canceller()); - op.read().await + op.result().await } #[inline] @@ -305,7 +305,7 @@ impl CancelableAsyncReadRent for UnixStream { let op = Op::readv(fd, buf).unwrap(); let _guard = c.associate_op(op.op_canceller()); - op.read().await + op.result().await } } diff --git a/monoio/tests/fs_file.rs b/monoio/tests/fs_file.rs index 564f6d88..f00ce472 100644 --- a/monoio/tests/fs_file.rs +++ b/monoio/tests/fs_file.rs @@ -4,18 +4,22 @@ use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; #[cfg(windows)] use std::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle as RawFd}; -use monoio::fs::File; +use monoio::{ + buf::VecBuf, + fs::File, + io::{AsyncReadRent, AsyncWriteRent}, +}; use tempfile::NamedTempFile; const HELLO: &[u8] = b"hello world..."; -async fn read_hello(file: &File) { +async fn read_hello(file: &File, offset: u64) { let buf = Vec::with_capacity(1024); - let (res, buf) = file.read_at(buf, 0).await; + let (res, buf) = file.read_at(buf, offset).await; let n = res.unwrap(); - assert!(n > 0 && n <= HELLO.len()); - assert_eq!(&buf, &HELLO[..n]); + assert!(n <= HELLO.len() - offset as usize); + assert_eq!(&buf, &HELLO[offset as usize..n + offset as usize]); } #[monoio::test_all] @@ -24,12 +28,57 @@ async fn basic_read() { tempfile.write_all(HELLO).unwrap(); tempfile.as_file_mut().sync_data().unwrap(); + let mut file = File::open(tempfile.path()).await.unwrap(); + + let (res, buf) = file.read(Vec::with_capacity(HELLO.len() / 2)).await; + assert!(matches!(res, Ok(len) if len == HELLO.len() / 2)); + assert_eq!(buf, HELLO[..res.unwrap()]); + + let (res, buf) = file.read(Vec::with_capacity(HELLO.len() / 2)).await; + assert!(matches!(res, Ok(len) if len == HELLO.len() / 2)); + assert_eq!(buf, HELLO[res.unwrap()..]); +} + +#[monoio::test_all] +async fn read_vectored() { + let mut tempfile = tempfile(); + tempfile.write_all(HELLO).unwrap(); + tempfile.as_file_mut().sync_data().unwrap(); + + let mut file = File::open(tempfile.path()).await.unwrap(); + + let (res, buf) = file + .readv(VecBuf::from(vec![vec![0; HELLO.len() / 7]; 2])) + .await; + + assert!(matches!(res, Ok(len) if len == HELLO.len() / 7 * 2)); + let buf: Vec<_> = Into::>::into(buf).into_iter().flatten().collect(); + assert_eq!(buf, HELLO[..4]); + + let (res, buf) = file + .readv(VecBuf::from(vec![vec![0; HELLO.len() / 7]; 5])) + .await; + + assert!(matches!(res, Ok(len) if len == HELLO.len() / 7 * 5)); + let buf: Vec<_> = Into::>::into(buf).into_iter().flatten().collect(); + assert_eq!(buf, HELLO[4..]); +} + +#[monoio::test_all] +async fn basic_read_at() { + let mut tempfile = tempfile(); + tempfile.write_all(HELLO).unwrap(); + tempfile.as_file_mut().sync_data().unwrap(); + let file = File::open(tempfile.path()).await.unwrap(); - read_hello(&file).await; + + for offset in 0..=HELLO.len() { + read_hello(&file, offset as u64).await; + } } #[monoio::test_all] -async fn basic_read_exact() { +async fn basic_read_exact_at() { let mut tempfile = tempfile(); tempfile.write_all(HELLO).unwrap(); tempfile.as_file_mut().sync_data().unwrap(); @@ -48,17 +97,61 @@ async fn basic_read_exact() { #[monoio::test_all] async fn basic_write() { let tempfile = tempfile(); + let mut file = File::create(tempfile.path()).await.unwrap(); + + let (res, _) = file.write(HELLO).await; + assert!(matches!(res, Ok(14))); + let result = monoio::fs::read(tempfile.path()).await.unwrap(); + assert_eq!(result, HELLO); + + let (res, _) = file.write(HELLO).await; + assert!(matches!(res, Ok(14))); + let result = monoio::fs::read(tempfile.path()).await.unwrap(); + assert_eq!(result, [HELLO, HELLO].concat()); +} + +#[monoio::test_all] +async fn write_vectored() { + let tempfile = tempfile(); + let mut file = File::create(tempfile.path()).await.unwrap(); + + let (res, _) = file.writev(VecBuf::from(vec![HELLO.to_vec(); 2])).await; + assert!(matches!(res, Ok(len) if len == HELLO.len() * 2)); + let result = monoio::fs::read(tempfile.path()).await.unwrap(); + assert_eq!(result, [HELLO, HELLO].concat()); + + let (res, _) = file.writev(VecBuf::from(vec![HELLO.to_vec(); 2])).await; + assert!(matches!(res, Ok(len) if len == HELLO.len() * 2)); + let result = monoio::fs::read(tempfile.path()).await.unwrap(); + assert_eq!(result, [HELLO, HELLO, HELLO, HELLO].concat()); +} + +#[monoio::test_all] +async fn basic_write_at() { + let tempfile = tempfile(); let file = File::create(tempfile.path()).await.unwrap(); file.write_at(HELLO, 0).await.0.unwrap(); file.sync_all().await.unwrap(); - let file = monoio::fs::read(tempfile.path()).await.unwrap(); - assert_eq!(file, HELLO); + let result = monoio::fs::read(tempfile.path()).await.unwrap(); + assert_eq!(result, HELLO); + + // Modify the file pointer. + let mut std_file = std::fs::File::open(tempfile.path()).unwrap(); + std_file.seek(SeekFrom::Start(8)).unwrap(); + + file.write_at(b"monoio...", 6).await.0.unwrap(); + file.sync_all().await.unwrap(); + + assert_eq!( + monoio::fs::read(tempfile.path()).await.unwrap(), + b"hello monoio..." + ) } #[monoio::test_all] -async fn basic_write_all() { +async fn basic_write_all_at() { let tempfile = tempfile(); let file = File::create(tempfile.path()).await.unwrap(); @@ -70,7 +163,7 @@ async fn basic_write_all() { } #[monoio::test(driver = "uring")] -async fn cancel_read() { +async fn cancel_read_at() { let mut tempfile = tempfile(); tempfile.write_all(HELLO).unwrap(); tempfile.as_file_mut().sync_data().unwrap(); @@ -78,9 +171,9 @@ async fn cancel_read() { let file = File::open(tempfile.path()).await.unwrap(); // Poll the future once, then cancel it - poll_once(async { read_hello(&file).await }).await; + poll_once(async { read_hello(&file, 0).await }).await; - read_hello(&file).await; + read_hello(&file, 0).await; } #[monoio::test_all] @@ -201,51 +294,23 @@ async fn file_from_std() { .read(true) .write(true) .create(true) + .truncate(true) .open(tempfile.path()) .unwrap(); let file = File::from_std(std_file).unwrap(); file.write_at(HELLO, 0).await.0.unwrap(); file.sync_all().await.unwrap(); - read_hello(&file).await; -} - -#[monoio::test_all] -async fn position_read() { - let mut tempfile = tempfile(); - tempfile.write_all(HELLO).unwrap(); - tempfile.as_file_mut().sync_data().unwrap(); - - let file = File::open(tempfile.path()).await.unwrap(); - - // Modify the file pointer. - let mut std_file = std::fs::File::open(tempfile.path()).unwrap(); - std_file.seek(SeekFrom::Start(8)).unwrap(); - - let buf = Vec::with_capacity(1024); - let (res, buf) = file.read_at(buf, 4).await; - let n = res.unwrap(); - - assert!(n > 0 && n <= HELLO.len() - 4); - assert_eq!(&buf, &HELLO[4..4 + n]); + read_hello(&file, 0).await; } #[monoio::test_all] -async fn position_write() { +async fn flush_and_shutdown() { let tempfile = tempfile(); + let mut file = File::create(tempfile.path()).await.unwrap(); - let file = File::create(tempfile.path()).await.unwrap(); - file.write_at(HELLO, 0).await.0.unwrap(); - file.sync_all().await.unwrap(); + let res = file.flush().await; + assert!(matches!(res, Ok(()))); - // Modify the file pointer. - let mut std_file = std::fs::File::open(tempfile.path()).unwrap(); - std_file.seek(SeekFrom::Start(8)).unwrap(); - - file.write_at(b"monoio...", 6).await.0.unwrap(); - file.sync_all().await.unwrap(); - - assert_eq!( - monoio::fs::read(tempfile.path()).await.unwrap(), - b"hello monoio..." - ) + let res = file.shutdown().await; + assert!(matches!(res, Ok(()))); }