Skip to content

Commit

Permalink
Update for new polling breaking changes
Browse files Browse the repository at this point in the history
  • Loading branch information
notgull committed Jul 11, 2023
1 parent 197537d commit ffd935b
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 33 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ rustdoc-args = ["--cfg", "docsrs"]
[[test]]
name = "signals"
harness = false

[patch.crates-io]
polling = { git = "https://github.com/smol-rs/polling.git", branch = "notgull/unsafe2" }
10 changes: 7 additions & 3 deletions src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ impl<'l, F: AsFd> Async<'l, F> {
}));
let key = inner.sources.borrow_mut().insert(dispatcher.clone());
dispatcher.borrow_mut().token = Some(Token { key });
inner.register(&dispatcher)?;

// SAFETY: We are sure to deregister on drop.
unsafe {
inner.register(&dispatcher)?;
}

// Straightforward casting would require us to add the bound `Data: 'l` but we don't actually need it
// as this module never accesses the dispatch data, so we use transmute to erase it
Expand Down Expand Up @@ -168,13 +172,13 @@ impl<'l, F: AsFd> Drop for Async<'l, F> {
impl<'l, F: AsFd> Unpin for Async<'l, F> {}

trait IoLoopInner {
fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()>;
unsafe fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()>;
fn reregister(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()>;
fn kill(&self, dispatcher: &RefCell<IoDispatcher>);
}

impl<'l, Data> IoLoopInner for LoopInner<'l, Data> {
fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()> {
unsafe fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()> {
let disp = dispatcher.borrow();
self.poll.borrow_mut().register(
unsafe { BorrowedFd::borrow_raw(disp.fd) },
Expand Down
118 changes: 106 additions & 12 deletions src/sources/generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
//! [`EventSource`](crate::EventSource) implementation to them.

use io_lifetimes::{AsFd, BorrowedFd};
use std::{marker::PhantomData, ops, os::unix::io::AsRawFd};
use polling::Poller;
use std::{borrow, marker::PhantomData, ops, os::unix::io::AsRawFd, sync::Arc};

use crate::{EventSource, Interest, Mode, Poll, PostAction, Readiness, Token, TokenFactory};

Expand Down Expand Up @@ -78,16 +79,58 @@ impl<T: AsRawFd> AsFd for FdWrapper<T> {
}
}

/// A wrapper around a type that doesn't expose it mutably safely.
#[derive(Debug)]
pub struct NoIoDrop<T>(T);

impl<T> NoIoDrop<T> {
/// Get a mutable reference.
///
/// # Safety
///
/// The inner type's I/O source must not be dropped.
pub unsafe fn get_mut(&mut self) -> &mut T {
&mut self.0
}
}

impl<T> AsRef<T> for NoIoDrop<T> {
fn as_ref(&self) -> &T {
&self.0
}
}

impl<T> borrow::Borrow<T> for NoIoDrop<T> {
fn borrow(&self) -> &T {
&self.0
}
}

impl<T> ops::Deref for NoIoDrop<T> {
type Target = T;

fn deref(&self) -> &Self::Target {
&self.0
}
}

/// A generic event source wrapping a FD-backed type
#[derive(Debug)]
pub struct Generic<F: AsFd, E = std::io::Error> {
/// The wrapped FD-backed type
pub file: F,
/// The wrapped FD-backed type.
///
/// This must be deregistered before it is dropped.
file: Option<NoIoDrop<F>>,
/// The programmed interest
pub interest: Interest,
/// The programmed mode
pub mode: Mode,

/// Back-reference to the poller.
///
/// This is needed to drop the original file.
poller: Option<Arc<Poller>>,

// This token is used by the event loop logic to look up this source when an
// event occurs.
token: Option<Token>,
Expand All @@ -101,30 +144,64 @@ impl<F: AsFd> Generic<F, std::io::Error> {
/// [`std::io::Error`] as its error type.
pub fn new(file: F, interest: Interest, mode: Mode) -> Generic<F, std::io::Error> {
Generic {
file,
file: Some(NoIoDrop(file)),
interest,
mode,
token: None,
poller: None,
_error_type: PhantomData::default(),
}
}

/// Wrap a FD-backed type into a `Generic` event source using an arbitrary error type.
pub fn new_with_error<E>(file: F, interest: Interest, mode: Mode) -> Generic<F, E> {
Generic {
file,
file: Some(NoIoDrop(file)),
interest,
mode,
token: None,
poller: None,
_error_type: PhantomData::default(),
}
}
}

impl<F: AsFd, E> Generic<F, E> {
/// Unwrap the `Generic` source to retrieve the underlying type
pub fn unwrap(self) -> F {
self.file
pub fn unwrap(mut self) -> F {
let NoIoDrop(file) = self.file.take().unwrap();

// Remove it from the poller.
if let Some(poller) = self.poller.take() {
poller.delete(file.as_fd()).ok();
}

file
}

/// Get a reference to the underlying type.
pub fn get_ref(&self) -> &F {
&self.file.as_ref().unwrap().0
}

/// Get a mutable reference to the underlying type.
///
/// # Safety
///
/// This is unsafe because it allows you to modify the underlying type, which
/// allows you to drop the underlying event source. Dropping the underlying source
/// leads to a dangling reference.
pub unsafe fn get_mut(&mut self) -> &mut F {
self.file.as_mut().unwrap().get_mut()
}
}

impl<F: AsFd, E> Drop for Generic<F, E> {
fn drop(&mut self) {
// Remove it from the poller.
if let (Some(file), Some(poller)) = (self.file.take(), self.poller.take()) {
poller.delete(file.as_fd()).ok();
}
}
}

Expand All @@ -134,7 +211,7 @@ where
E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
type Event = Readiness;
type Metadata = F;
type Metadata = NoIoDrop<F>;
type Ret = Result<PostAction, E>;
type Error = E;

Expand All @@ -152,13 +229,24 @@ where
return Ok(PostAction::Continue);
}

callback(readiness, &mut self.file)
callback(readiness, self.file.as_mut().unwrap())
}

fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
let token = token_factory.token();

poll.register(&self.file, self.interest, self.mode, token)?;
// Make sure we can use the poller to deregister if need be.
self.poller = Some(poll.poller().clone());

// SAFETY: We've now ensured that we have a poller to deregister with.
unsafe {
poll.register(
&self.file.as_ref().unwrap().0,
self.interest,
self.mode,
token,
)?;
}

self.token = Some(token);
Ok(())
Expand All @@ -171,14 +259,20 @@ where
) -> crate::Result<()> {
let token = token_factory.token();

poll.reregister(&self.file, self.interest, self.mode, token)?;
poll.reregister(
&self.file.as_ref().unwrap().0,
self.interest,
self.mode,
token,
)?;

self.token = Some(token);
Ok(())
}

fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
poll.unregister(&self.file)?;
poll.unregister(&self.file.as_ref().unwrap().0)?;
self.poller = None;
self.token = None;
Ok(())
}
Expand Down
20 changes: 16 additions & 4 deletions src/sources/signals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ impl Signals {
self.mask.add(s);
}
self.mask.thread_block()?;
self.sfd.file.set_mask(&self.mask)?;

// SAFETY: We don't drop the underlying mask.
unsafe {
self.sfd.get_mut().set_mask(&self.mask)?;
}
Ok(())
}

Expand All @@ -89,7 +93,11 @@ impl Signals {
removed.add(s);
}
removed.thread_unblock()?;
self.sfd.file.set_mask(&self.mask)?;

// SAFETY: We don't drop the underlying mask.
unsafe {
self.sfd.get_mut().set_mask(&self.mask)?;
}
Ok(())
}

Expand All @@ -105,7 +113,11 @@ impl Signals {

self.mask.thread_unblock()?;
new_mask.thread_block()?;
self.sfd.file.set_mask(&new_mask)?;

// SAFETY: We don't drop the underlying mask.
unsafe {
self.sfd.get_mut().set_mask(&new_mask)?;
}
self.mask = new_mask;

Ok(())
Expand Down Expand Up @@ -139,7 +151,7 @@ impl EventSource for Signals {
self.sfd
.process_events(readiness, token, |_, sfd| {
loop {
match sfd.read_signal() {
match unsafe { sfd.get_mut().read_signal() } {
Ok(Some(info)) => callback(Event { info }, &mut ()),
Ok(None) => break,
Err(e) => {
Expand Down
Loading

0 comments on commit ffd935b

Please sign in to comment.