Skip to content

Commit

Permalink
impl recv and send
Browse files Browse the repository at this point in the history
  • Loading branch information
loongs-zhang committed Feb 22, 2024
1 parent 4c2daf6 commit f1182e6
Show file tree
Hide file tree
Showing 9 changed files with 380 additions and 163 deletions.
4 changes: 2 additions & 2 deletions monoio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ once_cell = { version = "1.19.0", optional = true }

# windows dependencies(will be added when windows support finished)
[target.'cfg(windows)'.dependencies]
windows-sys = { version = "0.48.0", features = [
windows-sys = { version = "0.52.0", features = [
"Win32_Foundation",
"Win32_Networking_WinSock",
"Win32_System_IO",
Expand Down Expand Up @@ -79,7 +79,7 @@ utils = ["nix"]
# enable debug if you want to know what runtime does
debug = ["tracing"]
# enable legacy driver support(will make monoio available for older kernel and macOS)
legacy = ["mio", "polling"]
legacy = ["mio", "polling", "once_cell"]
# iouring support
iouring = ["io-uring"]
# tokio-compatible(only have effect when legacy is enabled and iouring is not)
Expand Down
6 changes: 5 additions & 1 deletion monoio/src/buf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ mod raw_buf;
pub use raw_buf::{RawBuf, RawBufVectored};

mod vec_wrapper;
pub(crate) use vec_wrapper::{read_vec_meta, write_vec_meta};
#[allow(unused_imports)]
pub(crate) use vec_wrapper::{read_vec_meta, write_vec_meta, IoVecMeta};

mod msg;
pub use msg::{MsgBuf, MsgBufMut, MsgMeta};

pub(crate) fn deref(buf: &impl IoBuf) -> &[u8] {
// Safety: the `IoBuf` trait is marked as unsafe and is expected to be
Expand Down
124 changes: 124 additions & 0 deletions monoio/src/buf/msg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use std::ops::{Deref, DerefMut};

#[cfg(unix)]
use libc::msghdr;
#[cfg(windows)]
use windows_sys::Win32::Networking::WinSock::WSAMSG;

/// An `io_uring` compatible msg buffer.
///
/// # Safety
/// See the safety note of the methods.
#[allow(clippy::unnecessary_safety_doc)]
pub unsafe trait MsgBuf: Unpin + 'static {
/// Returns a raw pointer to msghdr struct.
///
/// # Safety
/// The implementation must ensure that, while the runtime owns the value,
/// the pointer returned by `stable_mut_ptr` **does not** change.
/// Also, the value pointed must be a valid msghdr struct.
#[cfg(unix)]
fn read_msghdr_ptr(&self) -> *const msghdr;

/// Returns a raw pointer to WSAMSG struct.
#[cfg(windows)]
fn read_wsamsg_ptr(&self) -> *const WSAMSG;
}

/// An `io_uring` compatible msg buffer.
///
/// # Safety
/// See the safety note of the methods.
#[allow(clippy::unnecessary_safety_doc)]
pub unsafe trait MsgBufMut: Unpin + 'static {
/// Returns a raw pointer to msghdr struct.
///
/// # Safety
/// The implementation must ensure that, while the runtime owns the value,
/// the pointer returned by `stable_mut_ptr` **does not** change.
/// Also, the value pointed must be a valid msghdr struct.
#[cfg(unix)]
fn write_msghdr_ptr(&mut self) -> *mut msghdr;

/// Returns a raw pointer to WSAMSG struct.
#[cfg(windows)]
fn write_wsamsg_ptr(&mut self) -> *mut WSAMSG;
}

#[allow(missing_docs)]
pub struct MsgMeta {
#[cfg(unix)]
pub(crate) data: msghdr,
#[cfg(windows)]
pub(crate) data: WSAMSG,
}

unsafe impl MsgBuf for MsgMeta {
#[cfg(unix)]
fn read_msghdr_ptr(&self) -> *const msghdr {
&self.data
}

#[cfg(windows)]
fn read_wsamsg_ptr(&self) -> *const WSAMSG {
&self.data
}
}

unsafe impl MsgBufMut for MsgMeta {
#[cfg(unix)]
fn write_msghdr_ptr(&mut self) -> *mut msghdr {
&mut self.data
}

#[cfg(windows)]
fn write_wsamsg_ptr(&mut self) -> *mut WSAMSG {
&mut self.data
}
}

#[cfg(unix)]
impl From<msghdr> for MsgMeta {
fn from(data: msghdr) -> Self {
Self { data }
}
}

#[cfg(unix)]
impl Deref for MsgMeta {
type Target = msghdr;

fn deref(&self) -> &Self::Target {
&self.data
}
}

#[cfg(unix)]
impl DerefMut for MsgMeta {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}

#[cfg(windows)]
impl From<WSAMSG> for MsgMeta {
fn from(data: WSAMSG) -> Self {
Self { data }
}
}

#[cfg(windows)]
impl Deref for MsgMeta {
type Target = WSAMSG;

fn deref(&self) -> &Self::Target {
&self.data
}
}

#[cfg(windows)]
impl DerefMut for MsgMeta {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.data
}
}
40 changes: 39 additions & 1 deletion monoio/src/buf/vec_wrapper.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(windows)]
use {std::ops::Add, windows_sys::Win32::Networking::WinSock::WSABUF};

use super::{IoVecBuf, IoVecBufMut};
use super::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut};

pub(crate) struct IoVecMeta {
#[cfg(unix)]
Expand Down Expand Up @@ -206,6 +206,44 @@ unsafe impl IoVecBufMut for IoVecMeta {
}
}

impl<'t, T: IoBuf> From<&'t T> for IoVecMeta {
fn from(buf: &'t T) -> Self {
let ptr = buf.read_ptr() as *const _ as *mut _;
let len = buf.bytes_init() as _;
#[cfg(unix)]
let item = libc::iovec {
iov_base: ptr,
iov_len: len,
};
#[cfg(windows)]
let item = WSABUF { buf: ptr, len };
Self {
data: vec![item],
offset: 0,
len: 1,
}
}
}

impl<'t, T: IoBufMut> From<&'t mut T> for IoVecMeta {
fn from(buf: &'t mut T) -> Self {
let ptr = buf.write_ptr() as *mut _;
let len = buf.bytes_total() as _;
#[cfg(unix)]
let item = libc::iovec {
iov_base: ptr,
iov_len: len,
};
#[cfg(windows)]
let item = WSABUF { buf: ptr, len };
Self {
data: vec![item],
offset: 0,
len: 1,
}
}
}

#[cfg(unix)]
#[cfg(test)]
mod tests {
Expand Down
40 changes: 15 additions & 25 deletions monoio/src/driver/legacy/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,28 @@ use mio::Token;

use crate::driver::unpark::Unpark;

#[cfg(unix)]
pub(crate) struct EventWaker {
// raw waker
#[cfg(unix)]
waker: mio::Waker,
#[cfg(windows)]
poll: std::sync::Arc<polling::Poller>,
#[cfg(windows)]
token: Token,
// Atomic awake status
pub(crate) awake: std::sync::atomic::AtomicBool,
}

#[cfg(unix)]
impl EventWaker {
#[cfg(unix)]
pub(crate) fn new(registry: &mio::Registry, token: Token) -> std::io::Result<Self> {
Ok(Self {
waker: mio::Waker::new(registry, token)?,
awake: std::sync::atomic::AtomicBool::new(true),
})
}

pub(crate) fn wake(&self) -> std::io::Result<()> {
// Skip wake if already awake
if self.awake.load(std::sync::atomic::Ordering::Acquire) {
return Ok(());
}
self.waker.wake()
}
}

#[cfg(windows)]
pub(crate) struct EventWaker {
// raw waker
poll: std::sync::Arc<polling::Poller>,
token: Token,
// Atomic awake status
pub(crate) awake: std::sync::atomic::AtomicBool,
}

#[cfg(windows)]
impl EventWaker {
#[cfg(windows)]
pub(crate) fn new(
poll: std::sync::Arc<polling::Poller>,
token: Token,
Expand All @@ -51,14 +36,19 @@ impl EventWaker {
}

pub(crate) fn wake(&self) -> std::io::Result<()> {
use polling::os::iocp::PollerIocpExt;
// Skip wake if already awake
if self.awake.load(std::sync::atomic::Ordering::Acquire) {
return Ok(());
}
self.poll.post(polling::os::iocp::CompletionPacket::new(
#[cfg(unix)]
let r = self.waker.wake();
#[cfg(windows)]
use polling::os::iocp::PollerIocpExt;
#[cfg(windows)]
let r = self.poll.post(polling::os::iocp::CompletionPacket::new(
polling::Event::readable(self.token.0),
))
));
r
}
}

Expand Down
Loading

0 comments on commit f1182e6

Please sign in to comment.