diff --git a/monoio/src/buf/io_vec_buf.rs b/monoio/src/buf/io_vec_buf.rs index 8302f3b1..82a6433b 100644 --- a/monoio/src/buf/io_vec_buf.rs +++ b/monoio/src/buf/io_vec_buf.rs @@ -61,7 +61,6 @@ unsafe impl IoVecBuf for VecBuf { } #[cfg(unix)] - unsafe impl IoVecBuf for Vec { fn read_iovec_ptr(&self) -> *const libc::iovec { self.as_ptr() diff --git a/monoio/src/buf/vec_wrapper.rs b/monoio/src/buf/vec_wrapper.rs index 70a3ed5a..01feee86 100644 --- a/monoio/src/buf/vec_wrapper.rs +++ b/monoio/src/buf/vec_wrapper.rs @@ -244,7 +244,6 @@ impl<'t, T: IoBufMut> From<&'t mut T> for IoVecMeta { } } -#[cfg(unix)] #[cfg(test)] mod tests { use super::*; @@ -256,9 +255,18 @@ mod tests { let meta = read_vec_meta(&iovec); assert_eq!(meta.len(), 60); assert_eq!(meta.data.len(), 3); - assert_eq!(meta.data[0].iov_len, 10); - assert_eq!(meta.data[1].iov_len, 20); - assert_eq!(meta.data[2].iov_len, 30); + #[cfg(unix)] + { + assert_eq!(meta.data[0].iov_len, 10); + assert_eq!(meta.data[1].iov_len, 20); + assert_eq!(meta.data[2].iov_len, 30); + } + #[cfg(windows)] + { + assert_eq!(meta.data[0].len, 10); + assert_eq!(meta.data[1].len, 20); + assert_eq!(meta.data[2].len, 30); + } } #[test] @@ -267,8 +275,17 @@ mod tests { let meta = write_vec_meta(&mut iovec); assert_eq!(meta.len(), 60); assert_eq!(meta.data.len(), 3); - assert_eq!(meta.data[0].iov_len, 10); - assert_eq!(meta.data[1].iov_len, 20); - assert_eq!(meta.data[2].iov_len, 30); + #[cfg(unix)] + { + assert_eq!(meta.data[0].iov_len, 10); + assert_eq!(meta.data[1].iov_len, 20); + assert_eq!(meta.data[2].iov_len, 30); + } + #[cfg(windows)] + { + assert_eq!(meta.data[0].len, 10); + assert_eq!(meta.data[1].len, 20); + assert_eq!(meta.data[2].len, 30); + } } } diff --git a/monoio/src/driver/op/read.rs b/monoio/src/driver/op/read.rs index 1696a261..eff9d727 100644 --- a/monoio/src/driver/op/read.rs +++ b/monoio/src/driver/op/read.rs @@ -9,7 +9,7 @@ use { std::ffi::c_void, windows_sys::Win32::{ Foundation::TRUE, - Networking::WinSock::{WSAGetLastError, WSARecv, SOCKET_ERROR}, + Networking::WinSock::{WSAGetLastError, WSARecv, WSAESHUTDOWN}, Storage::FileSystem::{ReadFile, SetFilePointer, FILE_CURRENT, INVALID_SET_FILE_POINTER}, }, }; @@ -155,9 +155,7 @@ impl Op> { if let Ok(n) = res { // Safety: the kernel wrote `n` bytes to the buffer. - unsafe { - buf_vec.set_init(n); - } + unsafe { buf_vec.set_init(n) }; } (res, buf_vec) } @@ -188,26 +186,29 @@ impl OpAble for ReadVec { #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { - let mut bytes_recved = 0; + let mut nread = 0; + let mut flags = 0; let ret = unsafe { WSARecv( self.fd.raw_socket() as _, self.buf_vec.write_wsabuf_ptr(), - self.buf_vec.write_wsabuf_len() as _, - &mut bytes_recved, - std::ptr::null_mut(), + self.buf_vec.write_wsabuf_len().min(u32::MAX as usize) as _, + &mut nread, + &mut flags, std::ptr::null_mut(), None, ) }; match ret { - 0 => return Err(std::io::ErrorKind::WouldBlock.into()), - SOCKET_ERROR => { + 0 => Ok(nread), + _ => { let error = unsafe { WSAGetLastError() }; - return Err(std::io::Error::from_raw_os_error(error)); + if error == WSAESHUTDOWN { + Ok(0) + } else { + Err(io::Error::from_raw_os_error(error)) + } } - _ => (), } - Ok(bytes_recved) } } diff --git a/monoio/src/driver/op/recv.rs b/monoio/src/driver/op/recv.rs index 61338335..a995695d 100644 --- a/monoio/src/driver/op/recv.rs +++ b/monoio/src/driver/op/recv.rs @@ -114,7 +114,7 @@ impl OpAble for Recv { recv( fd as _, self.buf.write_ptr(), - self.buf.bytes_total() as _, + self.buf.bytes_total().min(i32::MAX as usize) as _, 0 ), PartialOrd::lt, diff --git a/monoio/src/driver/op/write.rs b/monoio/src/driver/op/write.rs index ec2f2b10..f32295ce 100644 --- a/monoio/src/driver/op/write.rs +++ b/monoio/src/driver/op/write.rs @@ -1,22 +1,22 @@ use std::io; +#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] +use std::os::unix::prelude::AsRawFd; #[cfg(all(target_os = "linux", feature = "iouring"))] use io_uring::{opcode, types}; #[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))] use windows_sys::Win32::{ Foundation::TRUE, - Networking::WinSock::{WSAGetLastError, WSASend, SOCKET_ERROR}, + Networking::WinSock::WSASend, Storage::FileSystem::{SetFilePointer, WriteFile, FILE_CURRENT, INVALID_SET_FILE_POINTER}, }; -#[cfg(all(unix, any(feature = "legacy", feature = "poll-io")))] -use {crate::syscall_u32, std::os::unix::prelude::AsRawFd}; use super::{super::shared_fd::SharedFd, Op, OpAble}; #[cfg(any(feature = "legacy", feature = "poll-io"))] use crate::driver::ready::Direction; use crate::{ buf::{IoBuf, IoVecBuf}, - BufResult, + syscall_u32, BufResult, }; pub(crate) struct Write { @@ -176,25 +176,15 @@ impl OpAble for WriteVec { #[cfg(all(any(feature = "legacy", feature = "poll-io"), windows))] fn legacy_call(&mut self) -> io::Result { let mut bytes_sent = 0; - let ret = unsafe { - WSASend( - self.fd.raw_socket() as _, - self.buf_vec.read_wsabuf_ptr(), - self.buf_vec.read_wsabuf_len() as _, - &mut bytes_sent, - 0, - std::ptr::null_mut(), - None, - ) - }; - match ret { - 0 => return Err(std::io::ErrorKind::WouldBlock.into()), - SOCKET_ERROR => { - let error = unsafe { WSAGetLastError() }; - return Err(std::io::Error::from_raw_os_error(error)); - } - _ => (), - } - Ok(bytes_sent) + syscall_u32!(WSASend( + self.fd.raw_socket() as _, + self.buf_vec.read_wsabuf_ptr(), + self.buf_vec.read_wsabuf_len() as _, + &mut bytes_sent, + 0, + std::ptr::null_mut(), + None, + )) + .map(|_| bytes_sent) } } diff --git a/monoio/src/driver/scheduled_io.rs b/monoio/src/driver/scheduled_io.rs index 5dfdfbd5..d164a1d3 100644 --- a/monoio/src/driver/scheduled_io.rs +++ b/monoio/src/driver/scheduled_io.rs @@ -81,7 +81,7 @@ impl ScheduledIo { match slot { Some(existing) => { if !existing.will_wake(cx.waker()) { - *existing = cx.waker().clone(); + existing.clone_from(cx.waker()); } } None => { diff --git a/monoio/src/driver/shared_fd.rs b/monoio/src/driver/shared_fd.rs index 5f0db14c..0cf550c4 100644 --- a/monoio/src/driver/shared_fd.rs +++ b/monoio/src/driver/shared_fd.rs @@ -495,7 +495,7 @@ impl Inner { } UringState::Waiting(Some(waker)) => { if !waker.will_wake(cx.waker()) { - *waker = cx.waker().clone(); + waker.clone_from(cx.waker()); } Poll::Pending diff --git a/monoio/src/driver/util.rs b/monoio/src/driver/util.rs index 6fa15bc8..20f16fc5 100644 --- a/monoio/src/driver/util.rs +++ b/monoio/src/driver/util.rs @@ -59,6 +59,9 @@ macro_rules! syscall { #[macro_export] macro_rules! syscall_u32 { ($fn: ident ( $($arg: expr),* $(,)* ) ) => {{ + #[cfg(windows)] + let res = unsafe { $fn($($arg, )*) }; + #[cfg(unix)] let res = unsafe { libc::$fn($($arg, )*) }; if res < 0 { Err(std::io::Error::last_os_error()) diff --git a/monoio/tests/tcp_echo.rs b/monoio/tests/tcp_echo.rs index fc819027..d3aca3a1 100644 --- a/monoio/tests/tcp_echo.rs +++ b/monoio/tests/tcp_echo.rs @@ -3,8 +3,6 @@ use monoio::{ net::{TcpListener, TcpStream}, }; -// todo fix these CI in windows -#[cfg(not(windows))] #[monoio::test_all] async fn echo_server() { const ITER: usize = 1024; @@ -58,10 +56,14 @@ async fn echo_server() { let (stream, _) = srv.accept().await.unwrap(); let (mut rd, mut wr) = stream.into_split(); - let n = io::copy(&mut rd, &mut wr).await.unwrap(); - assert_eq!(n, (ITER * (msg.len() + iov_msg.len())) as u64); + // todo fix these CI in windows + #[cfg(not(windows))] + { + let n = io::copy(&mut rd, &mut wr).await.unwrap(); + assert_eq!(n, (ITER * (msg.len() + iov_msg.len())) as u64); - assert!(rx.await.is_ok()); + assert!(rx.await.is_ok()); + } } #[monoio::test_all(timer_enabled = true)]