Skip to content

Commit

Permalink
feat: Add Windows support
Browse files Browse the repository at this point in the history
This commit adds Windows support to this package. Since polling already
has Windows support through IOCP, the main obstacle was adding a ping
event source using IOCP. The hardest part is emulating a pipe using some
shared state and a posted completion packet.

Fixes #160

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull committed Jan 15, 2024
1 parent a2a4f91 commit 7789de0
Show file tree
Hide file tree
Showing 7 changed files with 498 additions and 27 deletions.
29 changes: 29 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,35 @@ jobs:
command: test
args: --all-features --manifest-path ./doc/Cargo.toml

ci-windows:
name: CI (Windows)

needs:
- lint

strategy:
fail-fast: false
matrix:
rust: ['1.63.0', 'stable']

runs-on: 'windows-latest'

steps:
- name: Checkout sources
uses: actions/checkout@v4

- name: Rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
override: true

- name: Run tests
uses: actions-rs/cargo@v1
with:
command: test
args: --features "block_on executor"

coverage:
name: Coverage

Expand Down
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ async-task = { version = "4.4.0", optional = true }
bitflags = "2.4"
futures-io = { version = "0.3.5", optional = true }
log = "0.4"
nix = { version = "0.26", default-features = false, features = ["signal"], optional = true }
pin-utils = { version = "0.1.0", optional = true }
polling = "3.0.0"
rustix = { version = "0.38", default-features = false, features = ["event", "fs", "pipe", "std"] }
slab = "0.4.8"
rustix = { version = "0.38", default-features = false, features = ["event", "fs", "pipe", "std"] }
thiserror = "1.0.7"

[target.'cfg(unix)'.dependencies]
nix = { version = "0.26", default-features = false, features = ["signal"], optional = true }

[dev-dependencies]
futures = "0.3.5"
rustix = { version = "0.38", default-features = false, features = ["net"] }
Expand Down
67 changes: 58 additions & 9 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
//! [`LoopHandle::adapt_io`]: crate::LoopHandle#method.adapt_io

use std::cell::RefCell;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll as TaskPoll, Waker};

use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
#[cfg(unix)]
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{
AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd, RawSocket as RawFd,
};

#[cfg(feature = "futures-io")]
use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut};
Expand All @@ -33,11 +37,18 @@ use crate::{AdditionalLifecycleEventsSet, RegistrationToken};
/// `AsyncWrite` if the underlying type implements `Read` and/or `Write`.
///
/// Note that this adapter and the futures procuded from it and *not* threadsafe.
///
/// ## Platform-Specific
///
/// - **Windows:** Usually, on drop, the file descriptor is set back to its previous status.
/// For example, if the file was previously nonblocking it will be set to nonblocking, and
/// if the file was blocking it will be set to blocking. However, on Windows, it is impossible
/// to tell what its status was before. Therefore it will always be set to blocking.
pub struct Async<'l, F: AsFd> {
fd: Option<F>,
dispatcher: Rc<RefCell<IoDispatcher>>,
inner: Rc<dyn IoLoopInner + 'l>,
old_flags: OFlags,
was_nonblocking: bool,
}

impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> {
Expand All @@ -50,11 +61,19 @@ impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> {
impl<'l, F: AsFd> Async<'l, F> {
pub(crate) fn new<Data>(inner: Rc<LoopInner<'l, Data>>, fd: F) -> crate::Result<Async<'l, F>> {
// set non-blocking
let old_flags = fcntl_getfl(&fd).map_err(std::io::Error::from)?;
fcntl_setfl(&fd, old_flags | OFlags::NONBLOCK).map_err(std::io::Error::from)?;
let was_nonblocking = set_nonblocking(
#[cfg(unix)]
fd.as_fd(),
#[cfg(windows)]
fd.as_socket(),
true,
)?;
// register in the loop
let dispatcher = Rc::new(RefCell::new(IoDispatcher {
#[cfg(unix)]
fd: fd.as_fd().as_raw_fd(),
#[cfg(windows)]
fd: fd.as_socket().as_raw_socket(),
token: None,
waker: None,
is_registered: false,
Expand Down Expand Up @@ -83,7 +102,7 @@ impl<'l, F: AsFd> Async<'l, F> {
fd: Some(fd),
dispatcher,
inner,
old_flags,
was_nonblocking,
})
}

Expand Down Expand Up @@ -165,9 +184,9 @@ impl<'l, F: AsFd> Drop for Async<'l, F> {
fn drop(&mut self) {
self.inner.kill(&self.dispatcher);
// restore flags
let _ = fcntl_setfl(
let _ = set_nonblocking(
unsafe { BorrowedFd::borrow_raw(self.dispatcher.borrow().fd) },
self.old_flags,
self.was_nonblocking,
);
}
}
Expand Down Expand Up @@ -358,7 +377,37 @@ impl<'l, F: AsFd + std::io::Write> AsyncWrite for Async<'l, F> {
}
}

#[cfg(all(test, feature = "executor", feature = "futures-io"))]
// https://github.com/smol-rs/async-io/blob/6499077421495f2200d5b86918399f3a84bbe8e4/src/lib.rs#L2171-L2195
/// Set the nonblocking status of an FD and return whether it was nonblocking before.
#[allow(clippy::needless_return)]
#[inline]
fn set_nonblocking(fd: BorrowedFd<'_>, is_nonblocking: bool) -> std::io::Result<bool> {
#[cfg(windows)]
{
rustix::io::ioctl_fionbio(fd, is_nonblocking)?;

// Unfortunately it is impossible to tell if a socket was nonblocking on Windows.
// Just say it wasn't for now.
return Ok(false);
}

#[cfg(not(windows))]
{
let previous = rustix::fs::fcntl_getfl(fd)?;
let new = if is_nonblocking {
previous | rustix::fs::OFlags::NONBLOCK
} else {
previous & !(rustix::fs::OFlags::NONBLOCK)
};
if new != previous {
rustix::fs::fcntl_setfl(fd, new)?;
}

return Ok(previous.contains(rustix::fs::OFlags::NONBLOCK));
}
}

#[cfg(all(test, unix, feature = "executor", feature = "futures-io"))]
mod tests {
use futures::io::{AsyncReadExt, AsyncWriteExt};

Expand Down
34 changes: 30 additions & 4 deletions src/loop_logic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::cell::{Cell, RefCell};
use std::fmt::Debug;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -10,6 +9,11 @@ use std::{io, slice};
#[cfg(feature = "block_on")]
use std::future::Future;

#[cfg(unix)]
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsHandle, AsRawHandle, AsSocket as AsFd, BorrowedHandle, RawHandle};

use log::trace;
use polling::Poller;

Expand Down Expand Up @@ -287,6 +291,7 @@ impl<'l, Data> LoopHandle<'l, Data> {
///
/// This loop can host several event sources, that can be dynamically added or removed.
pub struct EventLoop<'l, Data> {
#[allow(dead_code)]
poller: Arc<Poller>,
handle: LoopHandle<'l, Data>,
signals: Arc<Signals>,
Expand Down Expand Up @@ -664,6 +669,7 @@ impl<'l, Data> EventLoop<'l, Data> {
}
}

#[cfg(unix)]
impl<'l, Data> AsRawFd for EventLoop<'l, Data> {
/// Get the underlying raw-fd of the poller.
///
Expand All @@ -677,6 +683,7 @@ impl<'l, Data> AsRawFd for EventLoop<'l, Data> {
}
}

#[cfg(unix)]
impl<'l, Data> AsFd for EventLoop<'l, Data> {
/// Get the underlying fd of the poller.
///
Expand All @@ -689,6 +696,20 @@ impl<'l, Data> AsFd for EventLoop<'l, Data> {
}
}

#[cfg(windows)]
impl<Data> AsRawHandle for EventLoop<'_, Data> {
fn as_raw_handle(&self) -> RawHandle {
self.poller.as_raw_handle()
}
}

#[cfg(windows)]
impl<Data> AsHandle for EventLoop<'_, Data> {
fn as_handle(&self) -> BorrowedHandle<'_> {
self.poller.as_handle()
}
}

#[derive(Clone, Debug)]
/// The EventIterator is an `Iterator` over the events relevant to a particular source
/// This type is used in the [`EventSource::before_handle_events`] methods for
Expand Down Expand Up @@ -761,12 +782,14 @@ mod tests {

use crate::{
channel::{channel, Channel},
generic::Generic,
ping::*,
Dispatcher, EventIterator, EventSource, Interest, Mode, Poll, PostAction, Readiness,
RegistrationToken, Token, TokenFactory,
EventIterator, EventSource, Poll, PostAction, Readiness, RegistrationToken, Token,
TokenFactory,
};

#[cfg(unix)]
use crate::{generic::Generic, Dispatcher, Interest, Mode};

use super::EventLoop;

#[test]
Expand Down Expand Up @@ -1127,6 +1150,7 @@ mod tests {
}
}

#[cfg(unix)]
#[test]
fn insert_bad_source() {
use std::os::unix::io::FromRawFd;
Expand All @@ -1153,6 +1177,7 @@ mod tests {
assert!(ret.is_err());
}

#[cfg(unix)]
#[test]
fn insert_source_no_interest() {
use rustix::pipe::pipe;
Expand Down Expand Up @@ -1310,6 +1335,7 @@ mod tests {
assert_eq!(dispatched, 3);
}

#[cfg(unix)]
#[test]
fn change_interests() {
use rustix::io::write;
Expand Down
52 changes: 42 additions & 10 deletions src/sources/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@
//! notification itself, and the monitored object is provided to your callback as the second
//! argument.
//!
//! ```
#![cfg_attr(unix, doc = "```")]
#![cfg_attr(not(unix), doc = "```no_run")]
//! # extern crate calloop;
//! use calloop::{generic::Generic, Interest, Mode, PostAction};
//!
//! # fn main() {
//! # let mut event_loop = calloop::EventLoop::<()>::try_new()
//! # .expect("Failed to initialize the event loop!");
//! # let handle = event_loop.handle();
//! # #[cfg(unix)]
//! # let io_object = std::io::stdin();
//! # #[cfg(windows)]
//! # let io_object: std::net::TcpStream = panic!();
//! handle.insert_source(
//! // wrap your IO object in a Generic, here we register for read readiness
//! // in level-triggering mode
Expand All @@ -38,12 +42,13 @@
//! [`EventSource`](crate::EventSource) implementation to them.

use polling::Poller;
use std::{
borrow,
marker::PhantomData,
ops,
os::unix::io::{AsFd, AsRawFd, BorrowedFd},
sync::Arc,
use std::{borrow, marker::PhantomData, ops, sync::Arc};

#[cfg(unix)]
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd};
#[cfg(windows)]
use std::os::windows::io::{
AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd,
};

use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};
Expand Down Expand Up @@ -79,9 +84,15 @@ impl<T: AsRawFd> ops::DerefMut for FdWrapper<T> {
}

impl<T: AsRawFd> AsFd for FdWrapper<T> {
#[cfg(unix)]
fn as_fd(&self) -> BorrowedFd {
unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) }
}

#[cfg(windows)]
fn as_socket(&self) -> BorrowedFd {
unsafe { BorrowedFd::borrow_raw(self.0.as_raw_socket()) }
}
}

/// A wrapper around a type that doesn't expose it mutably safely.
Expand Down Expand Up @@ -134,10 +145,17 @@ impl<T> ops::Deref for NoIoDrop<T> {
}

impl<T: AsFd> AsFd for NoIoDrop<T> {
#[cfg(unix)]
fn as_fd(&self) -> BorrowedFd<'_> {
// SAFETY: The innter type is not mutated.
self.0.as_fd()
}

#[cfg(windows)]
fn as_socket(&self) -> BorrowedFd<'_> {
// SAFETY: The innter type is not mutated.
self.0.as_socket()
}
}

/// A generic event source wrapping a FD-backed type
Expand Down Expand Up @@ -199,7 +217,14 @@ impl<F: AsFd, E> Generic<F, E> {

// Remove it from the poller.
if let Some(poller) = self.poller.take() {
poller.delete(file.as_fd()).ok();
poller
.delete(
#[cfg(unix)]
file.as_fd(),
#[cfg(windows)]
file.as_socket(),
)
.ok();
}

file
Expand All @@ -226,7 +251,14 @@ impl<F: AsFd, E> Drop for Generic<F, E> {
fn drop(&mut self) {
// Remove it from the poller.
if let (Some(file), Some(poller)) = (self.file.take(), self.poller.take()) {
poller.delete(file.as_fd()).ok();
poller
.delete(
#[cfg(unix)]
file.as_fd(),
#[cfg(windows)]
file.as_socket(),
)
.ok();
}
}
}
Expand Down Expand Up @@ -307,7 +339,7 @@ where
}
}

#[cfg(test)]
#[cfg(all(unix, test))]
mod tests {
use std::io::{Read, Write};

Expand Down
Loading

0 comments on commit 7789de0

Please sign in to comment.