Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Windows support #168

Merged
merged 10 commits into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,35 @@ jobs:
command: test
args: --all-features --manifest-path ./doc/Cargo.toml

ci-windows:
name: CI (Windows)

needs:
- lint

strategy:
fail-fast: false
matrix:
rust: ['1.63.0', 'stable']

runs-on: 'windows-latest'

steps:
- name: Checkout sources
uses: actions/checkout@v4

- name: Rust toolchain
uses: actions-rs/toolchain@v1
with:
toolchain: ${{ matrix.rust }}
override: true

- name: Run tests
uses: actions-rs/cargo@v1
with:
command: test
args: --features "block_on executor"

coverage:
name: Coverage

Expand Down
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ async-task = { version = "4.4.0", optional = true }
bitflags = "2.4"
futures-io = { version = "0.3.5", optional = true }
log = "0.4"
nix = { version = "0.26", default-features = false, features = ["signal"], optional = true }
pin-utils = { version = "0.1.0", optional = true }
polling = "3.0.0"
rustix = { version = "0.38", default-features = false, features = ["event", "fs", "pipe", "std"] }
slab = "0.4.8"
rustix = { version = "0.38", default-features = false, features = ["event", "fs", "pipe", "std"] }
thiserror = "1.0.7"

[target.'cfg(unix)'.dependencies]
nix = { version = "0.26", default-features = false, features = ["signal"], optional = true }

[dev-dependencies]
futures = "0.3.5"
rustix = { version = "0.38", default-features = false, features = ["net"] }
Expand Down
67 changes: 58 additions & 9 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
//! [`LoopHandle::adapt_io`]: crate::LoopHandle#method.adapt_io

use std::cell::RefCell;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll as TaskPoll, Waker};

use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags};
#[cfg(unix)]
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{
AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd, RawSocket as RawFd,
};

#[cfg(feature = "futures-io")]
use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut};
Expand All @@ -33,11 +37,18 @@
/// `AsyncWrite` if the underlying type implements `Read` and/or `Write`.
///
/// Note that this adapter and the futures procuded from it and *not* threadsafe.
///
/// ## Platform-Specific
///
/// - **Windows:** Usually, on drop, the file descriptor is set back to its previous status.
/// For example, if the file was previously nonblocking it will be set to nonblocking, and
/// if the file was blocking it will be set to blocking. However, on Windows, it is impossible
/// to tell what its status was before. Therefore it will always be set to blocking.
pub struct Async<'l, F: AsFd> {
fd: Option<F>,
dispatcher: Rc<RefCell<IoDispatcher>>,
inner: Rc<dyn IoLoopInner + 'l>,
old_flags: OFlags,
was_nonblocking: bool,
}

impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> {
Expand All @@ -50,11 +61,19 @@
impl<'l, F: AsFd> Async<'l, F> {
pub(crate) fn new<Data>(inner: Rc<LoopInner<'l, Data>>, fd: F) -> crate::Result<Async<'l, F>> {
// set non-blocking
let old_flags = fcntl_getfl(&fd).map_err(std::io::Error::from)?;
fcntl_setfl(&fd, old_flags | OFlags::NONBLOCK).map_err(std::io::Error::from)?;
let was_nonblocking = set_nonblocking(
#[cfg(unix)]
fd.as_fd(),
#[cfg(windows)]
fd.as_socket(),
true,
)?;
// register in the loop
let dispatcher = Rc::new(RefCell::new(IoDispatcher {
#[cfg(unix)]
fd: fd.as_fd().as_raw_fd(),
#[cfg(windows)]
fd: fd.as_socket().as_raw_socket(),
token: None,
waker: None,
is_registered: false,
Expand Down Expand Up @@ -83,7 +102,7 @@
fd: Some(fd),
dispatcher,
inner,
old_flags,
was_nonblocking,
})
}

Expand Down Expand Up @@ -165,9 +184,9 @@
fn drop(&mut self) {
self.inner.kill(&self.dispatcher);
// restore flags
let _ = fcntl_setfl(
let _ = set_nonblocking(
unsafe { BorrowedFd::borrow_raw(self.dispatcher.borrow().fd) },
self.old_flags,
self.was_nonblocking,
);
}
}
Expand All @@ -184,7 +203,7 @@
unsafe fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()> {
let disp = dispatcher.borrow();
self.poll.borrow_mut().register(
unsafe { BorrowedFd::borrow_raw(disp.fd) },

Check warning on line 206 in src/io.rs

View workflow job for this annotation

GitHub Actions / CI (1.63.0)

unnecessary `unsafe` block

Check warning on line 206 in src/io.rs

View workflow job for this annotation

GitHub Actions / CI (1.63.0)

unnecessary `unsafe` block

Check warning on line 206 in src/io.rs

View workflow job for this annotation

GitHub Actions / CI (Windows) (1.63.0)

unnecessary `unsafe` block
Interest::EMPTY,
Mode::OneShot,
disp.token.expect("No token for IO dispatcher"),
Expand Down Expand Up @@ -358,7 +377,37 @@
}
}

#[cfg(all(test, feature = "executor", feature = "futures-io"))]
// https://github.com/smol-rs/async-io/blob/6499077421495f2200d5b86918399f3a84bbe8e4/src/lib.rs#L2171-L2195
/// Set the nonblocking status of an FD and return whether it was nonblocking before.
#[allow(clippy::needless_return)]
#[inline]
fn set_nonblocking(fd: BorrowedFd<'_>, is_nonblocking: bool) -> std::io::Result<bool> {
#[cfg(windows)]
{
rustix::io::ioctl_fionbio(fd, is_nonblocking)?;

// Unfortunately it is impossible to tell if a socket was nonblocking on Windows.
// Just say it wasn't for now.
return Ok(false);
}

#[cfg(not(windows))]
{
let previous = rustix::fs::fcntl_getfl(fd)?;
let new = if is_nonblocking {
previous | rustix::fs::OFlags::NONBLOCK
} else {
previous & !(rustix::fs::OFlags::NONBLOCK)
};
if new != previous {
rustix::fs::fcntl_setfl(fd, new)?;
}

return Ok(previous.contains(rustix::fs::OFlags::NONBLOCK));
}
}

#[cfg(all(test, unix, feature = "executor", feature = "futures-io"))]
mod tests {
use futures::io::{AsyncReadExt, AsyncWriteExt};

Expand Down
34 changes: 30 additions & 4 deletions src/loop_logic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use std::cell::{Cell, RefCell};
use std::fmt::Debug;
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -10,6 +9,11 @@ use std::{io, slice};
#[cfg(feature = "block_on")]
use std::future::Future;

#[cfg(unix)]
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
#[cfg(windows)]
use std::os::windows::io::{AsHandle, AsRawHandle, AsSocket as AsFd, BorrowedHandle, RawHandle};

use log::trace;
use polling::Poller;

Expand Down Expand Up @@ -287,6 +291,7 @@ impl<'l, Data> LoopHandle<'l, Data> {
///
/// This loop can host several event sources, that can be dynamically added or removed.
pub struct EventLoop<'l, Data> {
#[allow(dead_code)]
poller: Arc<Poller>,
handle: LoopHandle<'l, Data>,
signals: Arc<Signals>,
Expand Down Expand Up @@ -664,6 +669,7 @@ impl<'l, Data> EventLoop<'l, Data> {
}
}

#[cfg(unix)]
impl<'l, Data> AsRawFd for EventLoop<'l, Data> {
/// Get the underlying raw-fd of the poller.
///
Expand All @@ -677,6 +683,7 @@ impl<'l, Data> AsRawFd for EventLoop<'l, Data> {
}
}

#[cfg(unix)]
impl<'l, Data> AsFd for EventLoop<'l, Data> {
/// Get the underlying fd of the poller.
///
Expand All @@ -689,6 +696,20 @@ impl<'l, Data> AsFd for EventLoop<'l, Data> {
}
}

#[cfg(windows)]
impl<Data> AsRawHandle for EventLoop<'_, Data> {
fn as_raw_handle(&self) -> RawHandle {
self.poller.as_raw_handle()
}
}

#[cfg(windows)]
impl<Data> AsHandle for EventLoop<'_, Data> {
fn as_handle(&self) -> BorrowedHandle<'_> {
self.poller.as_handle()
}
}

#[derive(Clone, Debug)]
/// The EventIterator is an `Iterator` over the events relevant to a particular source
/// This type is used in the [`EventSource::before_handle_events`] methods for
Expand Down Expand Up @@ -761,12 +782,14 @@ mod tests {

use crate::{
channel::{channel, Channel},
generic::Generic,
ping::*,
Dispatcher, EventIterator, EventSource, Interest, Mode, Poll, PostAction, Readiness,
RegistrationToken, Token, TokenFactory,
EventIterator, EventSource, Poll, PostAction, Readiness, RegistrationToken, Token,
TokenFactory,
};

#[cfg(unix)]
use crate::{generic::Generic, Dispatcher, Interest, Mode};

use super::EventLoop;

#[test]
Expand Down Expand Up @@ -1127,6 +1150,7 @@ mod tests {
}
}

#[cfg(unix)]
#[test]
fn insert_bad_source() {
use std::os::unix::io::FromRawFd;
Expand All @@ -1153,6 +1177,7 @@ mod tests {
assert!(ret.is_err());
}

#[cfg(unix)]
#[test]
fn insert_source_no_interest() {
use rustix::pipe::pipe;
Expand Down Expand Up @@ -1310,6 +1335,7 @@ mod tests {
assert_eq!(dispatched, 3);
}

#[cfg(unix)]
#[test]
fn change_interests() {
use rustix::io::write;
Expand Down
52 changes: 42 additions & 10 deletions src/sources/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,19 @@
//! notification itself, and the monitored object is provided to your callback as the second
//! argument.
//!
//! ```
#![cfg_attr(unix, doc = "```")]
#![cfg_attr(not(unix), doc = "```no_run")]
//! # extern crate calloop;
//! use calloop::{generic::Generic, Interest, Mode, PostAction};
//!
//! # fn main() {
//! # let mut event_loop = calloop::EventLoop::<()>::try_new()
//! # .expect("Failed to initialize the event loop!");
//! # let handle = event_loop.handle();
//! # #[cfg(unix)]
//! # let io_object = std::io::stdin();
//! # #[cfg(windows)]
//! # let io_object: std::net::TcpStream = panic!();
//! handle.insert_source(
//! // wrap your IO object in a Generic, here we register for read readiness
//! // in level-triggering mode
Expand All @@ -38,12 +42,13 @@
//! [`EventSource`](crate::EventSource) implementation to them.

use polling::Poller;
use std::{
borrow,
marker::PhantomData,
ops,
os::unix::io::{AsFd, AsRawFd, BorrowedFd},
sync::Arc,
use std::{borrow, marker::PhantomData, ops, sync::Arc};

#[cfg(unix)]
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd};
#[cfg(windows)]
use std::os::windows::io::{
AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd,
};

use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};
Expand Down Expand Up @@ -79,9 +84,15 @@
}

impl<T: AsRawFd> AsFd for FdWrapper<T> {
#[cfg(unix)]
fn as_fd(&self) -> BorrowedFd {
unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) }
}

#[cfg(windows)]
fn as_socket(&self) -> BorrowedFd {
unsafe { BorrowedFd::borrow_raw(self.0.as_raw_socket()) }
}
}

/// A wrapper around a type that doesn't expose it mutably safely.
Expand Down Expand Up @@ -134,10 +145,17 @@
}

impl<T: AsFd> AsFd for NoIoDrop<T> {
#[cfg(unix)]
fn as_fd(&self) -> BorrowedFd<'_> {
// SAFETY: The innter type is not mutated.
self.0.as_fd()
}

#[cfg(windows)]
fn as_socket(&self) -> BorrowedFd<'_> {
// SAFETY: The innter type is not mutated.
self.0.as_socket()
}
}

/// A generic event source wrapping a FD-backed type
Expand Down Expand Up @@ -199,7 +217,14 @@

// Remove it from the poller.
if let Some(poller) = self.poller.take() {
poller.delete(file.as_fd()).ok();
poller
.delete(
#[cfg(unix)]
file.as_fd(),
#[cfg(windows)]
file.as_socket(),
)
.ok();

Check warning on line 227 in src/sources/generic.rs

View check run for this annotation

Codecov / codecov/patch

src/sources/generic.rs#L220-L227

Added lines #L220 - L227 were not covered by tests
}

file
Expand All @@ -226,7 +251,14 @@
fn drop(&mut self) {
// Remove it from the poller.
if let (Some(file), Some(poller)) = (self.file.take(), self.poller.take()) {
poller.delete(file.as_fd()).ok();
poller
.delete(
#[cfg(unix)]
file.as_fd(),
#[cfg(windows)]
file.as_socket(),
)
.ok();
}
}
}
Expand Down Expand Up @@ -307,7 +339,7 @@
}
}

#[cfg(test)]
#[cfg(all(unix, test))]
mod tests {
use std::io::{Read, Write};

Expand Down
Loading
Loading