Skip to content

Commit

Permalink
Add Windows support
Browse files Browse the repository at this point in the history
This commit adds Windows support to this package. Since polling already
has Windows support through IOCP, the main obstacle was adding a ping
event source using IOCP. The hardest part is emulating a pipe using some
shared state and a posted completion packet.

Fixes #160

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull committed Jan 13, 2024
1 parent 67b9308 commit 6202d4c
Show file tree
Hide file tree
Showing 8 changed files with 456 additions and 27 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,35 @@ jobs:
command: test
args: --all-features --manifest-path ./doc/Cargo.toml

ci-windows:
name: CI (Windows)

needs:
- lint

strategy:
fail-fast: false
matrix:
rust: ['1.63.0', 'stable']

runs-on: 'windows-latest'

steps:
- name: Checkout sources
uses: actions/checkout@v4

- name: Rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
override: true

- name: Run tests
uses: actions-rs/cargo@v1
with:
command: test
args: --features "block_on executor"

coverage:
name: Coverage

Expand Down
3 changes: 2 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"rust-analyzer.showUnlinkedFileNotification": false
"rust-analyzer.showUnlinkedFileNotification": false,
"rust-analyzer.cargo.target": "x86_64-pc-windows-gnu"
}
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ async-task = { version = "4.4.0", optional = true }
bitflags = "2.4"
futures-io = { version = "0.3.5", optional = true }
log = "0.4"
nix = { version = "0.26", default-features = false, features = ["signal"], optional = true }
pin-utils = { version = "0.1.0", optional = true }
polling = "3.0.0"
rustix = { version = "0.38", default-features = false, features = ["event", "fs", "pipe", "std"] }
slab = "0.4.8"
rustix = { version = "0.38", default-features = false, features = ["event", "fs", "pipe", "std"] }
thiserror = "1.0.7"

[target.'cfg(unix)'.dependencies]
nix = { version = "0.26", default-features = false, features = ["signal"], optional = true }

[dev-dependencies]
futures = "0.3.5"
rustix = { version = "0.38", default-features = false, features = ["net"] }
Expand Down
58 changes: 49 additions & 9 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
//! [`LoopHandle::adapt_io`]: crate::LoopHandle#method.adapt_io

use std::cell::RefCell;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll as TaskPoll, Waker};

use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
#[cfg(unix)]
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{
AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd, RawSocket as RawFd,
};

#[cfg(feature = "futures-io")]
use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut};
Expand All @@ -37,7 +41,6 @@ pub struct Async<'l, F: AsFd> {
fd: Option<F>,
dispatcher: Rc<RefCell<IoDispatcher>>,
inner: Rc<dyn IoLoopInner + 'l>,
old_flags: OFlags,
}

impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> {
Expand All @@ -50,11 +53,19 @@ impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> {
impl<'l, F: AsFd> Async<'l, F> {
pub(crate) fn new<Data>(inner: Rc<LoopInner<'l, Data>>, fd: F) -> crate::Result<Async<'l, F>> {
// set non-blocking
let old_flags = fcntl_getfl(&fd).map_err(std::io::Error::from)?;
fcntl_setfl(&fd, old_flags | OFlags::NONBLOCK).map_err(std::io::Error::from)?;
set_nonblocking(
#[cfg(unix)]
fd.as_fd(),
#[cfg(windows)]
fd.as_socket(),
true,
)?;
// register in the loop
let dispatcher = Rc::new(RefCell::new(IoDispatcher {
#[cfg(unix)]
fd: fd.as_fd().as_raw_fd(),
#[cfg(windows)]
fd: fd.as_socket().as_raw_socket(),
token: None,
waker: None,
is_registered: false,
Expand Down Expand Up @@ -83,7 +94,6 @@ impl<'l, F: AsFd> Async<'l, F> {
fd: Some(fd),
dispatcher,
inner,
old_flags,
})
}

Expand Down Expand Up @@ -165,9 +175,9 @@ impl<'l, F: AsFd> Drop for Async<'l, F> {
fn drop(&mut self) {
self.inner.kill(&self.dispatcher);
// restore flags
let _ = fcntl_setfl(
let _ = set_nonblocking(
unsafe { BorrowedFd::borrow_raw(self.dispatcher.borrow().fd) },
self.old_flags,
false,
);
}
}
Expand Down Expand Up @@ -358,7 +368,37 @@ impl<'l, F: AsFd + std::io::Write> AsyncWrite for Async<'l, F> {
}
}

#[cfg(all(test, feature = "executor", feature = "futures-io"))]
// https://github.com/smol-rs/async-io/blob/6499077421495f2200d5b86918399f3a84bbe8e4/src/lib.rs#L2171-L2195
#[inline]
fn set_nonblocking(fd: BorrowedFd<'_>, is_nonblocking: bool) -> std::io::Result<()> {
#[cfg(any(windows, target_os = "linux"))]
{
// ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux
// for now, as with the standard library, because it seems to behave
// differently depending on the platform.
// https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d
// https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80
// https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a
rustix::io::ioctl_fionbio(fd, is_nonblocking)?;
}

#[cfg(not(any(windows, target_os = "linux")))]
{
let previous = rustix::fs::fcntl_getfl(fd)?;
let new = if is_nonblocking {
previous | rustix::fs::OFlags::NONBLOCK
} else {
previous & !(rustix::fs::OFlags::NONBLOCK)
};
if new != previous {
rustix::fs::fcntl_setfl(fd, new)?;
}
}

Ok(())
}

#[cfg(all(test, unix, feature = "executor", feature = "futures-io"))]
mod tests {
use futures::io::{AsyncReadExt, AsyncWriteExt};

Expand Down
34 changes: 30 additions & 4 deletions src/loop_logic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::cell::{Cell, RefCell};
use std::fmt::Debug;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -10,6 +9,11 @@ use std::{io, slice};
#[cfg(feature = "block_on")]
use std::future::Future;

#[cfg(unix)]
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsHandle, AsRawHandle, AsSocket as AsFd, BorrowedHandle, RawHandle};

use log::trace;
use polling::Poller;

Expand Down Expand Up @@ -287,6 +291,7 @@ impl<'l, Data> LoopHandle<'l, Data> {
///
/// This loop can host several event sources, that can be dynamically added or removed.
pub struct EventLoop<'l, Data> {
#[allow(dead_code)]
poller: Arc<Poller>,
handle: LoopHandle<'l, Data>,
signals: Arc<Signals>,
Expand Down Expand Up @@ -664,6 +669,7 @@ impl<'l, Data> EventLoop<'l, Data> {
}
}

#[cfg(unix)]
impl<'l, Data> AsRawFd for EventLoop<'l, Data> {
/// Get the underlying raw-fd of the poller.
///
Expand All @@ -677,6 +683,7 @@ impl<'l, Data> AsRawFd for EventLoop<'l, Data> {
}
}

#[cfg(unix)]
impl<'l, Data> AsFd for EventLoop<'l, Data> {
/// Get the underlying fd of the poller.
///
Expand All @@ -689,6 +696,20 @@ impl<'l, Data> AsFd for EventLoop<'l, Data> {
}
}

#[cfg(windows)]
impl<Data> AsRawHandle for EventLoop<'_, Data> {
fn as_raw_handle(&self) -> RawHandle {
self.poller.as_raw_handle()
}
}

#[cfg(windows)]
impl<Data> AsHandle for EventLoop<'_, Data> {
fn as_handle(&self) -> BorrowedHandle<'_> {
self.poller.as_handle()
}
}

#[derive(Clone, Debug)]
/// The EventIterator is an `Iterator` over the events relevant to a particular source
/// This type is used in the [`EventSource::before_handle_events`] methods for
Expand Down Expand Up @@ -761,12 +782,14 @@ mod tests {

use crate::{
channel::{channel, Channel},
generic::Generic,
ping::*,
Dispatcher, EventIterator, EventSource, Interest, Mode, Poll, PostAction, Readiness,
RegistrationToken, Token, TokenFactory,
EventIterator, EventSource, Poll, PostAction, Readiness, RegistrationToken, Token,
TokenFactory,
};

#[cfg(unix)]
use crate::{generic::Generic, Dispatcher, Interest, Mode};

use super::EventLoop;

#[test]
Expand Down Expand Up @@ -1127,6 +1150,7 @@ mod tests {
}
}

#[cfg(unix)]
#[test]
fn insert_bad_source() {
use std::os::unix::io::FromRawFd;
Expand All @@ -1153,6 +1177,7 @@ mod tests {
assert!(ret.is_err());
}

#[cfg(unix)]
#[test]
fn insert_source_no_interest() {
use rustix::pipe::pipe;
Expand Down Expand Up @@ -1310,6 +1335,7 @@ mod tests {
assert_eq!(dispatched, 3);
}

#[cfg(unix)]
#[test]
fn change_interests() {
use rustix::io::write;
Expand Down
46 changes: 37 additions & 9 deletions src/sources/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@
//! [`EventSource`](crate::EventSource) implementation to them.

use polling::Poller;
use std::{
borrow,
marker::PhantomData,
ops,
os::unix::io::{AsFd, AsRawFd, BorrowedFd},
sync::Arc,
use std::{borrow, marker::PhantomData, ops, sync::Arc};

#[cfg(unix)]
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd};
#[cfg(windows)]
use std::os::windows::io::{
AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd,
};

use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};
Expand Down Expand Up @@ -79,9 +80,15 @@ impl<T: AsRawFd> ops::DerefMut for FdWrapper<T> {
}

impl<T: AsRawFd> AsFd for FdWrapper<T> {
#[cfg(unix)]
fn as_fd(&self) -> BorrowedFd {
unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) }
}

#[cfg(windows)]
fn as_socket(&self) -> BorrowedFd {
unsafe { BorrowedFd::borrow_raw(self.0.as_raw_socket()) }
}
}

/// A wrapper around a type that doesn't expose it mutably safely.
Expand Down Expand Up @@ -134,10 +141,17 @@ impl<T> ops::Deref for NoIoDrop<T> {
}

impl<T: AsFd> AsFd for NoIoDrop<T> {
#[cfg(unix)]
fn as_fd(&self) -> BorrowedFd<'_> {
// SAFETY: The innter type is not mutated.
self.0.as_fd()
}

#[cfg(windows)]
fn as_socket(&self) -> BorrowedFd<'_> {
// SAFETY: The innter type is not mutated.
self.0.as_socket()
}
}

/// A generic event source wrapping a FD-backed type
Expand Down Expand Up @@ -199,7 +213,14 @@ impl<F: AsFd, E> Generic<F, E> {

// Remove it from the poller.
if let Some(poller) = self.poller.take() {
poller.delete(file.as_fd()).ok();
poller
.delete(
#[cfg(unix)]
file.as_fd(),
#[cfg(windows)]
file.as_socket(),
)
.ok();

Check warning on line 223 in src/sources/generic.rs

View check run for this annotation

Codecov / codecov/patch

src/sources/generic.rs#L216-L223

Added lines #L216 - L223 were not covered by tests
}

file
Expand All @@ -226,7 +247,14 @@ impl<F: AsFd, E> Drop for Generic<F, E> {
fn drop(&mut self) {
// Remove it from the poller.
if let (Some(file), Some(poller)) = (self.file.take(), self.poller.take()) {
poller.delete(file.as_fd()).ok();
poller
.delete(
#[cfg(unix)]
file.as_fd(),
#[cfg(windows)]
file.as_socket(),
)
.ok();
}
}
}
Expand Down Expand Up @@ -307,7 +335,7 @@ where
}
}

#[cfg(test)]
#[cfg(all(unix, test))]
mod tests {
use std::io::{Read, Write};

Expand Down
9 changes: 7 additions & 2 deletions src/sources/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ mod eventfd;
#[cfg(target_os = "linux")]
use eventfd as platform;

#[cfg(not(target_os = "linux"))]
#[cfg(windows)]
mod iocp;
#[cfg(windows)]
use iocp as platform;

#[cfg(not(any(target_os = "linux", windows)))]
mod pipe;
#[cfg(not(target_os = "linux"))]
#[cfg(not(any(target_os = "linux", windows)))]
use pipe as platform;

/// Create a new ping event source
Expand Down
Loading

0 comments on commit 6202d4c

Please sign in to comment.