Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay freeing Slab ids until events are processed #156

Merged
merged 2 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

#### Bugfixes

- Fix an issue, where id-reuse could execute a PostAction on a newly registered event source

Drakulix marked this conversation as resolved.
Show resolved Hide resolved
## 0.12.2 -- 2023-09-25

#### Bugfixes
Expand Down
2 changes: 1 addition & 1 deletion src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
interest: Interest::EMPTY,
last_readiness: Readiness::EMPTY,
}));
let key = inner.sources.borrow_mut().insert(dispatcher.clone());
let key = inner.sources.borrow_mut().insert(Some(dispatcher.clone()));
dispatcher.borrow_mut().token = Some(Token { key });

// SAFETY: We are sure to deregister on drop.
Expand Down Expand Up @@ -180,7 +180,7 @@
unsafe fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()> {
let disp = dispatcher.borrow();
self.poll.borrow_mut().register(
unsafe { BorrowedFd::borrow_raw(disp.fd) },

Check warning on line 183 in src/io.rs

View workflow job for this annotation

GitHub Actions / CI (1.63.0)

unnecessary `unsafe` block

Check warning on line 183 in src/io.rs

View workflow job for this annotation

GitHub Actions / CI (1.63.0)

unnecessary `unsafe` block
Interest::EMPTY,
Mode::OneShot,
disp.token.expect("No token for IO dispatcher"),
Expand Down
113 changes: 86 additions & 27 deletions src/loop_logic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,9 @@

pub(crate) struct LoopInner<'l, Data> {
pub(crate) poll: RefCell<Poll>,
pub(crate) sources: RefCell<Slab<Rc<dyn EventDispatcher<Data> + 'l>>>,
// The `Option` is used to keep slots of the slab occipied, to prevent id reuse
// while in-flight events might still referr to a recently destroyed event source.
pub(crate) sources: RefCell<Slab<Option<Rc<dyn EventDispatcher<Data> + 'l>>>>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets add a comment here explaining the meaning and role of this Option

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this sufficient?

pub(crate) sources_with_additional_lifecycle_events: RefCell<AdditionalLifecycleEventsSet>,
idles: RefCell<Vec<IdleCallback<'l, Data>>>,
pending_action: Cell<PostAction>,
Expand Down Expand Up @@ -149,9 +151,9 @@
)));
}

let key = sources.insert(dispatcher.clone_as_event_dispatcher());
let key = sources.insert(Some(dispatcher.clone_as_event_dispatcher()));
trace!("[calloop] Inserting new source #{}", key);
let ret = sources.get(key).unwrap().register(
let ret = sources.get(key).unwrap().as_ref().unwrap().register(
&mut poll,
&mut self
.inner
Expand Down Expand Up @@ -189,7 +191,7 @@
///
/// **Note:** this cannot be done from within the source callback.
pub fn enable(&self, token: &RegistrationToken) -> crate::Result<()> {
if let Some(source) = self.inner.sources.borrow().get(token.key) {
if let Some(Some(source)) = self.inner.sources.borrow().get(token.key) {
trace!("[calloop] Registering source #{}", token.key);
source.register(
&mut self.inner.poll.borrow_mut(),
Expand All @@ -208,7 +210,7 @@
/// If after accessing the source you changed its parameters in a way that requires
/// updating its registration.
pub fn update(&self, token: &RegistrationToken) -> crate::Result<()> {
if let Some(source) = self.inner.sources.borrow().get(token.key) {
if let Some(Some(source)) = self.inner.sources.borrow().get(token.key) {
trace!("[calloop] Updating registration of source #{}", token.key);
if !source.reregister(
&mut self.inner.poll.borrow_mut(),
Expand All @@ -230,7 +232,7 @@
///
/// The source remains in the event loop, but it'll no longer generate events
pub fn disable(&self, token: &RegistrationToken) -> crate::Result<()> {
if let Some(source) = self.inner.sources.borrow().get(token.key) {
if let Some(Some(source)) = self.inner.sources.borrow().get(token.key) {
trace!("[calloop] Unregistering source #{}", token.key);
if !source.unregister(
&mut self.inner.poll.borrow_mut(),
Expand All @@ -250,20 +252,25 @@

/// Removes this source from the event loop.
pub fn remove(&self, token: RegistrationToken) {
if let Some(source) = self.inner.sources.borrow_mut().try_remove(token.key) {
trace!("[calloop] Removing source #{}", token.key);
if let Err(e) = source.unregister(
&mut self.inner.poll.borrow_mut(),
&mut self
.inner
.sources_with_additional_lifecycle_events
.borrow_mut(),
token,
) {
log::warn!(
"[calloop] Failed to unregister source from the polling system: {:?}",
e
);
if let Some(source) = self.inner.sources.borrow_mut().get_mut(token.key) {
// We intentionally leave `None` in-place, as this might be called from
// within an event source callback. It will get cleaned up on the end
// of the currently running or next `dispatch_events` call.
if let Some(source) = source.take() {
trace!("[calloop] Removing source #{}", token.key);
if let Err(e) = source.unregister(
&mut self.inner.poll.borrow_mut(),
&mut self
.inner
.sources_with_additional_lifecycle_events
.borrow_mut(),
token,
) {
log::warn!(
"[calloop] Failed to unregister source from the polling system: {:?}",

Check warning on line 270 in src/loop_logic.rs

View check run for this annotation

Codecov / codecov/patch

src/loop_logic.rs#L269-L270

Added lines #L269 - L270 were not covered by tests
e
);
}
}
}
}
Expand Down Expand Up @@ -353,7 +360,7 @@
.borrow_mut();
let sources = &self.handle.inner.sources.borrow();
for source in &mut *extra_lifecycle_sources.values {
if let Some(disp) = sources.get(source.key) {
if let Some(Some(disp)) = sources.get(source.key) {
if let Some((readiness, token)) = disp.before_sleep()? {
// Wake up instantly after polling if we recieved an event
timeout = Some(Duration::ZERO);
Expand Down Expand Up @@ -394,7 +401,7 @@
.borrow_mut();
if !extra_lifecycle_sources.values.is_empty() {
for source in &mut *extra_lifecycle_sources.values {
if let Some(disp) = self.handle.inner.sources.borrow().get(source.key) {
if let Some(Some(disp)) = self.handle.inner.sources.borrow().get(source.key) {
let iter = EventIterator {
inner: events.iter(),
registration_token: *source,
Expand All @@ -419,7 +426,7 @@
.get(registroken_token)
.cloned();

if let Some(disp) = opt_disp {
if let Some(Some(disp)) = opt_disp {
trace!(
"[calloop] Dispatching events for source #{}",
registroken_token
Expand Down Expand Up @@ -472,22 +479,29 @@
"[calloop] Postaction remove for source #{}",
registroken_token
);
// delete the source from the list, it'll be cleaned up with the if just below
// We intentionally leave `None` in-place, while there are still
// events being processed to prevent id reuse.
// Unregister will happen right after this match and cleanup at
// the end of the function.
self.handle
.inner
.sources
.borrow_mut()
.remove(registroken_token);
.get_mut(registroken_token)
.unwrap_or(&mut None)
.take();
}
PostAction::Continue => {}
}

if !self
if self
.handle
.inner
.sources
.borrow()
.contains(registroken_token)
.get(registroken_token)
.unwrap_or(&None)
.is_none()
{
// the source has been removed from within its callback, unregister it
let mut poll = self.handle.inner.poll.borrow_mut();
Expand All @@ -514,6 +528,13 @@
}
}

// cleanup empty event source slots to free up ids again
self.handle
.inner
.sources
.borrow_mut()
.retain(|_, opt_disp| opt_disp.is_some());

Ok(())
}

Expand Down Expand Up @@ -1457,6 +1478,44 @@
assert_eq!(data, 22);
}

#[test]
fn reuse() {
use crate::sources::timer;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

let mut evl = EventLoop::<RegistrationToken>::try_new().unwrap();
let handle = evl.handle();

let data = Arc::new(Mutex::new(1));
let data_cloned = data.clone();

let timer_source = timer::Timer::from_duration(Duration::from_secs(1));
let mut first_timer_token = evl
.handle()
.insert_source(timer_source, move |_, _, own_token| {
handle.remove(*own_token);
let data_cloned = data_cloned.clone();
let _ = handle.insert_source(timer::Timer::immediate(), move |_, _, _| {
*data_cloned.lock().unwrap() = 2;
timer::TimeoutAction::Drop
});
timer::TimeoutAction::Drop
})
.unwrap();

let now = Instant::now();
loop {
evl.dispatch(Some(Duration::from_secs(3)), &mut first_timer_token)
.unwrap();
if Instant::now().duration_since(now) > Duration::from_secs(3) {
break;
}
}

assert_eq!(*data.lock().unwrap(), 2);
}

#[test]
fn drop_of_subsource() {
struct WithSubSource {
Expand Down
Loading