Skip to content

Commit

Permalink
Update for new polling breaking changes (#141)
Browse files Browse the repository at this point in the history
* Update for new polling breaking changes

* Update for polling v3.0.0

Signed-off-by: John Nunley <[email protected]>

* Silence clippy

Signed-off-by: John Nunley <[email protected]>

* Fix book compilation

Signed-off-by: John Nunley <[email protected]>

* Add more documentation to the NoIoDrop type

Signed-off-by: John Nunley <[email protected]>

---------

Signed-off-by: John Nunley <[email protected]>
  • Loading branch information
notgull authored Sep 10, 2023
1 parent 6cc56c1 commit 9999520
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 53 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ io-lifetimes = "1.0.3"
log = "0.4"
nix = { version = "0.26", default-features = false, features = ["signal"], optional = true }
pin-utils = { version = "0.1.0", optional = true }
polling = "2.6.0"
polling = "3.0.0"
rustix = { version = "0.38", default-features = false, features = ["event", "fs", "pipe", "std"] }
slab = "0.4.8"
thiserror = "1.0"
Expand Down
16 changes: 8 additions & 8 deletions doc/src/zmqsource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ where
// on the socket that warrants reading the events again.
let events = self
.socket
.file
.get_ref()
.get_events()
.context("Failed to read ZeroMQ events")?;

Expand All @@ -170,7 +170,7 @@ where
if events.contains(zmq::POLLOUT) {
if let Some(parts) = self.outbox.pop_front() {
self.socket
.file
.get_ref()
.send_multipart(parts, 0)
.context("Failed to send message")?;
used_socket = true;
Expand All @@ -182,7 +182,7 @@ where
// sending, which includes all parts of a multipart message.
let messages = self
.socket
.file
.get_ref()
.recv_multipart(0)
.context("Failed to receive message")?;
used_socket = true;
Expand Down Expand Up @@ -247,14 +247,14 @@ where
//
// - https://stackoverflow.com/a/38338578/188535
// - http://api.zeromq.org/4-0:zmq-ctx-term
self.socket.file.set_linger(0).ok();
self.socket.file.set_rcvtimeo(0).ok();
self.socket.file.set_sndtimeo(0).ok();
self.socket.get_ref().set_linger(0).ok();
self.socket.get_ref().set_rcvtimeo(0).ok();
self.socket.get_ref().set_sndtimeo(0).ok();

// Double result because (a) possible failure on call and (b) possible
// failure decoding.
if let Ok(Ok(last_endpoint)) = self.socket.file.get_last_endpoint() {
self.socket.file.disconnect(&last_endpoint).ok();
if let Ok(Ok(last_endpoint)) = self.socket.get_ref().get_last_endpoint() {
self.socket.get_ref().disconnect(&last_endpoint).ok();
}
}
}
Expand Down
10 changes: 7 additions & 3 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ impl<'l, F: AsFd> Async<'l, F> {
}));
let key = inner.sources.borrow_mut().insert(dispatcher.clone());
dispatcher.borrow_mut().token = Some(Token { key });
inner.register(&dispatcher)?;

// SAFETY: We are sure to deregister on drop.
unsafe {
inner.register(&dispatcher)?;
}

// Straightforward casting would require us to add the bound `Data: 'l` but we don't actually need it
// as this module never accesses the dispatch data, so we use transmute to erase it
Expand Down Expand Up @@ -168,13 +172,13 @@ impl<'l, F: AsFd> Drop for Async<'l, F> {
impl<'l, F: AsFd> Unpin for Async<'l, F> {}

trait IoLoopInner {
fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()>;
unsafe fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()>;
fn reregister(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()>;
fn kill(&self, dispatcher: &RefCell<IoDispatcher>);
}

impl<'l, Data> IoLoopInner for LoopInner<'l, Data> {
fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()> {
unsafe fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()> {
let disp = dispatcher.borrow();
self.poll.borrow_mut().register(
unsafe { BorrowedFd::borrow_raw(disp.fd) },

Check warning on line 184 in src/io.rs

View workflow job for this annotation

GitHub Actions / CI (1.63.0)

unnecessary `unsafe` block

Check warning on line 184 in src/io.rs

View workflow job for this annotation

GitHub Actions / CI (1.63.0)

unnecessary `unsafe` block
Expand Down
143 changes: 129 additions & 14 deletions src/sources/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
//! [`EventSource`](crate::EventSource) implementation to them.

use io_lifetimes::{AsFd, BorrowedFd};
use std::{marker::PhantomData, ops, os::unix::io::AsRawFd};
use polling::Poller;
use std::{borrow, marker::PhantomData, ops, os::unix::io::AsRawFd, sync::Arc};

use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};

Expand Down Expand Up @@ -78,16 +79,79 @@ impl<T: AsRawFd> AsFd for FdWrapper<T> {
}
}

/// A wrapper around a type that doesn't expose it mutably safely.
///
/// The [`EventSource`] trait's `Metadata` type demands mutable access to the inner I/O source.
/// However, the inner polling source used by `calloop` keeps the handle-based equivalent of an
/// immutable pointer to the underlying object's I/O handle. Therefore, if the inner source is
/// dropped, this leaves behind a dangling pointer which immediately invokes undefined behavior
/// on the next poll of the event loop.
///
/// In order to prevent this from happening, the [`Generic`] I/O source must not directly expose
/// a mutable reference to the underlying handle. This type wraps around the underlying handle and
/// easily allows users to take immutable (`&`) references to the type, but makes mutable (`&mut`)
/// references unsafe to get. Therefore, it prevents the source from being moved out and dropped
/// while it is still registered in the event loop.
///
/// [`EventSource`]: crate::EventSource
#[derive(Debug)]
pub struct NoIoDrop<T>(T);

impl<T> NoIoDrop<T> {
/// Get a mutable reference.
///
/// # Safety
///
/// The inner type's I/O source must not be dropped.
pub unsafe fn get_mut(&mut self) -> &mut T {
&mut self.0
}
}

impl<T> AsRef<T> for NoIoDrop<T> {
fn as_ref(&self) -> &T {
&self.0
}
}

impl<T> borrow::Borrow<T> for NoIoDrop<T> {
fn borrow(&self) -> &T {
&self.0
}
}

impl<T> ops::Deref for NoIoDrop<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl<T: AsFd> AsFd for NoIoDrop<T> {
fn as_fd(&self) -> BorrowedFd<'_> {
// SAFETY: The innter type is not mutated.
self.0.as_fd()
}
}

/// A generic event source wrapping a FD-backed type
#[derive(Debug)]
pub struct Generic<F: AsFd, E = std::io::Error> {
/// The wrapped FD-backed type
pub file: F,
/// The wrapped FD-backed type.
///
/// This must be deregistered before it is dropped.
file: Option<NoIoDrop<F>>,
/// The programmed interest
pub interest: Interest,
/// The programmed mode
pub mode: Mode,

/// Back-reference to the poller.
///
/// This is needed to drop the original file.
poller: Option<Arc<Poller>>,

// This token is used by the event loop logic to look up this source when an
// event occurs.
token: Option<Token>,
Expand All @@ -101,30 +165,64 @@ impl<F: AsFd> Generic<F, std::io::Error> {
/// [`std::io::Error`] as its error type.
pub fn new(file: F, interest: Interest, mode: Mode) -> Generic<F, std::io::Error> {
Generic {
file,
file: Some(NoIoDrop(file)),
interest,
mode,
token: None,
poller: None,
_error_type: PhantomData,
}
}

/// Wrap a FD-backed type into a `Generic` event source using an arbitrary error type.
pub fn new_with_error<E>(file: F, interest: Interest, mode: Mode) -> Generic<F, E> {
Generic {
file,
file: Some(NoIoDrop(file)),
interest,
mode,
token: None,
poller: None,
_error_type: PhantomData,
}
}
}

impl<F: AsFd, E> Generic<F, E> {
/// Unwrap the `Generic` source to retrieve the underlying type
pub fn unwrap(self) -> F {
self.file
pub fn unwrap(mut self) -> F {
let NoIoDrop(file) = self.file.take().unwrap();

// Remove it from the poller.
if let Some(poller) = self.poller.take() {
poller.delete(file.as_fd()).ok();
}

file
}

/// Get a reference to the underlying type.
pub fn get_ref(&self) -> &F {
&self.file.as_ref().unwrap().0
}

/// Get a mutable reference to the underlying type.
///
/// # Safety
///
/// This is unsafe because it allows you to modify the underlying type, which
/// allows you to drop the underlying event source. Dropping the underlying source
/// leads to a dangling reference.
pub unsafe fn get_mut(&mut self) -> &mut F {
self.file.as_mut().unwrap().get_mut()
}
}

impl<F: AsFd, E> Drop for Generic<F, E> {
fn drop(&mut self) {
// Remove it from the poller.
if let (Some(file), Some(poller)) = (self.file.take(), self.poller.take()) {
poller.delete(file.as_fd()).ok();
}
}
}

Expand All @@ -134,7 +232,7 @@ where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Event = Readiness;
type Metadata = F;
type Metadata = NoIoDrop<F>;
type Ret = Result<PostAction, E>;
type Error = E;

Expand All @@ -152,13 +250,24 @@ where
return Ok(PostAction::Continue);
}

callback(readiness, &mut self.file)
callback(readiness, self.file.as_mut().unwrap())
}

fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
let token = token_factory.token();

poll.register(&self.file, self.interest, self.mode, token)?;
// Make sure we can use the poller to deregister if need be.
self.poller = Some(poll.poller().clone());

// SAFETY: We've now ensured that we have a poller to deregister with.
unsafe {
poll.register(
&self.file.as_ref().unwrap().0,
self.interest,
self.mode,
token,
)?;
}

self.token = Some(token);
Ok(())
Expand All @@ -171,14 +280,20 @@ where
) -> crate::Result<()> {
let token = token_factory.token();

poll.reregister(&self.file, self.interest, self.mode, token)?;
poll.reregister(
&self.file.as_ref().unwrap().0,
self.interest,
self.mode,
token,
)?;

self.token = Some(token);
Ok(())
}

fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
poll.unregister(&self.file)?;
poll.unregister(&self.file.as_ref().unwrap().0)?;
self.poller = None;
self.token = None;
Ok(())
}
Expand Down Expand Up @@ -211,7 +326,7 @@ mod tests {
// we have not registered for writability
assert!(!readiness.writable);
let mut buffer = vec![0; 10];
let ret = file.read(&mut buffer).unwrap();
let ret = (&**file).read(&mut buffer).unwrap();
assert_eq!(ret, 6);
assert_eq!(&buffer[..6], &[1, 2, 3, 4, 5, 6]);

Expand Down Expand Up @@ -286,7 +401,7 @@ mod tests {
// we have not registered for writability
assert!(!readiness.writable);
let mut buffer = vec![0; 10];
let ret = file.read(&mut buffer).unwrap();
let ret = (&**file).read(&mut buffer).unwrap();
assert_eq!(ret, 6);
assert_eq!(&buffer[..6], &[1, 2, 3, 4, 5, 6]);

Expand Down
29 changes: 25 additions & 4 deletions src/sources/signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,14 @@ impl Signals {
self.mask.add(s);
}
self.mask.thread_block().map_err(IoError::from)?;
self.sfd.file.set_mask(&self.mask).map_err(IoError::from)?;

// SAFETY: We don't drop the underlying mask.
unsafe {
self.sfd
.get_mut()
.set_mask(&self.mask)
.map_err(IoError::from)?;
}
Ok(())
}

Expand All @@ -91,7 +98,14 @@ impl Signals {
removed.add(s);
}
removed.thread_unblock().map_err(IoError::from)?;
self.sfd.file.set_mask(&self.mask).map_err(IoError::from)?;

// SAFETY: We don't drop the underlying mask.
unsafe {
self.sfd
.get_mut()
.set_mask(&self.mask)
.map_err(IoError::from)?;
}
Ok(())
}

Expand All @@ -107,7 +121,14 @@ impl Signals {

self.mask.thread_unblock().map_err(IoError::from)?;
new_mask.thread_block().map_err(IoError::from)?;
self.sfd.file.set_mask(&new_mask).map_err(IoError::from)?;

// SAFETY: We don't drop the underlying mask.
unsafe {
self.sfd
.get_mut()
.set_mask(&new_mask)
.map_err(IoError::from)?;
}
self.mask = new_mask;

Ok(())
Expand Down Expand Up @@ -141,7 +162,7 @@ impl EventSource for Signals {
self.sfd
.process_events(readiness, token, |_, sfd| {
loop {
match sfd.read_signal() {
match unsafe { sfd.get_mut().read_signal() } {
Ok(Some(info)) => callback(Event { info }, &mut ()),
Ok(None) => break,
Err(e) => {
Expand Down
Loading

0 comments on commit 9999520

Please sign in to comment.