diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index eb1e6fd3..af0af105 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -39,7 +39,7 @@ once_cell = { version = "1.19.0", optional = true } # windows dependencies(will be added when windows support finished) [target.'cfg(windows)'.dependencies] -windows-sys = { version = "0.48.0", features = [ +windows-sys = { version = "0.52.0", features = [ "Win32_Foundation", "Win32_Networking_WinSock", "Win32_System_IO", @@ -79,7 +79,7 @@ utils = ["nix"] # enable debug if you want to know what runtime does debug = ["tracing"] # enable legacy driver support(will make monoio available for older kernel and macOS) -legacy = ["mio", "polling"] +legacy = ["mio", "polling", "once_cell"] # iouring support iouring = ["io-uring"] # tokio-compatible(only have effect when legacy is enabled and iouring is not) diff --git a/monoio/src/buf/mod.rs b/monoio/src/buf/mod.rs index 570aba43..a30825a5 100644 --- a/monoio/src/buf/mod.rs +++ b/monoio/src/buf/mod.rs @@ -19,7 +19,11 @@ mod raw_buf; pub use raw_buf::{RawBuf, RawBufVectored}; mod vec_wrapper; -pub(crate) use vec_wrapper::{read_vec_meta, write_vec_meta}; +#[allow(unused_imports)] +pub(crate) use vec_wrapper::{read_vec_meta, write_vec_meta, IoVecMeta}; + +mod msg; +pub use msg::{MsgBuf, MsgBufMut, MsgMeta}; pub(crate) fn deref(buf: &impl IoBuf) -> &[u8] { // Safety: the `IoBuf` trait is marked as unsafe and is expected to be diff --git a/monoio/src/buf/msg.rs b/monoio/src/buf/msg.rs new file mode 100644 index 00000000..d95ad3c4 --- /dev/null +++ b/monoio/src/buf/msg.rs @@ -0,0 +1,124 @@ +use std::ops::{Deref, DerefMut}; + +#[cfg(unix)] +use libc::msghdr; +#[cfg(windows)] +use windows_sys::Win32::Networking::WinSock::WSAMSG; + +/// An `io_uring` compatible msg buffer. +/// +/// # Safety +/// See the safety note of the methods. +#[allow(clippy::unnecessary_safety_doc)] +pub unsafe trait MsgBuf: Unpin + 'static { + /// Returns a raw pointer to msghdr struct. + /// + /// # Safety + /// The implementation must ensure that, while the runtime owns the value, + /// the pointer returned by `stable_mut_ptr` **does not** change. + /// Also, the value pointed must be a valid msghdr struct. + #[cfg(unix)] + fn read_msghdr_ptr(&self) -> *const msghdr; + + /// Returns a raw pointer to WSAMSG struct. + #[cfg(windows)] + fn read_wsamsg_ptr(&self) -> *const WSAMSG; +} + +/// An `io_uring` compatible msg buffer. +/// +/// # Safety +/// See the safety note of the methods. +#[allow(clippy::unnecessary_safety_doc)] +pub unsafe trait MsgBufMut: Unpin + 'static { + /// Returns a raw pointer to msghdr struct. + /// + /// # Safety + /// The implementation must ensure that, while the runtime owns the value, + /// the pointer returned by `stable_mut_ptr` **does not** change. + /// Also, the value pointed must be a valid msghdr struct. + #[cfg(unix)] + fn write_msghdr_ptr(&mut self) -> *mut msghdr; + + /// Returns a raw pointer to WSAMSG struct. + #[cfg(windows)] + fn write_wsamsg_ptr(&mut self) -> *mut WSAMSG; +} + +#[allow(missing_docs)] +pub struct MsgMeta { + #[cfg(unix)] + pub(crate) data: msghdr, + #[cfg(windows)] + pub(crate) data: WSAMSG, +} + +unsafe impl MsgBuf for MsgMeta { + #[cfg(unix)] + fn read_msghdr_ptr(&self) -> *const msghdr { + &self.data + } + + #[cfg(windows)] + fn read_wsamsg_ptr(&self) -> *const WSAMSG { + &self.data + } +} + +unsafe impl MsgBufMut for MsgMeta { + #[cfg(unix)] + fn write_msghdr_ptr(&mut self) -> *mut msghdr { + &mut self.data + } + + #[cfg(windows)] + fn write_wsamsg_ptr(&mut self) -> *mut WSAMSG { + &mut self.data + } +} + +#[cfg(unix)] +impl From for MsgMeta { + fn from(data: msghdr) -> Self { + Self { data } + } +} + +#[cfg(unix)] +impl Deref for MsgMeta { + type Target = msghdr; + + fn deref(&self) -> &Self::Target { + &self.data + } +} + +#[cfg(unix)] +impl DerefMut for MsgMeta { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } +} + +#[cfg(windows)] +impl From for MsgMeta { + fn from(data: WSAMSG) -> Self { + Self { data } + } +} + +#[cfg(windows)] +impl Deref for MsgMeta { + type Target = WSAMSG; + + fn deref(&self) -> &Self::Target { + &self.data + } +} + +#[cfg(windows)] +impl DerefMut for MsgMeta { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.data + } +} diff --git a/monoio/src/buf/vec_wrapper.rs b/monoio/src/buf/vec_wrapper.rs index 3da704b5..70a3ed5a 100644 --- a/monoio/src/buf/vec_wrapper.rs +++ b/monoio/src/buf/vec_wrapper.rs @@ -1,7 +1,7 @@ #[cfg(windows)] use {std::ops::Add, windows_sys::Win32::Networking::WinSock::WSABUF}; -use super::{IoVecBuf, IoVecBufMut}; +use super::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut}; pub(crate) struct IoVecMeta { #[cfg(unix)] @@ -206,6 +206,44 @@ unsafe impl IoVecBufMut for IoVecMeta { } } +impl<'t, T: IoBuf> From<&'t T> for IoVecMeta { + fn from(buf: &'t T) -> Self { + let ptr = buf.read_ptr() as *const _ as *mut _; + let len = buf.bytes_init() as _; + #[cfg(unix)] + let item = libc::iovec { + iov_base: ptr, + iov_len: len, + }; + #[cfg(windows)] + let item = WSABUF { buf: ptr, len }; + Self { + data: vec![item], + offset: 0, + len: 1, + } + } +} + +impl<'t, T: IoBufMut> From<&'t mut T> for IoVecMeta { + fn from(buf: &'t mut T) -> Self { + let ptr = buf.write_ptr() as *mut _; + let len = buf.bytes_total() as _; + #[cfg(unix)] + let item = libc::iovec { + iov_base: ptr, + iov_len: len, + }; + #[cfg(windows)] + let item = WSABUF { buf: ptr, len }; + Self { + data: vec![item], + offset: 0, + len: 1, + } + } +} + #[cfg(unix)] #[cfg(test)] mod tests { diff --git a/monoio/src/driver/legacy/waker.rs b/monoio/src/driver/legacy/waker.rs index 3d9cc79a..d3a374ea 100644 --- a/monoio/src/driver/legacy/waker.rs +++ b/monoio/src/driver/legacy/waker.rs @@ -2,16 +2,20 @@ use mio::Token; use crate::driver::unpark::Unpark; -#[cfg(unix)] pub(crate) struct EventWaker { // raw waker + #[cfg(unix)] waker: mio::Waker, + #[cfg(windows)] + poll: std::sync::Arc, + #[cfg(windows)] + token: Token, // Atomic awake status pub(crate) awake: std::sync::atomic::AtomicBool, } -#[cfg(unix)] impl EventWaker { + #[cfg(unix)] pub(crate) fn new(registry: &mio::Registry, token: Token) -> std::io::Result { Ok(Self { waker: mio::Waker::new(registry, token)?, @@ -19,26 +23,7 @@ impl EventWaker { }) } - pub(crate) fn wake(&self) -> std::io::Result<()> { - // Skip wake if already awake - if self.awake.load(std::sync::atomic::Ordering::Acquire) { - return Ok(()); - } - self.waker.wake() - } -} - -#[cfg(windows)] -pub(crate) struct EventWaker { - // raw waker - poll: std::sync::Arc, - token: Token, - // Atomic awake status - pub(crate) awake: std::sync::atomic::AtomicBool, -} - -#[cfg(windows)] -impl EventWaker { + #[cfg(windows)] pub(crate) fn new( poll: std::sync::Arc, token: Token, @@ -51,14 +36,19 @@ impl EventWaker { } pub(crate) fn wake(&self) -> std::io::Result<()> { - use polling::os::iocp::PollerIocpExt; // Skip wake if already awake if self.awake.load(std::sync::atomic::Ordering::Acquire) { return Ok(()); } - self.poll.post(polling::os::iocp::CompletionPacket::new( + #[cfg(unix)] + let r = self.waker.wake(); + #[cfg(windows)] + use polling::os::iocp::PollerIocpExt; + #[cfg(windows)] + let r = self.poll.post(polling::os::iocp::CompletionPacket::new( polling::Event::readable(self.token.0), - )) + )); + r } } diff --git a/monoio/src/driver/op/recv.rs b/monoio/src/driver/op/recv.rs index 07e5f25e..866f19a2 100644 --- a/monoio/src/driver/op/recv.rs +++ b/monoio/src/driver/op/recv.rs @@ -1,13 +1,32 @@ -use std::{io, net::SocketAddr}; +#[cfg(all(windows, feature = "unstable"))] +use std::sync::LazyLock; +use std::{ + io, + mem::{transmute, MaybeUninit}, + net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, +}; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; +#[cfg(all(windows, not(feature = "unstable")))] +use once_cell::sync::Lazy as LazyLock; +#[cfg(windows)] +use windows_sys::{ + core::GUID, + Win32::{ + Networking::WinSock::{ + WSAGetLastError, WSAIoctl, AF_INET, AF_INET6, INVALID_SOCKET, LPFN_WSARECVMSG, + LPWSAOVERLAPPED_COMPLETION_ROUTINE, SIO_GET_EXTENSION_FUNCTION_POINTER, SOCKADDR, + SOCKADDR_IN as sockaddr_in, SOCKADDR_IN6 as sockaddr_in6, + SOCKADDR_STORAGE as sockaddr_storage, SOCKET, SOCKET_ERROR, WSAID_WSARECVMSG, WSAMSG, + }, + System::IO::OVERLAPPED, + }, +}; #[cfg(unix)] use { crate::net::unix::SocketAddr as UnixSocketAddr, - libc::{socklen_t, AF_INET, AF_INET6}, - std::mem::{transmute, MaybeUninit}, - std::net::{Ipv4Addr, Ipv6Addr, SocketAddrV4, SocketAddrV6}, + libc::{sockaddr_in, sockaddr_in6, sockaddr_storage, socklen_t, AF_INET, AF_INET6}, }; #[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use { @@ -20,7 +39,10 @@ use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::driver::ready::Direction; -use crate::{buf::IoBufMut, BufResult}; +use crate::{ + buf::{IoBufMut, IoVecBufMut, IoVecMeta, MsgMeta}, + BufResult, +}; pub(crate) struct Recv { /// Holds a strong ref to the FD, preventing the file from being closed @@ -112,31 +134,31 @@ pub(crate) struct RecvMsg { /// Reference to the in-flight buffer. pub(crate) buf: T, - #[cfg(unix)] - pub(crate) info: Box<( - MaybeUninit, - [libc::iovec; 1], - libc::msghdr, - )>, + /// For multiple message recv in the future + pub(crate) info: Box<(MaybeUninit, IoVecMeta, MsgMeta)>, } -#[cfg(unix)] impl Op> { pub(crate) fn recv_msg(fd: SharedFd, mut buf: T) -> io::Result { - let iovec = [libc::iovec { - iov_base: buf.write_ptr() as *mut _, - iov_len: buf.bytes_total(), - }]; - let mut info: Box<( - MaybeUninit, - [libc::iovec; 1], - libc::msghdr, - )> = Box::new((MaybeUninit::uninit(), iovec, unsafe { std::mem::zeroed() })); - - info.2.msg_iov = info.1.as_mut_ptr(); - info.2.msg_iovlen = 1; - info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; - info.2.msg_namelen = std::mem::size_of::() as socklen_t; + let mut info: Box<(MaybeUninit, IoVecMeta, MsgMeta)> = + Box::new((MaybeUninit::uninit(), IoVecMeta::from(&mut buf), unsafe { + std::mem::zeroed() + })); + + #[cfg(unix)] + { + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; + info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; + info.2.msg_namelen = std::mem::size_of::() as socklen_t; + } + #[cfg(windows)] + { + info.2.lpBuffers = info.1.write_wsabuf_ptr(); + info.2.dwBufferCount = info.1.write_wsabuf_len() as _; + info.2.name = &mut info.0 as *mut _ as *mut SOCKADDR; + info.2.namelen = std::mem::size_of::() as _; + } Op::submit_with(RecvMsg { fd, buf, info }) } @@ -150,27 +172,32 @@ impl Op> { let storage = unsafe { complete.data.info.0.assume_init() }; let addr = unsafe { - match storage.ss_family as libc::c_int { + match storage.ss_family as _ { AF_INET => { // Safety: if the ss_family field is AF_INET then storage must be a // sockaddr_in. - let addr: &libc::sockaddr_in = transmute(&storage); + let addr: &sockaddr_in = transmute(&storage); + #[cfg(unix)] let ip = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); + #[cfg(windows)] + let ip = Ipv4Addr::from(addr.sin_addr.S_un.S_addr.to_ne_bytes()); let port = u16::from_be(addr.sin_port); SocketAddr::V4(SocketAddrV4::new(ip, port)) } AF_INET6 => { // Safety: if the ss_family field is AF_INET6 then storage must be a // sockaddr_in6. - let addr: &libc::sockaddr_in6 = transmute(&storage); + let addr: &sockaddr_in6 = transmute(&storage); + #[cfg(unix)] let ip = Ipv6Addr::from(addr.sin6_addr.s6_addr); + #[cfg(windows)] + let ip = Ipv6Addr::from(addr.sin6_addr.u.Byte); let port = u16::from_be(addr.sin6_port); - SocketAddr::V6(SocketAddrV6::new( - ip, - port, - addr.sin6_flowinfo, - addr.sin6_scope_id, - )) + #[cfg(unix)] + let scope_id = addr.sin6_scope_id; + #[cfg(windows)] + let scope_id = addr.Anonymous.sin6_scope_id; + SocketAddr::V6(SocketAddrV6::new(ip, port, addr.sin6_flowinfo, scope_id)) } _ => { unreachable!() @@ -179,9 +206,7 @@ impl Op> { }; // Safety: the kernel wrote `n` bytes to the buffer. - unsafe { - buf.set_init(n); - } + unsafe { buf.set_init(n) }; (n, addr) }); @@ -189,22 +214,42 @@ impl Op> { } } +/// see https://github.com/microsoft/windows-rs/issues/2530 #[cfg(windows)] -impl Op> { - #[allow(unused_mut, unused_variables)] - pub(crate) fn recv_msg(fd: SharedFd, mut buf: T) -> io::Result { - unimplemented!() - } - - pub(crate) async fn wait(self) -> BufResult<(usize, SocketAddr), T> { - unimplemented!() +static WSA_RECV_MSG: LazyLock< + unsafe extern "system" fn( + SOCKET, + *mut WSAMSG, + *mut u32, + *mut OVERLAPPED, + LPWSAOVERLAPPED_COMPLETION_ROUTINE, + ) -> i32, +> = LazyLock::new(|| unsafe { + let mut wsa_recv_msg: LPFN_WSARECVMSG = None; + let mut dw_bytes = 0; + let r = WSAIoctl( + INVALID_SOCKET, + SIO_GET_EXTENSION_FUNCTION_POINTER, + &WSAID_WSARECVMSG as *const _ as *const std::ffi::c_void, + std::mem::size_of:: as usize as u32, + &mut wsa_recv_msg as *mut _ as *mut std::ffi::c_void, + std::mem::size_of::() as _, + &mut dw_bytes, + std::ptr::null_mut(), + None, + ); + if r == SOCKET_ERROR || wsa_recv_msg.is_none() { + panic!("{}", io::Error::from_raw_os_error(WSAGetLastError())) + } else { + assert_eq!(dw_bytes, std::mem::size_of::() as _); + wsa_recv_msg.unwrap() } -} +}); impl OpAble for RecvMsg { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { - opcode::RecvMsg::new(types::Fd(self.fd.raw_fd()), &mut self.info.2 as *mut _).build() + opcode::RecvMsg::new(types::Fd(self.fd.raw_fd()), &mut *self.info.2).build() } #[cfg(any(feature = "legacy", feature = "poll-io"))] @@ -216,13 +261,27 @@ impl OpAble for RecvMsg { #[cfg(all(any(feature = "legacy", feature = "poll-io"), unix))] fn legacy_call(&mut self) -> io::Result { let fd = self.fd.as_raw_fd(); - syscall_u32!(recvmsg(fd, &mut self.info.2 as *mut _, 0)) + syscall_u32!(recvmsg(fd, &mut *self.info.2, 0)) } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { - let _fd = self.fd.as_raw_socket(); - unimplemented!(); + let fd = self.fd.as_raw_socket(); + let mut recved = 0; + let r = unsafe { + (LazyLock::force(&WSA_RECV_MSG))( + fd as _, + &mut *self.info.2, + &mut recved, + std::ptr::null_mut(), + None, + ) + }; + if r == SOCKET_ERROR { + unsafe { Err(io::Error::from_raw_os_error(WSAGetLastError())) } + } else { + Ok(recved) + } } } @@ -235,30 +294,22 @@ pub(crate) struct RecvMsgUnix { /// Reference to the in-flight buffer. pub(crate) buf: T, - pub(crate) info: Box<( - MaybeUninit, - [libc::iovec; 1], - libc::msghdr, - )>, + /// For multiple message recv in the future + pub(crate) info: Box<(MaybeUninit, IoVecMeta, libc::msghdr)>, } #[cfg(unix)] impl Op> { pub(crate) fn recv_msg_unix(fd: SharedFd, mut buf: T) -> io::Result { - let iovec = [libc::iovec { - iov_base: buf.write_ptr() as *mut _, - iov_len: buf.bytes_total(), - }]; - let mut info: Box<( - MaybeUninit, - [libc::iovec; 1], - libc::msghdr, - )> = Box::new((MaybeUninit::uninit(), iovec, unsafe { std::mem::zeroed() })); - - info.2.msg_iov = info.1.as_mut_ptr(); - info.2.msg_iovlen = 1; + let mut info: Box<(MaybeUninit, IoVecMeta, libc::msghdr)> = + Box::new((MaybeUninit::uninit(), IoVecMeta::from(&mut buf), unsafe { + std::mem::zeroed() + })); + + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; info.2.msg_name = &mut info.0 as *mut _ as *mut libc::c_void; - info.2.msg_namelen = std::mem::size_of::() as socklen_t; + info.2.msg_namelen = std::mem::size_of::() as socklen_t; Op::submit_with(RecvMsgUnix { fd, buf, info }) } diff --git a/monoio/src/driver/op/send.rs b/monoio/src/driver/op/send.rs index def8ba34..f8e5c285 100644 --- a/monoio/src/driver/op/send.rs +++ b/monoio/src/driver/op/send.rs @@ -2,12 +2,12 @@ use std::{io, net::SocketAddr}; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; -#[cfg(unix)] -use {crate::net::unix::SocketAddr as UnixSocketAddr, socket2::SockAddr}; +use socket2::SockAddr; #[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use { - crate::syscall, std::os::windows::io::AsRawSocket, - windows_sys::Win32::Networking::WinSock::send, + crate::syscall, + std::os::windows::io::AsRawSocket, + windows_sys::Win32::Networking::WinSock::{send, WSASendMsg, SOCKET_ERROR}, }; #[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; @@ -15,7 +15,12 @@ use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::driver::ready::Direction; -use crate::{buf::IoBuf, BufResult}; +#[cfg(unix)] +use crate::net::unix::SocketAddr as UnixSocketAddr; +use crate::{ + buf::{IoBuf, IoVecBufMut, IoVecMeta, MsgMeta}, + BufResult, +}; pub(crate) struct Send { /// Holds a strong ref to the FD, preventing the file from being closed @@ -123,37 +128,50 @@ pub(crate) struct SendMsg { /// Reference to the in-flight buffer. pub(crate) buf: T, - #[cfg(unix)] - pub(crate) info: Box<(Option, [libc::iovec; 1], libc::msghdr)>, + /// For multiple message send in the future + pub(crate) info: Box<(Option, IoVecMeta, MsgMeta)>, } -#[cfg(unix)] impl Op> { pub(crate) fn send_msg( fd: SharedFd, buf: T, socket_addr: Option, ) -> io::Result { - let iovec = [libc::iovec { - iov_base: buf.read_ptr() as *const _ as *mut _, - iov_len: buf.bytes_init(), - }]; - let mut info: Box<(Option, [libc::iovec; 1], libc::msghdr)> = - Box::new((socket_addr.map(Into::into), iovec, unsafe { - std::mem::zeroed() - })); - - info.2.msg_iov = info.1.as_mut_ptr(); - info.2.msg_iovlen = 1; - - match info.0.as_ref() { - Some(socket_addr) => { - info.2.msg_name = socket_addr.as_ptr() as *mut libc::c_void; - info.2.msg_namelen = socket_addr.len(); + let mut info: Box<(Option, IoVecMeta, MsgMeta)> = Box::new(( + socket_addr.map(Into::into), + IoVecMeta::from(&buf), + unsafe { std::mem::zeroed() }, + )); + + #[cfg(unix)] + { + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; + match info.0.as_ref() { + Some(socket_addr) => { + info.2.msg_name = socket_addr.as_ptr() as *mut libc::c_void; + info.2.msg_namelen = socket_addr.len(); + } + None => { + info.2.msg_name = std::ptr::null_mut(); + info.2.msg_namelen = 0; + } } - None => { - info.2.msg_name = std::ptr::null_mut(); - info.2.msg_namelen = 0; + } + #[cfg(windows)] + { + info.2.lpBuffers = info.1.write_wsabuf_ptr(); + info.2.dwBufferCount = info.1.write_wsabuf_len() as _; + match info.0.as_ref() { + Some(socket_addr) => { + info.2.name = socket_addr.as_ptr() as *mut _; + info.2.namelen = socket_addr.len(); + } + None => { + info.2.name = std::ptr::null_mut(); + info.2.namelen = 0; + } } } @@ -168,28 +186,12 @@ impl Op> { } } -#[cfg(windows)] -impl Op> { - #[allow(unused_variables)] - pub(crate) fn send_msg( - fd: SharedFd, - buf: T, - socket_addr: Option, - ) -> io::Result { - unimplemented!() - } - - pub(crate) async fn wait(self) -> BufResult { - unimplemented!() - } -} - impl OpAble for SendMsg { #[cfg(all(target_os = "linux", feature = "iouring"))] fn uring_op(&mut self) -> io_uring::squeue::Entry { #[allow(deprecated)] const FLAGS: u32 = libc::MSG_NOSIGNAL as u32; - opcode::SendMsg::new(types::Fd(self.fd.raw_fd()), &mut self.info.2 as *mut _) + opcode::SendMsg::new(types::Fd(self.fd.raw_fd()), &*self.info.2) .flags(FLAGS) .build() } @@ -210,13 +212,28 @@ impl OpAble for SendMsg { #[cfg(not(target_os = "linux"))] const FLAGS: libc::c_int = 0; let fd = self.fd.as_raw_fd(); - syscall_u32!(sendmsg(fd, &mut self.info.2 as *mut _, FLAGS)) + syscall_u32!(sendmsg(fd, &*self.info.2, FLAGS)) } #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { - let _fd = self.fd.as_raw_socket(); - unimplemented!(); + let fd = self.fd.as_raw_socket(); + let mut nsent = 0; + let ret = unsafe { + WSASendMsg( + fd as _, + &*self.info.2, + 0, + &mut nsent, + std::ptr::null_mut(), + None, + ) + }; + if ret == SOCKET_ERROR { + Err(io::Error::last_os_error()) + } else { + Ok(nsent) + } } } @@ -229,7 +246,8 @@ pub(crate) struct SendMsgUnix { /// Reference to the in-flight buffer. pub(crate) buf: T, - pub(crate) info: Box<(Option, [libc::iovec; 1], libc::msghdr)>, + /// For multiple message send in the future + pub(crate) info: Box<(Option, IoVecMeta, libc::msghdr)>, } #[cfg(unix)] @@ -239,17 +257,14 @@ impl Op> { buf: T, socket_addr: Option, ) -> io::Result { - let iovec = [libc::iovec { - iov_base: buf.read_ptr() as *const _ as *mut _, - iov_len: buf.bytes_init(), - }]; - let mut info: Box<(Option, [libc::iovec; 1], libc::msghdr)> = - Box::new((socket_addr.map(Into::into), iovec, unsafe { - std::mem::zeroed() - })); - - info.2.msg_iov = info.1.as_mut_ptr(); - info.2.msg_iovlen = 1; + let mut info: Box<(Option, IoVecMeta, libc::msghdr)> = Box::new(( + socket_addr.map(Into::into), + IoVecMeta::from(&buf), + unsafe { std::mem::zeroed() }, + )); + + info.2.msg_iov = info.1.write_iovec_ptr(); + info.2.msg_iovlen = info.1.write_iovec_len() as _; match info.0.as_ref() { Some(socket_addr) => { diff --git a/monoio/src/net/unix/socket_addr.rs b/monoio/src/net/unix/socket_addr.rs index 38db6bb3..3e1aee0e 100644 --- a/monoio/src/net/unix/socket_addr.rs +++ b/monoio/src/net/unix/socket_addr.rs @@ -36,6 +36,7 @@ impl SocketAddr { return AddressKind::Unnamed; } let len = self.socklen as usize - offset; + #[allow(clippy::unnecessary_cast)] let path = unsafe { &*(&self.sockaddr.sun_path as *const [libc::c_char] as *const [u8]) }; // macOS seems to return a len of 16 and a zeroed sun_path for unnamed addresses diff --git a/monoio/src/time/mod.rs b/monoio/src/time/mod.rs index 1d1733c8..008a7133 100644 --- a/monoio/src/time/mod.rs +++ b/monoio/src/time/mod.rs @@ -64,19 +64,13 @@ //! seconds. //! //! ``` -//! #[cfg(windows)] -//! fn main() {} -//! -//! #[cfg(unix)] //! use monoio::time; //! -//! #[cfg(unix)] //! async fn task_that_takes_a_second() { //! println!("hello"); //! time::sleep(time::Duration::from_secs(1)).await //! } //! -//! #[cfg(unix)] //! #[monoio::main(timer_enabled = true)] //! async fn main() { //! let mut interval = time::interval(time::Duration::from_secs(2));