From a9c4f46a794c1e54a441f726d10ce3a4fb194d73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sat, 15 Mar 2025 13:56:41 +0800 Subject: [PATCH 1/8] fix(io): adjust AsyncReadManaged* traits to match TakeBuffer --- compio-io/src/read/managed.rs | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/compio-io/src/read/managed.rs b/compio-io/src/read/managed.rs index 5c75b191..e5497d08 100644 --- a/compio-io/src/read/managed.rs +++ b/compio-io/src/read/managed.rs @@ -1,6 +1,4 @@ -use std::io::Cursor; - -use compio_buf::IoBuf; +use std::{io::Cursor, ops::Deref}; use crate::IoResult; @@ -11,18 +9,18 @@ pub trait AsyncReadManaged { /// Buffer pool type type BufferPool; /// Filled buffer type - type Buffer: IoBuf; + type Buffer<'a>; /// Read some bytes from this source with [`BufferPool`] and return /// a [`Buffer`]. /// /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len, /// if `len` > 0, `min(len, inner buffer size)` will be the read max len - async fn read_managed( + async fn read_managed<'a>( &mut self, - buffer_pool: &Self::BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> IoResult; + ) -> IoResult>; } /// # AsyncReadAtManaged @@ -32,36 +30,39 @@ pub trait AsyncReadManagedAt { /// Buffer pool type type BufferPool; /// Filled buffer type - type Buffer: IoBuf; + type Buffer<'a>; /// Read some bytes from this source at position with [`BufferPool`] and /// return a [`Buffer`]. /// /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len, /// if `len` > 0, `min(len, inner buffer size)` will be the read max len - async fn read_managed_at( + async fn read_managed_at<'a>( &self, pos: u64, - buffer_pool: &Self::BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> IoResult; + ) -> IoResult>; } -impl AsyncReadManaged for Cursor { - type Buffer = A::Buffer; +impl AsyncReadManaged for Cursor +where + for<'a> A::Buffer<'a>: Deref, +{ + type Buffer<'a> = A::Buffer<'a>; type BufferPool = A::BufferPool; - async fn read_managed( + async fn read_managed<'a>( &mut self, - buffer_pool: &Self::BufferPool, + buffer_pool: &'a Self::BufferPool, len: usize, - ) -> IoResult { + ) -> IoResult> { let pos = self.position(); let buf = self .get_ref() .read_managed_at(pos, buffer_pool, len) .await?; - self.set_position(pos + buf.buf_len() as u64); + self.set_position(pos + buf.len() as u64); Ok(buf) } } From eddc219d41e646a35a0188c227ca1e6e9b965bcb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sat, 15 Mar 2025 14:24:42 +0800 Subject: [PATCH 2/8] feat(fs): add managed impl --- compio-driver/src/op.rs | 25 ++++++++++++++++++-- compio-fs/src/async_fd.rs | 36 ++++++++++++++++++++++++++--- compio-fs/src/file.rs | 24 ++++++++++++++++--- compio-fs/src/pipe.rs | 36 ++++++++++++++++++++++++++--- compio-fs/src/stdio/unix.rs | 30 ++++++++++++++++++++++-- compio-fs/tests/buffer_pool.rs | 42 ++++++++++++++++++++++++++++++++++ 6 files changed, 180 insertions(+), 13 deletions(-) create mode 100644 compio-fs/tests/buffer_pool.rs diff --git a/compio-driver/src/op.rs b/compio-driver/src/op.rs index 83de834f..ff5f6e72 100644 --- a/compio-driver/src/op.rs +++ b/compio-driver/src/op.rs @@ -4,7 +4,7 @@ //! The operation itself doesn't perform anything. //! You need to pass them to [`crate::Proactor`], and poll the driver. -use std::{marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown}; +use std::{io, marker::PhantomPinned, mem::ManuallyDrop, net::Shutdown}; use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, SetBufInit}; use socket2::SockAddr; @@ -23,7 +23,7 @@ pub use crate::sys::op::{ #[cfg(buf_ring)] pub use crate::sys::op::{ReadManagedAt, RecvManaged}; use crate::{ - OwnedFd, SharedFd, + OwnedFd, SharedFd, TakeBuffer, sys::{sockaddr_storage, socklen_t}, }; @@ -99,6 +99,27 @@ impl RecvResultExt for BufResult; + + /// Take the buffer from result. + fn take_buffer<'a>(self, pool: &'a Self::BufferPool) -> io::Result>; +} + +impl ResultTakeBuffer for (BufResult, u32) { + type Buffer<'a> = T::Buffer<'a>; + type BufferPool = T::BufferPool; + + fn take_buffer<'a>(self, pool: &'a Self::BufferPool) -> io::Result> { + let (BufResult(result, op), flags) = self; + op.take_buffer(pool, result, flags) + } +} + /// Spawn a blocking function in the thread pool. pub struct Asyncify { pub(crate) f: Option, diff --git a/compio-fs/src/async_fd.rs b/compio-fs/src/async_fd.rs index 1eea122a..68fdf0b6 100644 --- a/compio-fs/src/async_fd.rs +++ b/compio-fs/src/async_fd.rs @@ -8,10 +8,10 @@ use std::os::windows::io::{ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut}; use compio_driver::{ - AsRawFd, SharedFd, ToSharedFd, - op::{BufResultExt, Recv, Send}, + AsRawFd, BorrowedBuffer, BufferPool, SharedFd, ToSharedFd, + op::{BufResultExt, Recv, RecvManaged, ResultTakeBuffer, Send}, }; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite}; use compio_runtime::Attacher; #[cfg(unix)] use { @@ -59,6 +59,36 @@ impl AsyncRead for AsyncFd { } } +impl AsyncReadManaged for AsyncFd { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_managed(buffer_pool, len).await + } +} + +impl AsyncReadManaged for &AsyncFd { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let op = RecvManaged::new(fd, buffer_pool, len)?; + compio_runtime::submit_with_flags(op) + .await + .take_buffer(buffer_pool) + } +} + impl AsyncRead for &AsyncFd { async fn read(&mut self, buf: B) -> BufResult { let fd = self.inner.to_shared_fd(); diff --git a/compio-fs/src/file.rs b/compio-fs/src/file.rs index a0692f35..7fb13cce 100644 --- a/compio-fs/src/file.rs +++ b/compio-fs/src/file.rs @@ -4,10 +4,10 @@ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut}; #[cfg(unix)] use compio_driver::op::FileStat; use compio_driver::{ - ToSharedFd, impl_raw_fd, - op::{BufResultExt, CloseFile, ReadAt, Sync, WriteAt}, + BorrowedBuffer, BufferPool, ToSharedFd, impl_raw_fd, + op::{BufResultExt, CloseFile, ReadAt, ReadManagedAt, ResultTakeBuffer, Sync, WriteAt}, }; -use compio_io::{AsyncReadAt, AsyncWriteAt}; +use compio_io::{AsyncReadAt, AsyncReadManagedAt, AsyncWriteAt}; use compio_runtime::Attacher; #[cfg(all(unix, not(solarish)))] use { @@ -169,6 +169,24 @@ impl AsyncReadAt for File { } } +impl AsyncReadManagedAt for File { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed_at<'a>( + &self, + pos: u64, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.inner.to_shared_fd(); + let op = ReadManagedAt::new(fd, pos, buffer_pool, len)?; + compio_runtime::submit_with_flags(op) + .await + .take_buffer(buffer_pool) + } +} + impl AsyncWriteAt for File { #[inline] async fn write_at(&mut self, buf: T, pos: u64) -> BufResult { diff --git a/compio-fs/src/pipe.rs b/compio-fs/src/pipe.rs index af256b0c..9eb707a3 100644 --- a/compio-fs/src/pipe.rs +++ b/compio-fs/src/pipe.rs @@ -9,11 +9,11 @@ use std::{ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use compio_driver::{ - AsRawFd, ToSharedFd, impl_raw_fd, - op::{BufResultExt, Recv, RecvVectored, Send, SendVectored}, + AsRawFd, BorrowedBuffer, BufferPool, ToSharedFd, impl_raw_fd, + op::{BufResultExt, Recv, RecvManaged, RecvVectored, ResultTakeBuffer, Send, SendVectored}, syscall, }; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite}; use crate::File; @@ -502,6 +502,36 @@ impl AsyncRead for &Receiver { } } +impl AsyncReadManaged for Receiver { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_managed(buffer_pool, len).await + } +} + +impl AsyncReadManaged for &Receiver { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let op = RecvManaged::new(fd, buffer_pool, len)?; + compio_runtime::submit_with_flags(op) + .await + .take_buffer(buffer_pool) + } +} + impl_raw_fd!(Receiver, std::fs::File, file, file); /// Checks if file is a FIFO diff --git a/compio-fs/src/stdio/unix.rs b/compio-fs/src/stdio/unix.rs index faa24262..945f5d98 100644 --- a/compio-fs/src/stdio/unix.rs +++ b/compio-fs/src/stdio/unix.rs @@ -1,8 +1,8 @@ use std::io; use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; -use compio_driver::{AsRawFd, RawFd}; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_driver::{AsRawFd, BorrowedBuffer, BufferPool, RawFd}; +use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite}; #[cfg(doc)] use super::{stderr, stdin, stdout}; @@ -41,6 +41,32 @@ impl AsyncRead for &Stdin { } } +impl AsyncReadManaged for Stdin { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_managed(buffer_pool, len).await + } +} + +impl AsyncReadManaged for &Stdin { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&self.0).read_managed(buffer_pool, len).await + } +} + impl AsRawFd for Stdin { fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() diff --git a/compio-fs/tests/buffer_pool.rs b/compio-fs/tests/buffer_pool.rs new file mode 100644 index 00000000..85db30fb --- /dev/null +++ b/compio-fs/tests/buffer_pool.rs @@ -0,0 +1,42 @@ +use std::io::Write; + +use compio_fs::File; +#[cfg(unix)] +use compio_fs::pipe; +use compio_io::AsyncReadManagedAt; +#[cfg(unix)] +use compio_io::{AsyncReadManaged, AsyncWriteExt}; +use compio_runtime::BufferPool; +use tempfile::NamedTempFile; + +const HELLO: &[u8] = b"hello world..."; + +fn tempfile() -> NamedTempFile { + NamedTempFile::new().unwrap() +} + +#[compio_macros::test] +async fn test_read_file() { + let mut tempfile = tempfile(); + tempfile.write_all(HELLO).unwrap(); + + let file = File::open(tempfile.path()).await.unwrap(); + let buffer_pool = BufferPool::new(1, 15).unwrap(); + let buf = file.read_managed_at(&buffer_pool, 0, 0).await.unwrap(); + + assert_eq!(buf.len(), HELLO.len()); + assert_eq!(buf.as_ref(), HELLO); +} + +#[cfg(unix)] +#[compio_macros::test] +async fn test_read_pipe() { + let (mut rx, mut tx) = pipe::anonymous().unwrap(); + tx.write_all(HELLO).await.unwrap(); + + let buffer_pool = BufferPool::new(1, 15).unwrap(); + let buf = rx.read_managed(&buffer_pool, 0).await.unwrap(); + + assert_eq!(buf.len(), HELLO.len()); + assert_eq!(buf.as_ref(), HELLO); +} From 01d337cbd8c0326c6ded678e8889213d51540602 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 16 Mar 2025 14:46:48 +0800 Subject: [PATCH 3/8] feat(runtime): wrap BufferPool Co-authored-by: Sherlock Holo --- compio-driver/src/op.rs | 4 +- compio-fs/src/async_fd.rs | 5 +- compio-fs/src/file.rs | 5 +- compio-fs/tests/buffer_pool.rs | 2 +- compio-runtime/src/lib.rs | 3 +- compio-runtime/src/runtime/buffer_pool.rs | 83 +++++++++++++++++++++++ compio-runtime/src/runtime/mod.rs | 41 ++++++++++- 7 files changed, 134 insertions(+), 9 deletions(-) create mode 100644 compio-runtime/src/runtime/buffer_pool.rs diff --git a/compio-driver/src/op.rs b/compio-driver/src/op.rs index ff5f6e72..8ab6564f 100644 --- a/compio-driver/src/op.rs +++ b/compio-driver/src/op.rs @@ -107,14 +107,14 @@ pub trait ResultTakeBuffer { type Buffer<'a>; /// Take the buffer from result. - fn take_buffer<'a>(self, pool: &'a Self::BufferPool) -> io::Result>; + fn take_buffer(self, pool: &Self::BufferPool) -> io::Result>; } impl ResultTakeBuffer for (BufResult, u32) { type Buffer<'a> = T::Buffer<'a>; type BufferPool = T::BufferPool; - fn take_buffer<'a>(self, pool: &'a Self::BufferPool) -> io::Result> { + fn take_buffer(self, pool: &Self::BufferPool) -> io::Result> { let (BufResult(result, op), flags) = self; op.take_buffer(pool, result, flags) } diff --git a/compio-fs/src/async_fd.rs b/compio-fs/src/async_fd.rs index 68fdf0b6..dd54e4ef 100644 --- a/compio-fs/src/async_fd.rs +++ b/compio-fs/src/async_fd.rs @@ -8,11 +8,11 @@ use std::os::windows::io::{ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut}; use compio_driver::{ - AsRawFd, BorrowedBuffer, BufferPool, SharedFd, ToSharedFd, + AsRawFd, SharedFd, ToSharedFd, op::{BufResultExt, Recv, RecvManaged, ResultTakeBuffer, Send}, }; use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite}; -use compio_runtime::Attacher; +use compio_runtime::{Attacher, BorrowedBuffer, BufferPool}; #[cfg(unix)] use { compio_buf::{IoVectoredBuf, IoVectoredBufMut}, @@ -82,6 +82,7 @@ impl AsyncReadManaged for &AsyncFd { len: usize, ) -> io::Result> { let fd = self.to_shared_fd(); + let buffer_pool = buffer_pool.as_inner(); let op = RecvManaged::new(fd, buffer_pool, len)?; compio_runtime::submit_with_flags(op) .await diff --git a/compio-fs/src/file.rs b/compio-fs/src/file.rs index 7fb13cce..81ae0bc1 100644 --- a/compio-fs/src/file.rs +++ b/compio-fs/src/file.rs @@ -4,11 +4,11 @@ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut}; #[cfg(unix)] use compio_driver::op::FileStat; use compio_driver::{ - BorrowedBuffer, BufferPool, ToSharedFd, impl_raw_fd, + ToSharedFd, impl_raw_fd, op::{BufResultExt, CloseFile, ReadAt, ReadManagedAt, ResultTakeBuffer, Sync, WriteAt}, }; use compio_io::{AsyncReadAt, AsyncReadManagedAt, AsyncWriteAt}; -use compio_runtime::Attacher; +use compio_runtime::{Attacher, BorrowedBuffer, BufferPool}; #[cfg(all(unix, not(solarish)))] use { compio_buf::{IoVectoredBuf, IoVectoredBufMut}, @@ -180,6 +180,7 @@ impl AsyncReadManagedAt for File { len: usize, ) -> io::Result> { let fd = self.inner.to_shared_fd(); + let buffer_pool = buffer_pool.as_inner(); let op = ReadManagedAt::new(fd, pos, buffer_pool, len)?; compio_runtime::submit_with_flags(op) .await diff --git a/compio-fs/tests/buffer_pool.rs b/compio-fs/tests/buffer_pool.rs index 85db30fb..0a129c65 100644 --- a/compio-fs/tests/buffer_pool.rs +++ b/compio-fs/tests/buffer_pool.rs @@ -22,7 +22,7 @@ async fn test_read_file() { let file = File::open(tempfile.path()).await.unwrap(); let buffer_pool = BufferPool::new(1, 15).unwrap(); - let buf = file.read_managed_at(&buffer_pool, 0, 0).await.unwrap(); + let buf = file.read_managed_at(0, &buffer_pool, 0).await.unwrap(); assert_eq!(buf.len(), HELLO.len()); assert_eq!(buf.as_ref(), HELLO); diff --git a/compio-runtime/src/lib.rs b/compio-runtime/src/lib.rs index b2d6c13d..7a25b920 100644 --- a/compio-runtime/src/lib.rs +++ b/compio-runtime/src/lib.rs @@ -23,5 +23,6 @@ pub use async_task::Task; pub use attacher::*; use compio_buf::BufResult; pub use runtime::{ - JoinHandle, Runtime, RuntimeBuilder, spawn, spawn_blocking, submit, submit_with_flags, + BorrowedBuffer, BufferPool, JoinHandle, Runtime, RuntimeBuilder, spawn, spawn_blocking, submit, + submit_with_flags, }; diff --git a/compio-runtime/src/runtime/buffer_pool.rs b/compio-runtime/src/runtime/buffer_pool.rs new file mode 100644 index 00000000..9dbe2fa2 --- /dev/null +++ b/compio-runtime/src/runtime/buffer_pool.rs @@ -0,0 +1,83 @@ +//! Buffer pool + +use std::{io, marker::PhantomData, mem::ManuallyDrop}; + +pub use compio_driver::BorrowedBuffer; + +use crate::Runtime; + +/// Buffer pool +/// +/// A buffer pool to allow user no need to specify a specific buffer to do the +/// IO operation +/// +/// Drop the `BufferPool` will release the buffer pool automatically +#[derive(Debug)] +pub struct BufferPool { + inner: ManuallyDrop, + runtime_id: u64, + + // make it !Send and !Sync, to prevent user send the buffer pool to other thread + _marker: PhantomData<*const ()>, +} + +impl BufferPool { + /// Create buffer pool with given `buffer_size` and `buffer_len` + /// + /// # Notes + /// + /// If `buffer_len` is not power of 2, it will be upward with + /// [`u16::next_power_of_two`] + pub fn new(buffer_len: u16, buffer_size: usize) -> io::Result { + let (inner, runtime_id) = Runtime::with_current(|runtime| { + let buffer_pool = runtime.create_buffer_pool(buffer_len, buffer_size)?; + let runtime_id = runtime.id(); + + io::Result::Ok((buffer_pool, runtime_id)) + })?; + + Ok(Self::inner_new(inner, runtime_id)) + } + + fn inner_new(inner: compio_driver::BufferPool, runtime_id: u64) -> Self { + Self { + inner: ManuallyDrop::new(inner), + runtime_id, + _marker: Default::default(), + } + } + + /// Get the inner driver buffer pool reference + /// + /// # Notes + /// + /// You should not use this method unless you are writing your own IO opcode + /// + /// # Panic + /// + /// If call this method in incorrect runtime, will panic + pub fn as_inner(&self) -> &compio_driver::BufferPool { + let current_runtime_id = Runtime::with_current(|runtime| runtime.id()); + assert_eq!(current_runtime_id, self.runtime_id); + + &self.inner + } +} + +impl Drop for BufferPool { + fn drop(&mut self) { + let _ = Runtime::try_with_current(|runtime| { + if self.runtime_id != runtime.id() { + return; + } + + unsafe { + // Safety: we own the inner + let inner = ManuallyDrop::take(&mut self.inner); + + // Safety: the buffer pool is created by current thread runtime + let _ = runtime.release_buffer_pool(inner); + } + }); + } +} diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index 968ce506..31d8fb99 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -1,6 +1,6 @@ use std::{ any::Any, - cell::RefCell, + cell::{Cell, RefCell}, collections::VecDeque, future::{Future, ready}, io, @@ -25,6 +25,9 @@ pub(crate) mod op; #[cfg(feature = "time")] pub(crate) mod time; +mod buffer_pool; +pub use buffer_pool::*; + mod send_wrapper; use send_wrapper::SendWrapper; @@ -83,6 +86,10 @@ impl RunnableQueue { } } +thread_local! { + static RUNTIME_ID: Cell = const { Cell::new(0) }; +} + /// The async runtime of compio. It is a thread local runtime, and cannot be /// sent to other threads. pub struct Runtime { @@ -91,6 +98,14 @@ pub struct Runtime { #[cfg(feature = "time")] timer_runtime: RefCell, event_interval: usize, + // Runtime id is used to check if the buffer pool is belonged to this runtime or not. + // Without this, if user enable `io-uring-buf-ring` feature then: + // 1. Create a buffer pool at runtime1 + // 3. Create another runtime2, then use the exists buffer pool in runtime2, it may cause + // - io-uring report error if the buffer group id is not registered + // - buffer pool will return a wrong buffer which the buffer's data is uninit, that will cause + // UB + id: u64, // Other fields don't make it !Send, but actually `local_runnables` implies it should be !Send, // otherwise it won't be valid if the runtime is sent to other threads. _p: PhantomData>>, @@ -108,12 +123,15 @@ impl Runtime { } fn with_builder(builder: &RuntimeBuilder) -> io::Result { + let id = RUNTIME_ID.get(); + RUNTIME_ID.set(id + 1); Ok(Self { driver: RefCell::new(builder.proactor_builder.build()?), runnables: Arc::new(RunnableQueue::new()), #[cfg(feature = "time")] timer_runtime: RefCell::new(TimerRuntime::new()), event_interval: builder.event_interval, + id, _p: PhantomData, }) } @@ -351,6 +369,27 @@ impl Runtime { #[cfg(feature = "time")] self.timer_runtime.borrow_mut().wake(); } + + pub(crate) fn create_buffer_pool( + &self, + buffer_len: u16, + buffer_size: usize, + ) -> io::Result { + self.driver + .borrow_mut() + .create_buffer_pool(buffer_len, buffer_size) + } + + pub(crate) unsafe fn release_buffer_pool( + &self, + buffer_pool: compio_driver::BufferPool, + ) -> io::Result<()> { + self.driver.borrow_mut().release_buffer_pool(buffer_pool) + } + + pub(crate) fn id(&self) -> u64 { + self.id + } } impl Drop for Runtime { From fb55bbeb05c7527521558fc8beb10029485514df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 16 Mar 2025 16:09:47 +0800 Subject: [PATCH 4/8] feat(fs): complete managed io --- compio-driver/src/buffer_pool/fallback.rs | 9 ++-- compio-fs/src/async_fd.rs | 2 +- compio-fs/src/file.rs | 2 +- compio-fs/src/named_pipe.rs | 59 ++++++++++++++++++++++- compio-fs/src/pipe.rs | 4 +- compio-fs/src/stdio/windows.rs | 44 +++++++++++++++-- compio-runtime/src/runtime/buffer_pool.rs | 22 +++------ 7 files changed, 118 insertions(+), 24 deletions(-) diff --git a/compio-driver/src/buffer_pool/fallback.rs b/compio-driver/src/buffer_pool/fallback.rs index e31eed5a..fe959192 100644 --- a/compio-driver/src/buffer_pool/fallback.rs +++ b/compio-driver/src/buffer_pool/fallback.rs @@ -56,7 +56,8 @@ impl BufferPool { } /// Select an [`OwnedBuffer`] when the op creates. - pub(crate) fn get_buffer(&self, len: usize) -> io::Result { + #[doc(hidden)] + pub fn get_buffer(&self, len: usize) -> io::Result { let buffer = self .inner .buffers @@ -78,7 +79,8 @@ impl BufferPool { /// ## Safety /// * `len` should be valid. - pub(crate) unsafe fn create_proxy(&self, mut slice: OwnedBuffer, len: usize) -> BorrowedBuffer { + #[doc(hidden)] + pub unsafe fn create_proxy(&self, mut slice: OwnedBuffer, len: usize) -> BorrowedBuffer { unsafe { slice.set_buf_init(len); } @@ -86,7 +88,8 @@ impl BufferPool { } } -pub(crate) struct OwnedBuffer { +#[doc(hidden)] +pub struct OwnedBuffer { buffer: ManuallyDrop>>, pool: ManuallyDrop>, } diff --git a/compio-fs/src/async_fd.rs b/compio-fs/src/async_fd.rs index dd54e4ef..93b93ca0 100644 --- a/compio-fs/src/async_fd.rs +++ b/compio-fs/src/async_fd.rs @@ -82,7 +82,7 @@ impl AsyncReadManaged for &AsyncFd { len: usize, ) -> io::Result> { let fd = self.to_shared_fd(); - let buffer_pool = buffer_pool.as_inner(); + let buffer_pool = buffer_pool.try_inner()?; let op = RecvManaged::new(fd, buffer_pool, len)?; compio_runtime::submit_with_flags(op) .await diff --git a/compio-fs/src/file.rs b/compio-fs/src/file.rs index 81ae0bc1..7e39af6e 100644 --- a/compio-fs/src/file.rs +++ b/compio-fs/src/file.rs @@ -180,7 +180,7 @@ impl AsyncReadManagedAt for File { len: usize, ) -> io::Result> { let fd = self.inner.to_shared_fd(); - let buffer_pool = buffer_pool.as_inner(); + let buffer_pool = buffer_pool.try_inner()?; let op = ReadManagedAt::new(fd, pos, buffer_pool, len)?; compio_runtime::submit_with_flags(op) .await diff --git a/compio-fs/src/named_pipe.rs b/compio-fs/src/named_pipe.rs index 5a70c226..4f8e087a 100644 --- a/compio-fs/src/named_pipe.rs +++ b/compio-fs/src/named_pipe.rs @@ -8,7 +8,10 @@ use std::{ffi::OsStr, io, os::windows::io::FromRawHandle, ptr::null}; use compio_buf::{BufResult, IoBuf, IoBufMut}; use compio_driver::{AsRawFd, RawFd, ToSharedFd, impl_raw_fd, op::ConnectNamedPipe, syscall}; -use compio_io::{AsyncRead, AsyncReadAt, AsyncWrite, AsyncWriteAt}; +use compio_io::{ + AsyncRead, AsyncReadAt, AsyncReadManaged, AsyncReadManagedAt, AsyncWrite, AsyncWriteAt, +}; +use compio_runtime::{BorrowedBuffer, BufferPool}; use widestring::U16CString; use windows_sys::Win32::{ Storage::FileSystem::{ @@ -192,6 +195,33 @@ impl AsyncRead for &NamedPipeServer { } } +impl AsyncReadManaged for NamedPipeServer { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_managed(buffer_pool, len).await + } +} + +impl AsyncReadManaged for &NamedPipeServer { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + // The position is ignored. + (&self.handle).read_managed(buffer_pool, len).await + } +} + impl AsyncWrite for NamedPipeServer { #[inline] async fn write(&mut self, buf: T) -> BufResult { @@ -312,6 +342,33 @@ impl AsyncRead for &NamedPipeClient { } } +impl AsyncReadManaged for NamedPipeClient { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_managed(buffer_pool, len).await + } +} + +impl AsyncReadManaged for &NamedPipeClient { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + // The position is ignored. + self.handle.read_managed_at(0, buffer_pool, len).await + } +} + impl AsyncWrite for NamedPipeClient { #[inline] async fn write(&mut self, buf: T) -> BufResult { diff --git a/compio-fs/src/pipe.rs b/compio-fs/src/pipe.rs index 9eb707a3..28d2ac11 100644 --- a/compio-fs/src/pipe.rs +++ b/compio-fs/src/pipe.rs @@ -9,11 +9,12 @@ use std::{ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use compio_driver::{ - AsRawFd, BorrowedBuffer, BufferPool, ToSharedFd, impl_raw_fd, + AsRawFd, ToSharedFd, impl_raw_fd, op::{BufResultExt, Recv, RecvManaged, RecvVectored, ResultTakeBuffer, Send, SendVectored}, syscall, }; use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite}; +use compio_runtime::{BorrowedBuffer, BufferPool}; use crate::File; @@ -525,6 +526,7 @@ impl AsyncReadManaged for &Receiver { len: usize, ) -> io::Result> { let fd = self.to_shared_fd(); + let buffer_pool = buffer_pool.try_inner()?; let op = RecvManaged::new(fd, buffer_pool, len)?; compio_runtime::submit_with_flags(op) .await diff --git a/compio-fs/src/stdio/windows.rs b/compio-fs/src/stdio/windows.rs index 553f7bed..6b3aacc6 100644 --- a/compio-fs/src/stdio/windows.rs +++ b/compio-fs/src/stdio/windows.rs @@ -9,10 +9,10 @@ use std::{ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut}; use compio_driver::{ AsRawFd, OpCode, OpType, RawFd, SharedFd, - op::{BufResultExt, Recv, Send}, + op::{BufResultExt, Recv, RecvManaged, ResultTakeBuffer, Send}, }; -use compio_io::{AsyncRead, AsyncWrite}; -use compio_runtime::Runtime; +use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite}; +use compio_runtime::{BorrowedBuffer, BufferPool, Runtime}; use windows_sys::Win32::System::IO::OVERLAPPED; #[cfg(doc)] @@ -136,6 +136,44 @@ impl AsyncRead for Stdin { } } +impl AsyncReadManaged for Stdin { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_managed(buffer_pool, len).await + } +} + +impl AsyncReadManaged for &Stdin { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let buffer_pool = buffer_pool.try_inner()?; + if self.isatty { + let buf = buffer_pool.get_buffer(len)?; + let op = StdRead::new(io::stdin(), buf); + let BufResult(res, buf) = compio_runtime::submit(op).await.into_inner(); + let res = unsafe { buffer_pool.create_proxy(buf, res?) }; + Ok(res) + } else { + let op = RecvManaged::new(self.fd.clone(), buffer_pool, len)?; + compio_runtime::submit_with_flags(op) + .await + .take_buffer(buffer_pool) + } + } +} + impl AsRawFd for Stdin { fn as_raw_fd(&self) -> RawFd { self.fd.as_raw_fd() diff --git a/compio-runtime/src/runtime/buffer_pool.rs b/compio-runtime/src/runtime/buffer_pool.rs index 9dbe2fa2..628e6bf0 100644 --- a/compio-runtime/src/runtime/buffer_pool.rs +++ b/compio-runtime/src/runtime/buffer_pool.rs @@ -36,15 +36,11 @@ impl BufferPool { io::Result::Ok((buffer_pool, runtime_id)) })?; - Ok(Self::inner_new(inner, runtime_id)) - } - - fn inner_new(inner: compio_driver::BufferPool, runtime_id: u64) -> Self { - Self { + Ok(Self { inner: ManuallyDrop::new(inner), runtime_id, _marker: Default::default(), - } + }) } /// Get the inner driver buffer pool reference @@ -52,15 +48,13 @@ impl BufferPool { /// # Notes /// /// You should not use this method unless you are writing your own IO opcode - /// - /// # Panic - /// - /// If call this method in incorrect runtime, will panic - pub fn as_inner(&self) -> &compio_driver::BufferPool { + pub fn try_inner(&self) -> io::Result<&compio_driver::BufferPool> { let current_runtime_id = Runtime::with_current(|runtime| runtime.id()); - assert_eq!(current_runtime_id, self.runtime_id); - - &self.inner + if current_runtime_id == self.runtime_id { + Ok(&self.inner) + } else { + Err(io::Error::other("runtime and buffer pool mismatch")) + } } } From e60f63041af4cb100ee3fdf7917697d1b7e2c300 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 16 Mar 2025 16:27:10 +0800 Subject: [PATCH 5/8] feat(net): add managed impl --- compio-net/src/socket.rs | 21 ++++++-- compio-net/src/tcp.rs | 29 ++++++++++- compio-net/src/udp.rs | 14 +++++ compio-net/src/unix.rs | 40 ++++++++++++--- compio-net/tests/buffer_pool.rs | 90 +++++++++++++++++++++++++++++++++ 5 files changed, 181 insertions(+), 13 deletions(-) create mode 100644 compio-net/tests/buffer_pool.rs diff --git a/compio-net/src/socket.rs b/compio-net/src/socket.rs index 768a9bbf..d07a2221 100644 --- a/compio-net/src/socket.rs +++ b/compio-net/src/socket.rs @@ -10,13 +10,13 @@ use compio_driver::op::CreateSocket; use compio_driver::{ AsRawFd, ToSharedFd, impl_raw_fd, op::{ - Accept, BufResultExt, CloseSocket, Connect, Recv, RecvFrom, RecvFromVectored, RecvMsg, - RecvResultExt, RecvVectored, Send, SendMsg, SendTo, SendToVectored, SendVectored, - ShutdownSocket, + Accept, BufResultExt, CloseSocket, Connect, Recv, RecvFrom, RecvFromVectored, RecvManaged, + RecvMsg, RecvResultExt, RecvVectored, ResultTakeBuffer, Send, SendMsg, SendTo, + SendToVectored, SendVectored, ShutdownSocket, }, syscall, }; -use compio_runtime::Attacher; +use compio_runtime::{Attacher, BorrowedBuffer, BufferPool}; use socket2::{Domain, Protocol, SockAddr, Socket as Socket2, Type}; use crate::PollFd; @@ -229,6 +229,19 @@ impl Socket { compio_runtime::submit(op).await.into_inner().map_advanced() } + pub async fn recv_managed<'a>( + &self, + buffer_pool: &'a BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let buffer_pool = buffer_pool.try_inner()?; + let op = RecvManaged::new(fd, buffer_pool, len)?; + compio_runtime::submit_with_flags(op) + .await + .take_buffer(buffer_pool) + } + pub async fn send(&self, buffer: T) -> BufResult { let fd = self.to_shared_fd(); let op = Send::new(fd, buffer); diff --git a/compio-net/src/tcp.rs b/compio-net/src/tcp.rs index d13aa489..583639b1 100644 --- a/compio-net/src/tcp.rs +++ b/compio-net/src/tcp.rs @@ -2,7 +2,8 @@ use std::{future::Future, io, net::SocketAddr}; use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use compio_driver::impl_raw_fd; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite}; +use compio_runtime::{BorrowedBuffer, BufferPool}; use socket2::{Protocol, SockAddr, Socket as Socket2, Type}; use crate::{ @@ -276,6 +277,32 @@ impl AsyncRead for &TcpStream { } } +impl AsyncReadManaged for TcpStream { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_managed(buffer_pool, len).await + } +} + +impl AsyncReadManaged for &TcpStream { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + self.inner.recv_managed(buffer_pool, len as _).await + } +} + impl AsyncWrite for TcpStream { #[inline] async fn write(&mut self, buf: T) -> BufResult { diff --git a/compio-net/src/udp.rs b/compio-net/src/udp.rs index 2ca41c79..ad2ca4de 100644 --- a/compio-net/src/udp.rs +++ b/compio-net/src/udp.rs @@ -2,6 +2,7 @@ use std::{future::Future, io, net::SocketAddr}; use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use compio_driver::impl_raw_fd; +use compio_runtime::{BorrowedBuffer, BufferPool}; use socket2::{Protocol, SockAddr, Socket as Socket2, Type}; use crate::{Socket, ToSocketAddrsAsync}; @@ -196,6 +197,19 @@ impl UdpSocket { self.inner.recv_vectored(buffer).await } + /// Read some bytes from this source with [`BufferPool`] and return + /// a [`BorrowedBuffer`]. + /// + /// If `len` == 0, will use [`BufferPool`] inner buffer size as the max len, + /// if `len` > 0, `min(len, inner buffer size)` will be the read max len + pub async fn recv_managed<'a>( + &self, + buffer_pool: &'a BufferPool, + len: usize, + ) -> io::Result> { + self.inner.recv_managed(buffer_pool, len).await + } + /// Sends some data to the socket from the buffer, returning the original /// buffer and quantity of data sent. pub async fn send(&self, buffer: T) -> BufResult { diff --git a/compio-net/src/unix.rs b/compio-net/src/unix.rs index 5728c372..6aea948d 100644 --- a/compio-net/src/unix.rs +++ b/compio-net/src/unix.rs @@ -2,7 +2,8 @@ use std::{future::Future, io, path::Path}; use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use compio_driver::impl_raw_fd; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite}; +use compio_runtime::{BorrowedBuffer, BufferPool}; use socket2::{SockAddr, Socket as Socket2, Type}; use crate::{OwnedReadHalf, OwnedWriteHalf, PollFd, ReadHalf, Socket, WriteHalf}; @@ -239,6 +240,32 @@ impl AsyncRead for &UnixStream { } } +impl AsyncReadManaged for UnixStream { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + (&*self).read_managed(buffer_pool, len).await + } +} + +impl AsyncReadManaged for &UnixStream { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + self.inner.recv_managed(buffer_pool, len as _).await + } +} + impl AsyncWrite for UnixStream { #[inline] async fn write(&mut self, buf: T) -> BufResult { @@ -294,13 +321,10 @@ fn empty_unix_socket() -> SockAddr { unsafe { SockAddr::try_init(|addr, len| { let addr: *mut SOCKADDR_UN = addr.cast(); - std::ptr::write( - addr, - SOCKADDR_UN { - sun_family: AF_UNIX, - sun_path: [0; 108], - }, - ); + std::ptr::write(addr, SOCKADDR_UN { + sun_family: AF_UNIX, + sun_path: [0; 108], + }); std::ptr::write(len, 3); Ok(()) }) diff --git a/compio-net/tests/buffer_pool.rs b/compio-net/tests/buffer_pool.rs new file mode 100644 index 00000000..1b157805 --- /dev/null +++ b/compio-net/tests/buffer_pool.rs @@ -0,0 +1,90 @@ +use std::net::Ipv6Addr; + +use compio_io::{AsyncReadManaged, AsyncWriteExt}; +use compio_net::{TcpListener, TcpStream, UdpSocket, UnixListener, UnixStream}; +use compio_runtime::BufferPool; + +#[compio_macros::test] +async fn test_tcp_read_buffer_pool() { + let listener = TcpListener::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); + let addr = listener.local_addr().unwrap(); + + compio_runtime::spawn(async move { + let mut stream = listener.accept().await.unwrap().0; + stream.write_all(b"test").await.unwrap(); + }) + .detach(); + + let buffer_pool = BufferPool::new(1, 4).unwrap(); + let mut stream = TcpStream::connect(addr).await.unwrap(); + + assert_eq!( + stream.read_managed(&buffer_pool, 0).await.unwrap().as_ref(), + b"test" + ); + + assert!( + stream + .read_managed(&buffer_pool, 0) + .await + .unwrap() + .is_empty() + ); +} + +#[compio_macros::test] +async fn test_udp_read_buffer_pool() { + let listener = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); + let addr = listener.local_addr().unwrap(); + let connected = UdpSocket::bind((Ipv6Addr::LOCALHOST, 0)).await.unwrap(); + connected.connect(addr).await.unwrap(); + let addr = connected.local_addr().unwrap(); + + compio_runtime::spawn(async move { + listener.send_to(b"test", addr).await.unwrap(); + }) + .detach(); + + let buffer_pool = BufferPool::new(1, 4).unwrap(); + + assert_eq!( + connected + .recv_managed(&buffer_pool, 0) + .await + .unwrap() + .as_ref(), + b"test" + ); +} + +#[compio_macros::test] +async fn test_uds_recv_buffer_pool() { + let dir = tempfile::Builder::new() + .prefix("compio-uds-buffer-pool-tests") + .tempdir() + .unwrap(); + let sock_path = dir.path().join("connect.sock"); + + let listener = UnixListener::bind(&sock_path).await.unwrap(); + + let (mut client, (mut server, _)) = + futures_util::try_join!(UnixStream::connect(&sock_path), listener.accept()).unwrap(); + + client.write_all("test").await.unwrap(); + drop(client); + + let buffer_pool = BufferPool::new(1, 4).unwrap(); + + assert_eq!( + server.read_managed(&buffer_pool, 0).await.unwrap().as_ref(), + b"test" + ); + + assert!( + server + .read_managed(&buffer_pool, 0) + .await + .unwrap() + .is_empty() + ); +} From a59dd4d61ee7e14b73b1facb002eba57d55fdd04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 16 Mar 2025 16:31:50 +0800 Subject: [PATCH 6/8] feat(process): add managed impl --- compio-process/src/unix.rs | 41 +++++++++++++++++++++++++++++++++-- compio-process/src/windows.rs | 41 +++++++++++++++++++++++++++++++++-- 2 files changed, 78 insertions(+), 4 deletions(-) diff --git a/compio-process/src/unix.rs b/compio-process/src/unix.rs index e867a999..8ac202b3 100644 --- a/compio-process/src/unix.rs +++ b/compio-process/src/unix.rs @@ -3,9 +3,10 @@ use std::{io, panic::resume_unwind, process}; use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut}; use compio_driver::{ AsRawFd, RawFd, SharedFd, ToSharedFd, - op::{BufResultExt, Recv, Send}, + op::{BufResultExt, Recv, RecvManaged, ResultTakeBuffer, Send}, }; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite}; +use compio_runtime::{BorrowedBuffer, BufferPool}; use crate::{ChildStderr, ChildStdin, ChildStdout}; @@ -35,6 +36,24 @@ impl AsyncRead for ChildStdout { } } +impl AsyncReadManaged for ChildStdout { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let buffer_pool = buffer_pool.try_inner()?; + let op = RecvManaged::new(fd, buffer_pool, len)?; + compio_runtime::submit_with_flags(op) + .await + .take_buffer(buffer_pool) + } +} + impl AsRawFd for ChildStderr { fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() @@ -55,6 +74,24 @@ impl AsyncRead for ChildStderr { } } +impl AsyncReadManaged for ChildStderr { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let buffer_pool = buffer_pool.try_inner()?; + let op = RecvManaged::new(fd, buffer_pool, len)?; + compio_runtime::submit_with_flags(op) + .await + .take_buffer(buffer_pool) + } +} + impl AsRawFd for ChildStdin { fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() diff --git a/compio-process/src/windows.rs b/compio-process/src/windows.rs index 76df3bba..91a87f62 100644 --- a/compio-process/src/windows.rs +++ b/compio-process/src/windows.rs @@ -9,10 +9,11 @@ use std::{ use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut}; use compio_driver::{ AsRawFd, OpCode, OpType, RawFd, SharedFd, ToSharedFd, - op::{BufResultExt, Recv, Send}, + op::{BufResultExt, Recv, RecvManaged, ResultTakeBuffer, Send}, syscall, }; -use compio_io::{AsyncRead, AsyncWrite}; +use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite}; +use compio_runtime::{BorrowedBuffer, BufferPool}; use windows_sys::Win32::System::{IO::OVERLAPPED, Threading::GetExitCodeProcess}; use crate::{ChildStderr, ChildStdin, ChildStdout}; @@ -68,6 +69,24 @@ impl AsyncRead for ChildStdout { } } +impl AsyncReadManaged for ChildStdout { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let buffer_pool = buffer_pool.try_inner()?; + let op = RecvManaged::new(fd, buffer_pool, len)?; + compio_runtime::submit_with_flags(op) + .await + .take_buffer(buffer_pool) + } +} + impl AsRawFd for ChildStderr { fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() @@ -88,6 +107,24 @@ impl AsyncRead for ChildStderr { } } +impl AsyncReadManaged for ChildStderr { + type Buffer<'a> = BorrowedBuffer<'a>; + type BufferPool = BufferPool; + + async fn read_managed<'a>( + &mut self, + buffer_pool: &'a Self::BufferPool, + len: usize, + ) -> io::Result> { + let fd = self.to_shared_fd(); + let buffer_pool = buffer_pool.try_inner()?; + let op = RecvManaged::new(fd, buffer_pool, len)?; + compio_runtime::submit_with_flags(op) + .await + .take_buffer(buffer_pool) + } +} + impl AsRawFd for ChildStdin { fn as_raw_fd(&self) -> RawFd { self.0.as_raw_fd() From 3b475ec88118d69f06d850a56ad7d93fa8c6d458 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 16 Mar 2025 17:16:51 +0800 Subject: [PATCH 7/8] fix(fs): unix impl --- compio-fs/src/stdio/unix.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/compio-fs/src/stdio/unix.rs b/compio-fs/src/stdio/unix.rs index 945f5d98..180f4663 100644 --- a/compio-fs/src/stdio/unix.rs +++ b/compio-fs/src/stdio/unix.rs @@ -1,8 +1,9 @@ use std::io; use compio_buf::{BufResult, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; -use compio_driver::{AsRawFd, BorrowedBuffer, BufferPool, RawFd}; +use compio_driver::{AsRawFd, RawFd}; use compio_io::{AsyncRead, AsyncReadManaged, AsyncWrite}; +use compio_runtime::{BorrowedBuffer, BufferPool}; #[cfg(doc)] use super::{stderr, stdin, stdout}; From 8a57b79de48761785ce5f6eeb19e78acca5ea160 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Sun, 16 Mar 2025 17:50:05 +0800 Subject: [PATCH 8/8] fix: format with latest cargo-fmt --- compio-driver/src/lib.rs | 3 ++- compio-net/src/unix.rs | 11 +++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/compio-driver/src/lib.rs b/compio-driver/src/lib.rs index af3313f1..9b7551be 100644 --- a/compio-driver/src/lib.rs +++ b/compio-driver/src/lib.rs @@ -336,7 +336,8 @@ impl Proactor { /// /// # Safety /// - /// Caller must make sure to release the buffer pool with the correct driver, i.e., the one they created the buffer pool with. + /// Caller must make sure to release the buffer pool with the correct + /// driver, i.e., the one they created the buffer pool with. pub unsafe fn release_buffer_pool(&mut self, buffer_pool: BufferPool) -> io::Result<()> { self.driver.release_buffer_pool(buffer_pool) } diff --git a/compio-net/src/unix.rs b/compio-net/src/unix.rs index 6aea948d..04aa9689 100644 --- a/compio-net/src/unix.rs +++ b/compio-net/src/unix.rs @@ -321,10 +321,13 @@ fn empty_unix_socket() -> SockAddr { unsafe { SockAddr::try_init(|addr, len| { let addr: *mut SOCKADDR_UN = addr.cast(); - std::ptr::write(addr, SOCKADDR_UN { - sun_family: AF_UNIX, - sun_path: [0; 108], - }); + std::ptr::write( + addr, + SOCKADDR_UN { + sun_family: AF_UNIX, + sun_path: [0; 108], + }, + ); std::ptr::write(len, 3); Ok(()) })