Skip to content

Commit

Permalink
poll-io support windows
Browse files Browse the repository at this point in the history
  • Loading branch information
loongs-zhang committed Feb 28, 2025
1 parent d86d2ce commit 3f77081
Show file tree
Hide file tree
Showing 16 changed files with 125 additions and 36 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ if [ "${NO_RUN}" != "1" ] && [ "${NO_RUN}" != "true" ]; then
export CARGO_NET_RETRY=5
export CARGO_NET_TIMEOUT=10

cargo install cross --git "https://github.com/cross-rs/cross" --rev "7b79041c9278769eca57fae10c74741f5aa5c14b"
cargo install cross --git "https://github.com/cross-rs/cross" --rev "4090beca3cfffa44371a5bba524de3a578aa46c3"
CARGO=cross

cargo clean
Expand Down
2 changes: 1 addition & 1 deletion monoio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ iouring = ["io-uring"]
# tokio-compatible(only have effect when legacy is enabled and iouring is not)
tokio-compat = ["tokio"]
# (experimental)enable poll-io to convert structs to structs that impl tokio's poll io
poll-io = ["tokio", "mio"]
poll-io = ["mio"]
# signal enables setting ctrl_c handler
signal = ["ctrlc", "sync"]
signal-termination = ["signal", "ctrlc/termination"]
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
23 changes: 9 additions & 14 deletions monoio/src/driver/legacy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ use super::{
};
use crate::utils::slab::Slab;

#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)]
#[cfg(windows)]
pub(super) mod iocp;

#[cfg(feature = "sync")]
mod waker;
#[cfg(feature = "sync")]
Expand All @@ -32,9 +28,9 @@ pub(crate) struct LegacyInner {
#[cfg(unix)]
poll: mio::Poll,
#[cfg(windows)]
events: iocp::Events,
events: crate::driver::iocp::Events,
#[cfg(windows)]
poll: iocp::Poller,
poll: crate::driver::iocp::Poller,

#[cfg(feature = "sync")]
shared_waker: std::sync::Arc<waker::EventWaker>,
Expand Down Expand Up @@ -69,18 +65,17 @@ impl LegacyDriver {
#[cfg(unix)]
let poll = mio::Poll::new()?;
#[cfg(windows)]
let poll = iocp::Poller::new()?;
let poll = crate::driver::iocp::Poller::new()?;

#[cfg(all(unix, feature = "sync"))]
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(mio::Waker::new(
poll.registry(),
TOKEN_WAKEUP,
)?));
#[cfg(all(windows, feature = "sync"))]
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(iocp::Waker::new(
&poll,
TOKEN_WAKEUP,
)?));
let shared_waker = std::sync::Arc::new(waker::EventWaker::new(
crate::driver::iocp::Waker::new(&poll, TOKEN_WAKEUP)?,
));
#[cfg(feature = "sync")]
let (waker_sender, waker_receiver) = flume::unbounded::<std::task::Waker>();
#[cfg(feature = "sync")]
Expand All @@ -93,7 +88,7 @@ impl LegacyDriver {
#[cfg(unix)]
poll,
#[cfg(windows)]
events: iocp::Events::with_capacity(entries as usize),
events: crate::driver::iocp::Events::with_capacity(entries as usize),
#[cfg(windows)]
poll,
#[cfg(feature = "sync")]
Expand Down Expand Up @@ -178,7 +173,7 @@ impl LegacyDriver {
#[cfg(windows)]
pub(crate) fn register(
this: &Rc<UnsafeCell<LegacyInner>>,
state: &mut iocp::SocketState,
state: &mut crate::driver::iocp::SocketState,
interest: mio::Interest,
) -> io::Result<usize> {
let inner = unsafe { &mut *this.get() };
Expand All @@ -198,7 +193,7 @@ impl LegacyDriver {
pub(crate) fn deregister(
this: &Rc<UnsafeCell<LegacyInner>>,
token: usize,
state: &mut iocp::SocketState,
state: &mut crate::driver::iocp::SocketState,
) -> io::Result<()> {
let inner = unsafe { &mut *this.get() };

Expand Down
4 changes: 2 additions & 2 deletions monoio/src/driver/legacy/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::driver::unpark::Unpark;
pub(crate) struct EventWaker {
// raw waker
#[cfg(windows)]
waker: super::iocp::Waker,
waker: crate::driver::iocp::Waker,
#[cfg(unix)]
waker: mio::Waker,
// Atomic awake status
Expand All @@ -20,7 +20,7 @@ impl EventWaker {
}

#[cfg(windows)]
pub(crate) fn new(waker: super::iocp::Waker) -> Self {
pub(crate) fn new(waker: crate::driver::iocp::Waker) -> Self {
Self {
waker,
awake: std::sync::atomic::AtomicBool::new(true),
Expand Down
6 changes: 5 additions & 1 deletion monoio/src/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/// Monoio Driver.
#[allow(dead_code)]
pub(crate) mod op;
#[cfg(all(feature = "poll-io", unix))]
#[cfg(feature = "poll-io")]
pub(crate) mod poll;
#[cfg(any(feature = "legacy", feature = "poll-io"))]
pub(crate) mod ready;
Expand All @@ -17,6 +17,10 @@ mod legacy;
#[cfg(all(target_os = "linux", feature = "iouring"))]
mod uring;

#[allow(missing_docs, unreachable_pub, dead_code, unused_imports)]
#[cfg(all(windows, any(feature = "legacy", feature = "poll-io")))]
pub(crate) mod iocp;

mod util;

use std::{
Expand Down
99 changes: 88 additions & 11 deletions monoio/src/driver/poll.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,42 @@
use std::{io, task::Context, time::Duration};
use std::{
io,
ops::{Deref, DerefMut},
task::Context,
time::Duration,
};

#[cfg(unix)]
use mio::{event::Source, Events};
use mio::{Interest, Token};

use super::{op::MaybeFd, ready::Direction, scheduled_io::ScheduledIo};
#[cfg(windows)]
use crate::driver::iocp::{Events, Poller, SocketState};
use crate::{driver::op::CompletionMeta, utils::slab::Slab};

/// Poller with io dispatch.
// TODO: replace legacy impl with this Poll.
pub(crate) struct Poll {
pub(crate) io_dispatch: Slab<ScheduledIo>,
#[cfg(unix)]
poll: mio::Poll,
events: mio::Events,
#[cfg(unix)]
events: Events,
#[cfg(windows)]
poll: Poller,
#[cfg(windows)]
events: Events,
}

impl Poll {
#[inline]
pub(crate) fn with_capacity(capacity: usize) -> io::Result<Self> {
Ok(Self {
io_dispatch: Slab::new(),
#[cfg(unix)]
poll: mio::Poll::new()?,
events: mio::Events::with_capacity(capacity),
#[cfg(windows)]
poll: Poller::new()?,
events: Events::with_capacity(capacity),
})
}

Expand All @@ -41,14 +60,15 @@ impl Poll {
Ok(())
}

#[cfg(unix)]
pub(crate) fn register(
&mut self,
source: &mut impl mio::event::Source,
interest: mio::Interest,
source: &mut impl Source,
interest: Interest,
) -> io::Result<usize> {
let token = self.io_dispatch.insert(ScheduledIo::new());
let registry = self.poll.registry();
match registry.register(source, mio::Token(token), interest) {
match registry.register(source, Token(token), interest) {
Ok(_) => Ok(token),
Err(e) => {
self.io_dispatch.remove(token);
Expand All @@ -57,11 +77,24 @@ impl Poll {
}
}

pub(crate) fn deregister(
#[cfg(windows)]
pub(crate) fn register(
&mut self,
source: &mut impl mio::event::Source,
token: usize,
) -> io::Result<()> {
source: &mut SocketState,
interest: Interest,
) -> io::Result<usize> {
let token = self.io_dispatch.insert(ScheduledIo::new());
match self.poll.register(source, Token(token), interest) {
Ok(_) => Ok(token),
Err(e) => {
self.io_dispatch.remove(token);
Err(e)
}
}
}

#[cfg(unix)]
pub(crate) fn deregister(&mut self, source: &mut impl Source, token: usize) -> io::Result<()> {
match self.poll.registry().deregister(source) {
Ok(_) => {
self.io_dispatch.remove(token);
Expand All @@ -71,6 +104,18 @@ impl Poll {
}
}

#[cfg(windows)]
pub(crate) fn deregister(&mut self, source: &mut SocketState, token: usize) -> io::Result<()> {
match self.poll.deregister(source) {
Ok(_) => {
self.io_dispatch.remove(token);
Ok(())
}
Err(e) => Err(e),
}
}

#[allow(dead_code)]
#[inline]
pub(crate) fn poll_syscall(
&mut self,
Expand Down Expand Up @@ -107,3 +152,35 @@ impl std::os::fd::AsRawFd for Poll {
self.poll.as_raw_fd()
}
}

#[cfg(unix)]
impl Deref for Poll {
type Target = mio::Poll;

fn deref(&self) -> &Self::Target {
&self.poll
}
}

#[cfg(windows)]
impl std::os::windows::io::AsRawHandle for Poll {
#[inline]
fn as_raw_handle(&self) -> std::os::windows::io::RawHandle {
self.poll.as_raw_handle()
}
}

#[cfg(windows)]
impl Deref for Poll {
type Target = Poller;

fn deref(&self) -> &Self::Target {
&self.poll
}
}

impl DerefMut for Poll {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.poll
}
}
2 changes: 1 addition & 1 deletion monoio/src/driver/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Ready {
pub(crate) const WRITE_ALL: Ready = Ready(WRITABLE | WRITE_CLOSED | WRITE_CANCELED);

#[cfg(windows)]
pub(crate) fn from_mio(event: &super::legacy::iocp::Event) -> Ready {
pub(crate) fn from_mio(event: &crate::driver::iocp::Event) -> Ready {
let mut ready = Ready::EMPTY;

if event.is_readable() {
Expand Down
4 changes: 2 additions & 2 deletions monoio/src/driver/shared_fd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use std::os::windows::io::{
};
use std::{cell::UnsafeCell, io, rc::Rc};

#[cfg(windows)]
use super::legacy::iocp::SocketState as RawFd;
use super::CURRENT;
#[cfg(windows)]
use crate::driver::iocp::SocketState as RawFd;

// Tracks in-flight operations on a file descriptor. Ensures all in-flight
// operations complete before submitting the close.
Expand Down
14 changes: 14 additions & 0 deletions monoio/src/utils/ctrlc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,18 @@ impl CtrlC {
_private: PhantomData,
})
}

/// Ctrl+C to current progress.
pub fn ctrlc() {
let pid = std::process::id() as _;
unsafe {
#[cfg(unix)]
libc::kill(pid, libc::SIGINT);
#[cfg(windows)]
windows_sys::Win32::System::Console::GenerateConsoleCtrlEvent(
windows_sys::Win32::System::Console::CTRL_C_EVENT,
pid,
);
};
}
}
5 changes: 2 additions & 3 deletions monoio/tests/ctrlc_legacy.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#[cfg(feature = "signal")]
#[monoio::test(driver = "legacy")]
async fn test_ctrlc_legacy() {
use libc::{getpid, kill, SIGINT};
use monoio::utils::CtrlC;

let c = CtrlC::new().unwrap();
std::thread::spawn(|| unsafe {
std::thread::spawn(|| {
std::thread::sleep(std::time::Duration::from_millis(500));
kill(getpid(), SIGINT);
CtrlC::ctrlc();
});

c.await;
Expand Down

0 comments on commit 3f77081

Please sign in to comment.