From 6202d4c4132e493c0e6786b4326fed7f626616e0 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Fri, 12 Jan 2024 23:37:19 -0800 Subject: [PATCH 01/10] Add Windows support This commit adds Windows support to this package. Since polling already has Windows support through IOCP, the main obstacle was adding a ping event source using IOCP. The hardest part is emulating a pipe using some shared state and a posted completion packet. Fixes #160 Signed-off-by: John Nunley --- .github/workflows/ci.yml | 29 ++++ .vscode/settings.json | 3 +- Cargo.toml | 6 +- src/io.rs | 58 ++++++-- src/loop_logic.rs | 34 ++++- src/sources/generic.rs | 46 ++++-- src/sources/ping.rs | 9 +- src/sources/ping/iocp.rs | 298 +++++++++++++++++++++++++++++++++++++++ 8 files changed, 456 insertions(+), 27 deletions(-) create mode 100644 src/sources/ping/iocp.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 372a3a67..fa74468a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -150,6 +150,35 @@ jobs: command: test args: --all-features --manifest-path ./doc/Cargo.toml + ci-windows: + name: CI (Windows) + + needs: + - lint + + strategy: + fail-fast: false + matrix: + rust: ['1.63.0', 'stable'] + + runs-on: 'windows-latest' + + steps: + - name: Checkout sources + uses: actions/checkout@v4 + + - name: Rust toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.rust }} + override: true + + - name: Run tests + uses: actions-rs/cargo@v1 + with: + command: test + args: --features "block_on executor" + coverage: name: Coverage diff --git a/.vscode/settings.json b/.vscode/settings.json index 4d9636b5..4b03a4dd 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,4 @@ { - "rust-analyzer.showUnlinkedFileNotification": false + "rust-analyzer.showUnlinkedFileNotification": false, + "rust-analyzer.cargo.target": "x86_64-pc-windows-gnu" } \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 00f691b3..ef49da02 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,13 +23,15 @@ async-task = { version = "4.4.0", optional = true } bitflags = "2.4" futures-io = { version = "0.3.5", optional = true } log = "0.4" -nix = { version = "0.26", default-features = false, features = ["signal"], optional = true } pin-utils = { version = "0.1.0", optional = true } polling = "3.0.0" -rustix = { version = "0.38", default-features = false, features = ["event", "fs", "pipe", "std"] } slab = "0.4.8" +rustix = { version = "0.38", default-features = false, features = ["event", "fs", "pipe", "std"] } thiserror = "1.0.7" +[target.'cfg(unix)'.dependencies] +nix = { version = "0.26", default-features = false, features = ["signal"], optional = true } + [dev-dependencies] futures = "0.3.5" rustix = { version = "0.38", default-features = false, features = ["net"] } diff --git a/src/io.rs b/src/io.rs index 5b37e331..2a53eebe 100644 --- a/src/io.rs +++ b/src/io.rs @@ -7,12 +7,16 @@ //! [`LoopHandle::adapt_io`]: crate::LoopHandle#method.adapt_io use std::cell::RefCell; -use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll as TaskPoll, Waker}; -use rustix::fs::{fcntl_getfl, fcntl_setfl, OFlags}; +#[cfg(unix)] +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{ + AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd, RawSocket as RawFd, +}; #[cfg(feature = "futures-io")] use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut}; @@ -37,7 +41,6 @@ pub struct Async<'l, F: AsFd> { fd: Option, dispatcher: Rc>, inner: Rc, - old_flags: OFlags, } impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> { @@ -50,11 +53,19 @@ impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> { impl<'l, F: AsFd> Async<'l, F> { pub(crate) fn new(inner: Rc>, fd: F) -> crate::Result> { // set non-blocking - let old_flags = fcntl_getfl(&fd).map_err(std::io::Error::from)?; - fcntl_setfl(&fd, old_flags | OFlags::NONBLOCK).map_err(std::io::Error::from)?; + set_nonblocking( + #[cfg(unix)] + fd.as_fd(), + #[cfg(windows)] + fd.as_socket(), + true, + )?; // register in the loop let dispatcher = Rc::new(RefCell::new(IoDispatcher { + #[cfg(unix)] fd: fd.as_fd().as_raw_fd(), + #[cfg(windows)] + fd: fd.as_socket().as_raw_socket(), token: None, waker: None, is_registered: false, @@ -83,7 +94,6 @@ impl<'l, F: AsFd> Async<'l, F> { fd: Some(fd), dispatcher, inner, - old_flags, }) } @@ -165,9 +175,9 @@ impl<'l, F: AsFd> Drop for Async<'l, F> { fn drop(&mut self) { self.inner.kill(&self.dispatcher); // restore flags - let _ = fcntl_setfl( + let _ = set_nonblocking( unsafe { BorrowedFd::borrow_raw(self.dispatcher.borrow().fd) }, - self.old_flags, + false, ); } } @@ -358,7 +368,37 @@ impl<'l, F: AsFd + std::io::Write> AsyncWrite for Async<'l, F> { } } -#[cfg(all(test, feature = "executor", feature = "futures-io"))] +// https://github.com/smol-rs/async-io/blob/6499077421495f2200d5b86918399f3a84bbe8e4/src/lib.rs#L2171-L2195 +#[inline] +fn set_nonblocking(fd: BorrowedFd<'_>, is_nonblocking: bool) -> std::io::Result<()> { + #[cfg(any(windows, target_os = "linux"))] + { + // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux + // for now, as with the standard library, because it seems to behave + // differently depending on the platform. + // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d + // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80 + // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a + rustix::io::ioctl_fionbio(fd, is_nonblocking)?; + } + + #[cfg(not(any(windows, target_os = "linux")))] + { + let previous = rustix::fs::fcntl_getfl(fd)?; + let new = if is_nonblocking { + previous | rustix::fs::OFlags::NONBLOCK + } else { + previous & !(rustix::fs::OFlags::NONBLOCK) + }; + if new != previous { + rustix::fs::fcntl_setfl(fd, new)?; + } + } + + Ok(()) +} + +#[cfg(all(test, unix, feature = "executor", feature = "futures-io"))] mod tests { use futures::io::{AsyncReadExt, AsyncWriteExt}; diff --git a/src/loop_logic.rs b/src/loop_logic.rs index 58bf4071..1b246814 100644 --- a/src/loop_logic.rs +++ b/src/loop_logic.rs @@ -1,6 +1,5 @@ use std::cell::{Cell, RefCell}; use std::fmt::Debug; -use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -10,6 +9,11 @@ use std::{io, slice}; #[cfg(feature = "block_on")] use std::future::Future; +#[cfg(unix)] +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; +#[cfg(windows)] +use std::os::windows::io::{AsHandle, AsRawHandle, AsSocket as AsFd, BorrowedHandle, RawHandle}; + use log::trace; use polling::Poller; @@ -287,6 +291,7 @@ impl<'l, Data> LoopHandle<'l, Data> { /// /// This loop can host several event sources, that can be dynamically added or removed. pub struct EventLoop<'l, Data> { + #[allow(dead_code)] poller: Arc, handle: LoopHandle<'l, Data>, signals: Arc, @@ -664,6 +669,7 @@ impl<'l, Data> EventLoop<'l, Data> { } } +#[cfg(unix)] impl<'l, Data> AsRawFd for EventLoop<'l, Data> { /// Get the underlying raw-fd of the poller. /// @@ -677,6 +683,7 @@ impl<'l, Data> AsRawFd for EventLoop<'l, Data> { } } +#[cfg(unix)] impl<'l, Data> AsFd for EventLoop<'l, Data> { /// Get the underlying fd of the poller. /// @@ -689,6 +696,20 @@ impl<'l, Data> AsFd for EventLoop<'l, Data> { } } +#[cfg(windows)] +impl AsRawHandle for EventLoop<'_, Data> { + fn as_raw_handle(&self) -> RawHandle { + self.poller.as_raw_handle() + } +} + +#[cfg(windows)] +impl AsHandle for EventLoop<'_, Data> { + fn as_handle(&self) -> BorrowedHandle<'_> { + self.poller.as_handle() + } +} + #[derive(Clone, Debug)] /// The EventIterator is an `Iterator` over the events relevant to a particular source /// This type is used in the [`EventSource::before_handle_events`] methods for @@ -761,12 +782,14 @@ mod tests { use crate::{ channel::{channel, Channel}, - generic::Generic, ping::*, - Dispatcher, EventIterator, EventSource, Interest, Mode, Poll, PostAction, Readiness, - RegistrationToken, Token, TokenFactory, + EventIterator, EventSource, Poll, PostAction, Readiness, RegistrationToken, Token, + TokenFactory, }; + #[cfg(unix)] + use crate::{generic::Generic, Dispatcher, Interest, Mode}; + use super::EventLoop; #[test] @@ -1127,6 +1150,7 @@ mod tests { } } + #[cfg(unix)] #[test] fn insert_bad_source() { use std::os::unix::io::FromRawFd; @@ -1153,6 +1177,7 @@ mod tests { assert!(ret.is_err()); } + #[cfg(unix)] #[test] fn insert_source_no_interest() { use rustix::pipe::pipe; @@ -1310,6 +1335,7 @@ mod tests { assert_eq!(dispatched, 3); } + #[cfg(unix)] #[test] fn change_interests() { use rustix::io::write; diff --git a/src/sources/generic.rs b/src/sources/generic.rs index 63d94b2e..763d69da 100644 --- a/src/sources/generic.rs +++ b/src/sources/generic.rs @@ -38,12 +38,13 @@ //! [`EventSource`](crate::EventSource) implementation to them. use polling::Poller; -use std::{ - borrow, - marker::PhantomData, - ops, - os::unix::io::{AsFd, AsRawFd, BorrowedFd}, - sync::Arc, +use std::{borrow, marker::PhantomData, ops, sync::Arc}; + +#[cfg(unix)] +use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd}; +#[cfg(windows)] +use std::os::windows::io::{ + AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd, }; use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory}; @@ -79,9 +80,15 @@ impl ops::DerefMut for FdWrapper { } impl AsFd for FdWrapper { + #[cfg(unix)] fn as_fd(&self) -> BorrowedFd { unsafe { BorrowedFd::borrow_raw(self.0.as_raw_fd()) } } + + #[cfg(windows)] + fn as_socket(&self) -> BorrowedFd { + unsafe { BorrowedFd::borrow_raw(self.0.as_raw_socket()) } + } } /// A wrapper around a type that doesn't expose it mutably safely. @@ -134,10 +141,17 @@ impl ops::Deref for NoIoDrop { } impl AsFd for NoIoDrop { + #[cfg(unix)] fn as_fd(&self) -> BorrowedFd<'_> { // SAFETY: The innter type is not mutated. self.0.as_fd() } + + #[cfg(windows)] + fn as_socket(&self) -> BorrowedFd<'_> { + // SAFETY: The innter type is not mutated. + self.0.as_socket() + } } /// A generic event source wrapping a FD-backed type @@ -199,7 +213,14 @@ impl Generic { // Remove it from the poller. if let Some(poller) = self.poller.take() { - poller.delete(file.as_fd()).ok(); + poller + .delete( + #[cfg(unix)] + file.as_fd(), + #[cfg(windows)] + file.as_socket(), + ) + .ok(); } file @@ -226,7 +247,14 @@ impl Drop for Generic { fn drop(&mut self) { // Remove it from the poller. if let (Some(file), Some(poller)) = (self.file.take(), self.poller.take()) { - poller.delete(file.as_fd()).ok(); + poller + .delete( + #[cfg(unix)] + file.as_fd(), + #[cfg(windows)] + file.as_socket(), + ) + .ok(); } } } @@ -307,7 +335,7 @@ where } } -#[cfg(test)] +#[cfg(all(unix, test))] mod tests { use std::io::{Read, Write}; diff --git a/src/sources/ping.rs b/src/sources/ping.rs index b8683327..f986a83d 100644 --- a/src/sources/ping.rs +++ b/src/sources/ping.rs @@ -22,9 +22,14 @@ mod eventfd; #[cfg(target_os = "linux")] use eventfd as platform; -#[cfg(not(target_os = "linux"))] +#[cfg(windows)] +mod iocp; +#[cfg(windows)] +use iocp as platform; + +#[cfg(not(any(target_os = "linux", windows)))] mod pipe; -#[cfg(not(target_os = "linux"))] +#[cfg(not(any(target_os = "linux", windows)))] use pipe as platform; /// Create a new ping event source diff --git a/src/sources/ping/iocp.rs b/src/sources/ping/iocp.rs new file mode 100644 index 00000000..e691be80 --- /dev/null +++ b/src/sources/ping/iocp.rs @@ -0,0 +1,298 @@ +//! IOCP-based implementation of the ping event source. +//! +//! The underlying `Poller` can be woken up at any time, using the `post` method +//! to send an arbitrary packet to the I/O completion port. The complication is +//! emulating a pipe. +//! +//! Since `Poller` is already wrapped in an `Arc`, we can clone it into some +//! synchronized inner state to send a pre-determined packet into it. Thankfully +//! calloop's use of the pipe is constrained enough that we can implement it using +//! a simple bool to keep track of whether or not it is notified. + +use crate::sources::EventSource; + +use polling::os::iocp::{CompletionPacket, PollerIocpExt}; +use polling::Poller; + +use std::fmt; +use std::io; +use std::sync::{Arc, Mutex, TryLockError}; + +#[inline] +pub fn make_ping() -> io::Result<(Ping, PingSource)> { + let state = Arc::new(State { + counter: Mutex::new(Counter { + notified: false, + poll_state: None, + }), + }); + + Ok(( + Ping { + state: state.clone(), + }, + PingSource { state }, + )) +} + +/// The event to trigger. +#[derive(Clone)] +pub struct Ping { + state: Arc, +} + +impl fmt::Debug for Ping { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_ping(&self.state, "Ping", f) + } +} + +/// The event source. +pub struct PingSource { + state: Arc, +} + +impl fmt::Debug for PingSource { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + debug_ping(&self.state, "PingSource", f) + } +} + +impl Ping { + /// Send a ping to the `PingSource`. + pub fn ping(&self) { + let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); + + // Indicate that we are now notified. + counter.notified = true; + + let poll_state = match &mut counter.poll_state { + Some(ps) => ps, + None => { + log::warn!("[calloop] ping was not registered with the event loop"); + return; + } + }; + + // If we aren't currently inserted in the loop, send our packet. + if let Err(e) = poll_state.notify() { + log::warn!("[calloop] failed to post packet to IOCP: {}", e); + } + } +} + +impl EventSource for PingSource { + type Error = super::PingError; + type Event = (); + type Metadata = (); + type Ret = (); + + fn process_events( + &mut self, + _readiness: crate::Readiness, + token: crate::Token, + mut callback: F, + ) -> Result + where + F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, + { + let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); + + // If we aren't registered, break out. + let poll_state = match &mut counter.poll_state { + Some(ps) => ps, + None => { + return Err(super::PingError(Box::new(io::Error::from( + io::ErrorKind::NotFound, + )))); + } + }; + + // We are no longer inserted into the poller. + poll_state.inserted = false; + + // Make sure this is our token. + let token: usize = token.inner.into(); + if poll_state.packet.event().key != token { + log::warn!( + "[calloop] token does not match; expected {:x}, got {:x}", + poll_state.packet.event().key, + token + ); + } + + // Tell if we are registered. + if counter.notified { + counter.notified = false; + + // Call the callback. + callback((), &mut ()); + } + + // Stop looping if all of the Ping's have been dropped. + let action = if Arc::strong_count(&self.state) == 1 { + crate::PostAction::Remove + } else { + crate::PostAction::Continue + }; + + Ok(action) + } + + fn register( + &mut self, + poll: &mut crate::Poll, + token_factory: &mut crate::TokenFactory, + ) -> crate::Result<()> { + let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); + + // Make sure we haven't already been registered. + if counter.poll_state.is_none() { + return Err(io::Error::from(io::ErrorKind::AlreadyExists).into()); + } + + // Create the event to send. + let packet = { + let token = token_factory.token().inner.into(); + let event = polling::Event::readable(token); + CompletionPacket::new(event) + }; + + // Create the poll state. + let poll_state = PollState::new(poll.poller(), packet, counter.notified)?; + + // Substitute it into our poll state. + counter.poll_state = Some(poll_state); + Ok(()) + } + + fn reregister( + &mut self, + poll: &mut crate::Poll, + _token_factory: &mut crate::TokenFactory, + ) -> crate::Result<()> { + let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); + + // Make sure that the poller has been registered. + let poll_state = match &mut counter.poll_state { + Some(ps) => ps, + None => return Err(io::Error::from(io::ErrorKind::NotFound).into()), + }; + + // If it's a different poller, throw an error. + if !Arc::ptr_eq(&poll_state.poller, poll.poller()) { + return Err(io::Error::new( + io::ErrorKind::NotFound, + "attempted to reregister() a PingSource with a different poller", + ) + .into()); + } + + // Nothing should really be changed here. + Ok(()) + } + + fn unregister(&mut self, _poll: &mut crate::Poll) -> crate::Result<()> { + let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); + + // Remove our current registration. + counter.poll_state = None; + Ok(()) + } +} + +/// Inner state of the pipe. +struct State { + /// The counter used to keep track of our state. + counter: Mutex, +} + +/// Inner counter of the pipe. +struct Counter { + /// Are we notified? + notified: bool, + + /// The `Poller`-related state. + /// + /// This is `None` if we aren't inserted into the `Poller` yet. + poll_state: Option, +} + +/// The `Poller` combined with some associated state. +struct PollState { + /// The `Poller` that we are registered in. + poller: Arc, + + /// Are we inserted into the poller? + inserted: bool, + + /// The completion packet to send. + packet: CompletionPacket, +} + +impl PollState { + /// Create a new `PollState` based on the `Poller` and the `packet`. + /// + /// If `notified` is `true`, a packet is inserted into the poller. + fn new(poller: &Arc, packet: CompletionPacket, notified: bool) -> io::Result { + let mut poll_state = Self { + poller: poller.clone(), + packet, + inserted: false, + }; + + if notified { + poll_state.notify()?; + } + + Ok(poll_state) + } + + /// Notify the poller. + fn notify(&mut self) -> io::Result<()> { + if !self.inserted { + self.poller.post(self.packet.clone())?; + self.inserted = true; + } + + Ok(()) + } +} + +#[inline] +fn debug_ping(state: &State, name: &str, f: &mut fmt::Formatter) -> fmt::Result { + let counter = match state.counter.try_lock() { + Ok(counter) => counter, + Err(TryLockError::WouldBlock) => { + return f + .debug_tuple("Ping") + .field(&format_args!("")) + .finish() + } + Err(TryLockError::Poisoned(_)) => { + return f + .debug_tuple("Ping") + .field(&format_args!("")) + .finish() + } + }; + + let mut s = f.debug_struct(name); + s.field("notified", &counter.notified); + + // Tell if we are registered. + match &counter.poll_state { + Some(poll_state) => { + s.field("packet", poll_state.packet.event()); + s.field("inserted", &poll_state.inserted); + } + + None => { + s.field("packet", &format_args!("")); + } + } + + s.finish() +} From aedcf454bb8b3cb5f6f30b7a9fd0c786f8bc16fb Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 13 Jan 2024 00:12:59 -0800 Subject: [PATCH 02/10] Fix minor snafu Signed-off-by: John Nunley --- .vscode/settings.json | 3 +-- src/sources/ping/iocp.rs | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 4b03a4dd..4d9636b5 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,4 +1,3 @@ { - "rust-analyzer.showUnlinkedFileNotification": false, - "rust-analyzer.cargo.target": "x86_64-pc-windows-gnu" + "rust-analyzer.showUnlinkedFileNotification": false } \ No newline at end of file diff --git a/src/sources/ping/iocp.rs b/src/sources/ping/iocp.rs index e691be80..c7aa0bf5 100644 --- a/src/sources/ping/iocp.rs +++ b/src/sources/ping/iocp.rs @@ -149,7 +149,7 @@ impl EventSource for PingSource { let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); // Make sure we haven't already been registered. - if counter.poll_state.is_none() { + if counter.poll_state.is_some() { return Err(io::Error::from(io::ErrorKind::AlreadyExists).into()); } From 26cbf3a04b0a86e977a23979ccf631d9bf58d9ec Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 13 Jan 2024 00:20:33 -0800 Subject: [PATCH 03/10] Fix more test failures Signed-off-by: John Nunley --- src/sources/ping/iocp.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sources/ping/iocp.rs b/src/sources/ping/iocp.rs index c7aa0bf5..42472480 100644 --- a/src/sources/ping/iocp.rs +++ b/src/sources/ping/iocp.rs @@ -104,9 +104,8 @@ impl EventSource for PingSource { let poll_state = match &mut counter.poll_state { Some(ps) => ps, None => { - return Err(super::PingError(Box::new(io::Error::from( - io::ErrorKind::NotFound, - )))); + // We were deregistered; remove ourselves from the list. + return Ok(crate::PostAction::Remove); } }; @@ -121,6 +120,7 @@ impl EventSource for PingSource { poll_state.packet.event().key, token ); + return Ok(PostAction::Continue); } // Tell if we are registered. @@ -132,7 +132,7 @@ impl EventSource for PingSource { } // Stop looping if all of the Ping's have been dropped. - let action = if Arc::strong_count(&self.state) == 1 { + let action = if Arc::strong_count(&self.state) <= 1 { crate::PostAction::Remove } else { crate::PostAction::Continue From 2311a76975adac42f6b6382c085bc81a9a6100ed Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 13 Jan 2024 00:21:16 -0800 Subject: [PATCH 04/10] fmt Signed-off-by: John Nunley --- src/sources/ping/iocp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/ping/iocp.rs b/src/sources/ping/iocp.rs index 42472480..79b88b5f 100644 --- a/src/sources/ping/iocp.rs +++ b/src/sources/ping/iocp.rs @@ -105,7 +105,7 @@ impl EventSource for PingSource { Some(ps) => ps, None => { // We were deregistered; remove ourselves from the list. - return Ok(crate::PostAction::Remove); + return Ok(crate::PostAction::Remove); } }; From 25c9ad093668f58068907961dc60910767c033ea Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 13 Jan 2024 00:24:31 -0800 Subject: [PATCH 05/10] Fix minor error Signed-off-by: John Nunley --- src/sources/ping/iocp.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sources/ping/iocp.rs b/src/sources/ping/iocp.rs index 79b88b5f..4d20ec38 100644 --- a/src/sources/ping/iocp.rs +++ b/src/sources/ping/iocp.rs @@ -120,7 +120,7 @@ impl EventSource for PingSource { poll_state.packet.event().key, token ); - return Ok(PostAction::Continue); + return Ok(crate::PostAction::Continue); } // Tell if we are registered. From 52bf198c0797129a23695d3bf9e3d07ee2f4adfb Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 13 Jan 2024 00:34:57 -0800 Subject: [PATCH 06/10] Add ability to change the inner token Signed-off-by: John Nunley --- src/sources/ping/iocp.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/sources/ping/iocp.rs b/src/sources/ping/iocp.rs index 4d20ec38..bf3f8ec2 100644 --- a/src/sources/ping/iocp.rs +++ b/src/sources/ping/iocp.rs @@ -171,7 +171,7 @@ impl EventSource for PingSource { fn reregister( &mut self, poll: &mut crate::Poll, - _token_factory: &mut crate::TokenFactory, + token_factory: &mut crate::TokenFactory, ) -> crate::Result<()> { let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); @@ -190,7 +190,18 @@ impl EventSource for PingSource { .into()); } - // Nothing should really be changed here. + // Change the token if needed. + let packet = { + let token = token_factory.token().inner.into(); + let event = polling::Event::readable(token); + CompletionPacket::new(event) + }; + poll_state.packet = packet; + if poll_state.inserted { + poll_state.inserted = false; + poll_state.notify()?; + } + Ok(()) } From eefe5e6804ddba914546901615d38a746b988ecc Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 13 Jan 2024 11:34:50 -0800 Subject: [PATCH 07/10] Iron out bugs noticed by the tests Signed-off-by: John Nunley --- src/sources/ping/iocp.rs | 45 ++++++++++++++++++++++++++++------------ 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/src/sources/ping/iocp.rs b/src/sources/ping/iocp.rs index bf3f8ec2..16fefa88 100644 --- a/src/sources/ping/iocp.rs +++ b/src/sources/ping/iocp.rs @@ -83,6 +83,20 @@ impl Ping { } } +impl Drop for Ping { + fn drop(&mut self) { + // If this is the last ping, wake up the source so it removes itself. + if Arc::strong_count(&self.state) <= 2 { + let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); + if let Some(poll_state) = &mut counter.poll_state { + if let Err(e) = poll_state.notify() { + log::warn!("[calloop] failed to post packet to IOCP during drop: {}", e); + } + } + } + } +} + impl EventSource for PingSource { type Error = super::PingError; type Event = (); @@ -104,8 +118,8 @@ impl EventSource for PingSource { let poll_state = match &mut counter.poll_state { Some(ps) => ps, None => { - // We were deregistered; remove ourselves from the list. - return Ok(crate::PostAction::Remove); + // We were deregistered; indicate to the higher level loop. + return Ok(crate::PostAction::Disable); } }; @@ -146,6 +160,7 @@ impl EventSource for PingSource { poll: &mut crate::Poll, token_factory: &mut crate::TokenFactory, ) -> crate::Result<()> { + let token = token_factory.token(); let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); // Make sure we haven't already been registered. @@ -155,7 +170,7 @@ impl EventSource for PingSource { // Create the event to send. let packet = { - let token = token_factory.token().inner.into(); + let token = token.inner.into(); let event = polling::Event::readable(token); CompletionPacket::new(event) }; @@ -173,6 +188,7 @@ impl EventSource for PingSource { poll: &mut crate::Poll, token_factory: &mut crate::TokenFactory, ) -> crate::Result<()> { + let token = token_factory.token(); let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); // Make sure that the poller has been registered. @@ -191,15 +207,16 @@ impl EventSource for PingSource { } // Change the token if needed. - let packet = { - let token = token_factory.token().inner.into(); - let event = polling::Event::readable(token); - CompletionPacket::new(event) - }; - poll_state.packet = packet; - if poll_state.inserted { - poll_state.inserted = false; - poll_state.notify()?; + let token = token.inner.into(); + let event = polling::Event::readable(token); + + if event.key != poll_state.packet.event().key { + poll_state.packet = CompletionPacket::new(event); + + if poll_state.inserted { + poll_state.inserted = false; + poll_state.notify()?; + } } Ok(()) @@ -209,7 +226,9 @@ impl EventSource for PingSource { let mut counter = self.state.counter.lock().unwrap_or_else(|e| e.into_inner()); // Remove our current registration. - counter.poll_state = None; + if counter.poll_state.take().is_none() { + log::trace!("[calloop] unregistered a source that wasn't registered"); + } Ok(()) } } From 1c53dc9839c19883c2212a049bed5cfea47d01db Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 13 Jan 2024 11:46:44 -0800 Subject: [PATCH 08/10] Fix doctest Signed-off-by: John Nunley --- src/sources/generic.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/sources/generic.rs b/src/sources/generic.rs index 763d69da..4a76051f 100644 --- a/src/sources/generic.rs +++ b/src/sources/generic.rs @@ -7,7 +7,8 @@ //! notification itself, and the monitored object is provided to your callback as the second //! argument. //! -//! ``` +#![cfg_attr(unix, doc = "```")] +#![cfg_attr(not(unix), doc = "```no_run")] //! # extern crate calloop; //! use calloop::{generic::Generic, Interest, Mode, PostAction}; //! @@ -15,7 +16,10 @@ //! # let mut event_loop = calloop::EventLoop::<()>::try_new() //! # .expect("Failed to initialize the event loop!"); //! # let handle = event_loop.handle(); +//! # #[cfg(unix)] //! # let io_object = std::io::stdin(); +//! # #[cfg(windows)] +//! # let io_object: std::net::TcpStream = panic!(); //! handle.insert_source( //! // wrap your IO object in a Generic, here we register for read readiness //! // in level-triggering mode From b64ea38315d8fa284463be1533ce031ed1543348 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 14 Jan 2024 09:16:15 -0800 Subject: [PATCH 09/10] Document Windows platform limitations Signed-off-by: John Nunley --- src/io.rs | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/io.rs b/src/io.rs index 2a53eebe..a83eaefd 100644 --- a/src/io.rs +++ b/src/io.rs @@ -37,10 +37,18 @@ use crate::{AdditionalLifecycleEventsSet, RegistrationToken}; /// `AsyncWrite` if the underlying type implements `Read` and/or `Write`. /// /// Note that this adapter and the futures procuded from it and *not* threadsafe. +/// +/// ## Platform-Specific +/// +/// - **Windows:** Usually, on drop, the file descriptor is set back to its previous status. +/// For example, if the file was previously nonblocking it will be set to nonblocking, and +/// if the file was blocking it will be set to blocking. However, on Windows, it is impossible +/// to tell what its status was before. Therefore it will always be set to blocking. pub struct Async<'l, F: AsFd> { fd: Option, dispatcher: Rc>, inner: Rc, + was_nonblocking: bool, } impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> { @@ -53,7 +61,7 @@ impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> { impl<'l, F: AsFd> Async<'l, F> { pub(crate) fn new(inner: Rc>, fd: F) -> crate::Result> { // set non-blocking - set_nonblocking( + let was_nonblocking = set_nonblocking( #[cfg(unix)] fd.as_fd(), #[cfg(windows)] @@ -94,6 +102,7 @@ impl<'l, F: AsFd> Async<'l, F> { fd: Some(fd), dispatcher, inner, + was_nonblocking }) } @@ -177,7 +186,7 @@ impl<'l, F: AsFd> Drop for Async<'l, F> { // restore flags let _ = set_nonblocking( unsafe { BorrowedFd::borrow_raw(self.dispatcher.borrow().fd) }, - false, + self.was_nonblocking, ); } } @@ -369,20 +378,20 @@ impl<'l, F: AsFd + std::io::Write> AsyncWrite for Async<'l, F> { } // https://github.com/smol-rs/async-io/blob/6499077421495f2200d5b86918399f3a84bbe8e4/src/lib.rs#L2171-L2195 +/// Set the nonblocking status of an FD and return whether it was nonblocking before. +#[allow(clippy::needless_return)] #[inline] -fn set_nonblocking(fd: BorrowedFd<'_>, is_nonblocking: bool) -> std::io::Result<()> { - #[cfg(any(windows, target_os = "linux"))] +fn set_nonblocking(fd: BorrowedFd<'_>, is_nonblocking: bool) -> std::io::Result { + #[cfg(windows)] { - // ioctl(FIONBIO) sets the flag atomically, but we use this only on Linux - // for now, as with the standard library, because it seems to behave - // differently depending on the platform. - // https://github.com/rust-lang/rust/commit/efeb42be2837842d1beb47b51bb693c7474aba3d - // https://github.com/libuv/libuv/blob/e9d91fccfc3e5ff772d5da90e1c4a24061198ca0/src/unix/poll.c#L78-L80 - // https://github.com/tokio-rs/mio/commit/0db49f6d5caf54b12176821363d154384357e70a rustix::io::ioctl_fionbio(fd, is_nonblocking)?; + + // Unfortunately it is impossible to tell if a socket was nonblocking on Windows. + // Just say it wasn't for now. + return Ok(false); } - #[cfg(not(any(windows, target_os = "linux")))] + #[cfg(not(windows))] { let previous = rustix::fs::fcntl_getfl(fd)?; let new = if is_nonblocking { @@ -393,9 +402,9 @@ fn set_nonblocking(fd: BorrowedFd<'_>, is_nonblocking: bool) -> std::io::Result< if new != previous { rustix::fs::fcntl_setfl(fd, new)?; } - } - Ok(()) + return Ok(previous.contains(rustix::fs::OFlags::NONBLOCK)); + } } #[cfg(all(test, unix, feature = "executor", feature = "futures-io"))] From f756fd9d4b3f8786546603c9866fa8e6c83428bb Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 14 Jan 2024 09:50:05 -0800 Subject: [PATCH 10/10] fmt Signed-off-by: John Nunley --- src/io.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/io.rs b/src/io.rs index a83eaefd..24f76410 100644 --- a/src/io.rs +++ b/src/io.rs @@ -37,9 +37,9 @@ use crate::{AdditionalLifecycleEventsSet, RegistrationToken}; /// `AsyncWrite` if the underlying type implements `Read` and/or `Write`. /// /// Note that this adapter and the futures procuded from it and *not* threadsafe. -/// +/// /// ## Platform-Specific -/// +/// /// - **Windows:** Usually, on drop, the file descriptor is set back to its previous status. /// For example, if the file was previously nonblocking it will be set to nonblocking, and /// if the file was blocking it will be set to blocking. However, on Windows, it is impossible @@ -102,7 +102,7 @@ impl<'l, F: AsFd> Async<'l, F> { fd: Some(fd), dispatcher, inner, - was_nonblocking + was_nonblocking, }) }