From 24be4f2100e34fa6ae87cfdb0932f5eeb3274934 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 20 Jul 2024 18:41:50 -0700 Subject: [PATCH 1/3] bugfix: Bound channel elements per iteration At the moment, the channel implentation processes as many channel elements as possible every time "process_events" is called. However, in multithreaded cases this can cause the loop to be stuck in the "process_events" section of a channel forever. If one thread keeps sending new elements into the channel while the current thread keeps reading them, it will starve other event sources of running time. This commit fixes this issue by bounding the number of channel elements that can be processed every time "process_events" is called. It chooses the smallest of the following numbers: - The capacity of the channel. - 1024 (chosen because this is also used by async-executor) If the channel is not empty after we have read this number of elements, the underlying source is not triggered. This should make it so the channel is immediately re-polled on the next dispatch. However it gives other sources more time to run. Signed-off-by: John Nunley --- src/sources/channel.rs | 104 +++++++++++++++++++++++++++++++++++------ 1 file changed, 91 insertions(+), 13 deletions(-) diff --git a/src/sources/channel.rs b/src/sources/channel.rs index c48f70e3..91275961 100644 --- a/src/sources/channel.rs +++ b/src/sources/channel.rs @@ -8,6 +8,7 @@ //! A synchronous version of the channel is provided by [`sync_channel`], in which //! the [`SyncSender`] will block when the channel is full. +use std::cmp; use std::fmt; use std::sync::mpsc; @@ -15,6 +16,8 @@ use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory}; use super::ping::{make_ping, Ping, PingError, PingSource}; +const MAX_EVENTS_CHECK: usize = 1024; + /// The events generated by the channel event source #[derive(Debug)] pub enum Event { @@ -123,6 +126,7 @@ impl SyncSender { pub struct Channel { receiver: mpsc::Receiver, source: PingSource, + capacity: usize, } // This impl is safe because the Channel is only able to move around threads @@ -156,14 +160,28 @@ impl Channel { pub fn channel() -> (Sender, Channel) { let (sender, receiver) = mpsc::channel(); let (ping, source) = make_ping().expect("Failed to create a Ping."); - (Sender { sender, ping }, Channel { receiver, source }) + ( + Sender { sender, ping }, + Channel { + receiver, + source, + capacity: usize::MAX, + }, + ) } /// Create a new synchronous, bounded channel pub fn sync_channel(bound: usize) -> (SyncSender, Channel) { let (sender, receiver) = mpsc::sync_channel(bound); let (ping, source) = make_ping().expect("Failed to create a Ping."); - (SyncSender { sender, ping }, Channel { receiver, source }) + ( + SyncSender { sender, ping }, + Channel { + receiver, + source, + capacity: bound, + }, + ) } impl EventSource for Channel { @@ -182,18 +200,33 @@ impl EventSource for Channel { C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, { let receiver = &self.receiver; - self.source - .process_events(readiness, token, |(), &mut ()| loop { - match receiver.try_recv() { - Ok(val) => callback(Event::Msg(val), &mut ()), - Err(mpsc::TryRecvError::Empty) => break, - Err(mpsc::TryRecvError::Disconnected) => { - callback(Event::Closed, &mut ()); - break; - } + + let mut clear_readiness = false; + + // Limit the number of elements we process at a time to the channel's capacity, or 1024. + let max = cmp::min(self.capacity, MAX_EVENTS_CHECK); + for _ in 0..max { + match receiver.try_recv() { + Ok(val) => callback(Event::Msg(val), &mut ()), + Err(mpsc::TryRecvError::Empty) => { + clear_readiness = true; + break; } - }) - .map_err(ChannelError) + Err(mpsc::TryRecvError::Disconnected) => { + callback(Event::Closed, &mut ()); + clear_readiness = true; + break; + } + } + } + + if clear_readiness { + self.source + .process_events(readiness, token, |(), &mut ()| {}) + .map_err(ChannelError) + } else { + Ok(PostAction::Continue) + } } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { @@ -338,4 +371,49 @@ mod tests { assert_eq!(received.0, 3); assert!(received.1); } + + #[test] + fn test_more_than_1024() { + let mut event_loop = crate::EventLoop::try_new().unwrap(); + let handle = event_loop.handle(); + + let (tx, rx) = channel::<()>(); + let mut received = (0u32, false); + + handle + .insert_source( + rx, + move |evt, &mut (), received: &mut (u32, bool)| match evt { + Event::Msg(()) => received.0 += 1, + Event::Closed => received.1 = true, + }, + ) + .unwrap(); + + event_loop + .dispatch(Some(std::time::Duration::ZERO), &mut received) + .unwrap(); + + assert_eq!(received.0, 0); + assert!(!received.1); + + // Send 1025 elements into the channel. + for _ in 0..MAX_EVENTS_CHECK + 1 { + tx.send(()).unwrap(); + } + + event_loop + .dispatch(Some(std::time::Duration::ZERO), &mut received) + .unwrap(); + + assert_eq!(received.0, MAX_EVENTS_CHECK as u32); + assert!(!received.1); + + event_loop + .dispatch(Some(std::time::Duration::ZERO), &mut received) + .unwrap(); + + assert_eq!(received.0, (MAX_EVENTS_CHECK + 1) as u32); + assert!(!received.1); + } } From aa61d500d5337aa61250bed4e43ef75d8f7bc7cf Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sat, 20 Jul 2024 21:38:22 -0700 Subject: [PATCH 2/3] tests: Add 1025 future test to executor Signed-off-by: John Nunley --- src/sources/futures.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/sources/futures.rs b/src/sources/futures.rs index ea2b4e5f..9a611980 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -405,6 +405,9 @@ impl std::error::Error for ExecutorError {} mod tests { use super::*; + use std::cell::RefCell; + use std::rc::Rc; + #[test] fn ready() { let mut event_loop = crate::EventLoop::::try_new().unwrap(); @@ -439,4 +442,35 @@ mod tests { // the future has run assert_eq!(got, 42); } + + #[test] + fn more_than_1024() { + let mut event_loop = crate::EventLoop::<()>::try_new().unwrap(); + let handle = event_loop.handle(); + + let (exec, sched) = executor::<()>().unwrap(); + handle.insert_source(exec, move |_, _, _| ()).unwrap(); + + let counter = Rc::new(RefCell::new(0)); + for _ in 0..1025 { + let counter = counter.clone(); + sched + .schedule(async move { + *counter.borrow_mut() += 1; + }) + .unwrap(); + } + + event_loop + .dispatch(Some(::std::time::Duration::ZERO), &mut ()) + .unwrap(); + + assert_eq!(*counter.borrow(), 1024); + + event_loop + .dispatch(Some(::std::time::Duration::ZERO), &mut ()) + .unwrap(); + + assert_eq!(*counter.borrow(), 1025); + } } From d09724ab551b1d9e007e3099030e30ce46730ee1 Mon Sep 17 00:00:00 2001 From: John Nunley Date: Sun, 21 Jul 2024 09:31:48 -0700 Subject: [PATCH 3/3] bugfix: Use a different strategy for yielding Previously, we only processed the source in the channel/executor implementations when we needed to clear the readiness; however this causes issues in the Windows `Ping` implementation. As a workaround, this commit always calls "process_events" in `Ping`, but simply re-notifies the PingSource if we need to try again. Signed-off-by: John Nunley --- src/sources/channel.rs | 56 ++++++++++++++--------- src/sources/futures.rs | 100 ++++++++++++++++++++++------------------- 2 files changed, 89 insertions(+), 67 deletions(-) diff --git a/src/sources/channel.rs b/src/sources/channel.rs index 91275961..c01407c9 100644 --- a/src/sources/channel.rs +++ b/src/sources/channel.rs @@ -126,6 +126,7 @@ impl SyncSender { pub struct Channel { receiver: mpsc::Receiver, source: PingSource, + ping: Ping, capacity: usize, } @@ -161,9 +162,13 @@ pub fn channel() -> (Sender, Channel) { let (sender, receiver) = mpsc::channel(); let (ping, source) = make_ping().expect("Failed to create a Ping."); ( - Sender { sender, ping }, + Sender { + sender, + ping: ping.clone(), + }, Channel { receiver, + ping, source, capacity: usize::MAX, }, @@ -175,10 +180,14 @@ pub fn sync_channel(bound: usize) -> (SyncSender, Channel) { let (sender, receiver) = mpsc::sync_channel(bound); let (ping, source) = make_ping().expect("Failed to create a Ping."); ( - SyncSender { sender, ping }, + SyncSender { + sender, + ping: ping.clone(), + }, Channel { receiver, source, + ping, capacity: bound, }, ) @@ -200,31 +209,36 @@ impl EventSource for Channel { C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, { let receiver = &self.receiver; - + let capacity = self.capacity; let mut clear_readiness = false; - // Limit the number of elements we process at a time to the channel's capacity, or 1024. - let max = cmp::min(self.capacity, MAX_EVENTS_CHECK); - for _ in 0..max { - match receiver.try_recv() { - Ok(val) => callback(Event::Msg(val), &mut ()), - Err(mpsc::TryRecvError::Empty) => { - clear_readiness = true; - break; - } - Err(mpsc::TryRecvError::Disconnected) => { - callback(Event::Closed, &mut ()); - clear_readiness = true; - break; + let action = self + .source + .process_events(readiness, token, |(), &mut ()| { + // Limit the number of elements we process at a time to the channel's capacity, or 1024. + let max = cmp::min(capacity.saturating_add(1), MAX_EVENTS_CHECK); + for _ in 0..max { + match receiver.try_recv() { + Ok(val) => callback(Event::Msg(val), &mut ()), + Err(mpsc::TryRecvError::Empty) => { + clear_readiness = true; + break; + } + Err(mpsc::TryRecvError::Disconnected) => { + callback(Event::Closed, &mut ()); + clear_readiness = true; + break; + } + } } - } - } + }) + .map_err(ChannelError)?; if clear_readiness { - self.source - .process_events(readiness, token, |(), &mut ()| {}) - .map_err(ChannelError) + Ok(action) } else { + // Re-notify the ping source so we can try again. + self.ping.ping(); Ok(PostAction::Continue) } } diff --git a/src/sources/futures.rs b/src/sources/futures.rs index 9a611980..553f131f 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -48,7 +48,10 @@ pub struct Executor { state: Rc>, /// Notifies us when the executor is woken up. - ping: PingSource, + source: PingSource, + + /// Used for when we need to wake ourselves up. + ping: Ping, } /// A scheduler to send futures to an executor @@ -274,7 +277,7 @@ pub fn executor() -> crate::Result<(Executor, Scheduler)> { active_tasks: RefCell::new(Some(Slab::new())), sender: Arc::new(Sender { sender: Mutex::new(sender), - wake_up, + wake_up: wake_up.clone(), notified: AtomicBool::new(false), }), }); @@ -282,7 +285,8 @@ pub fn executor() -> crate::Result<(Executor, Scheduler)> { Ok(( Executor { state: state.clone(), - ping, + source: ping, + ping: wake_up, }, Scheduler { state }, )) @@ -305,62 +309,66 @@ impl EventSource for Executor { { let state = &self.state; - let clear_readiness = { + // Set to the unnotified state. + state.sender.notified.store(false, Ordering::SeqCst); + + let (clear_readiness, action) = { let mut clear_readiness = false; - // Process runnables, but not too many at a time; better to move onto the next event quickly! - for _ in 0..1024 { - let runnable = match state.incoming.try_recv() { - Ok(runnable) => runnable, - Err(_) => { - // Make sure to clear the readiness if there are no more runnables. - clear_readiness = true; - break; - } - }; + let action = self + .source + .process_events(readiness, token, |(), &mut ()| { + // Process runnables, but not too many at a time; better to move onto the next event quickly! + for _ in 0..1024 { + let runnable = match state.incoming.try_recv() { + Ok(runnable) => runnable, + Err(_) => { + // Make sure to clear the readiness if there are no more runnables. + clear_readiness = true; + break; + } + }; - // Run the runnable. - let index = *runnable.metadata(); - runnable.run(); + // Run the runnable. + let index = *runnable.metadata(); + runnable.run(); - // If the runnable finished with a result, call the callback. - let mut active_guard = state.active_tasks.borrow_mut(); - let active_tasks = active_guard.as_mut().unwrap(); + // If the runnable finished with a result, call the callback. + let mut active_guard = state.active_tasks.borrow_mut(); + let active_tasks = active_guard.as_mut().unwrap(); - if let Some(state) = active_tasks.get(index) { - if state.is_finished() { - // Take out the state and provide it to the caller. - let result = match active_tasks.remove(index) { - Active::Finished(result) => result, - _ => unreachable!(), - }; + if let Some(state) = active_tasks.get(index) { + if state.is_finished() { + // Take out the state and provide it to the caller. + let result = match active_tasks.remove(index) { + Active::Finished(result) => result, + _ => unreachable!(), + }; - // Drop the guard since the callback may register another future to the scheduler. - drop(active_guard); + // Drop the guard since the callback may register another future to the scheduler. + drop(active_guard); - callback(result, &mut ()); + callback(result, &mut ()); + } + } } - } - } + }) + .map_err(ExecutorError::WakeError)?; - clear_readiness + (clear_readiness, action) }; - // Clear the readiness of the ping source if there are no more runnables. - if clear_readiness { - self.ping - .process_events(readiness, token, |(), &mut ()| {}) - .map_err(ExecutorError::WakeError)?; + // Re-ready the ping source if we need to re-run this handler. + if !clear_readiness { + self.ping.ping(); + Ok(PostAction::Continue) + } else { + Ok(action) } - - // Set to the unnotified state. - state.sender.notified.store(false, Ordering::SeqCst); - - Ok(PostAction::Continue) } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { - self.ping.register(poll, token_factory)?; + self.source.register(poll, token_factory)?; Ok(()) } @@ -369,12 +377,12 @@ impl EventSource for Executor { poll: &mut Poll, token_factory: &mut TokenFactory, ) -> crate::Result<()> { - self.ping.reregister(poll, token_factory)?; + self.source.reregister(poll, token_factory)?; Ok(()) } fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { - self.ping.unregister(poll)?; + self.source.unregister(poll)?; Ok(()) } }