diff --git a/src/sources/channel.rs b/src/sources/channel.rs index c48f70e3..c01407c9 100644 --- a/src/sources/channel.rs +++ b/src/sources/channel.rs @@ -8,6 +8,7 @@ //! A synchronous version of the channel is provided by [`sync_channel`], in which //! the [`SyncSender`] will block when the channel is full. +use std::cmp; use std::fmt; use std::sync::mpsc; @@ -15,6 +16,8 @@ use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory}; use super::ping::{make_ping, Ping, PingError, PingSource}; +const MAX_EVENTS_CHECK: usize = 1024; + /// The events generated by the channel event source #[derive(Debug)] pub enum Event { @@ -123,6 +126,8 @@ impl SyncSender { pub struct Channel { receiver: mpsc::Receiver, source: PingSource, + ping: Ping, + capacity: usize, } // This impl is safe because the Channel is only able to move around threads @@ -156,14 +161,36 @@ impl Channel { pub fn channel() -> (Sender, Channel) { let (sender, receiver) = mpsc::channel(); let (ping, source) = make_ping().expect("Failed to create a Ping."); - (Sender { sender, ping }, Channel { receiver, source }) + ( + Sender { + sender, + ping: ping.clone(), + }, + Channel { + receiver, + ping, + source, + capacity: usize::MAX, + }, + ) } /// Create a new synchronous, bounded channel pub fn sync_channel(bound: usize) -> (SyncSender, Channel) { let (sender, receiver) = mpsc::sync_channel(bound); let (ping, source) = make_ping().expect("Failed to create a Ping."); - (SyncSender { sender, ping }, Channel { receiver, source }) + ( + SyncSender { + sender, + ping: ping.clone(), + }, + Channel { + receiver, + source, + ping, + capacity: bound, + }, + ) } impl EventSource for Channel { @@ -182,18 +209,38 @@ impl EventSource for Channel { C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, { let receiver = &self.receiver; - self.source - .process_events(readiness, token, |(), &mut ()| loop { - match receiver.try_recv() { - Ok(val) => callback(Event::Msg(val), &mut ()), - Err(mpsc::TryRecvError::Empty) => break, - Err(mpsc::TryRecvError::Disconnected) => { - callback(Event::Closed, &mut ()); - break; + let capacity = self.capacity; + let mut clear_readiness = false; + + let action = self + .source + .process_events(readiness, token, |(), &mut ()| { + // Limit the number of elements we process at a time to the channel's capacity, or 1024. + let max = cmp::min(capacity.saturating_add(1), MAX_EVENTS_CHECK); + for _ in 0..max { + match receiver.try_recv() { + Ok(val) => callback(Event::Msg(val), &mut ()), + Err(mpsc::TryRecvError::Empty) => { + clear_readiness = true; + break; + } + Err(mpsc::TryRecvError::Disconnected) => { + callback(Event::Closed, &mut ()); + clear_readiness = true; + break; + } } } }) - .map_err(ChannelError) + .map_err(ChannelError)?; + + if clear_readiness { + Ok(action) + } else { + // Re-notify the ping source so we can try again. + self.ping.ping(); + Ok(PostAction::Continue) + } } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { @@ -338,4 +385,49 @@ mod tests { assert_eq!(received.0, 3); assert!(received.1); } + + #[test] + fn test_more_than_1024() { + let mut event_loop = crate::EventLoop::try_new().unwrap(); + let handle = event_loop.handle(); + + let (tx, rx) = channel::<()>(); + let mut received = (0u32, false); + + handle + .insert_source( + rx, + move |evt, &mut (), received: &mut (u32, bool)| match evt { + Event::Msg(()) => received.0 += 1, + Event::Closed => received.1 = true, + }, + ) + .unwrap(); + + event_loop + .dispatch(Some(std::time::Duration::ZERO), &mut received) + .unwrap(); + + assert_eq!(received.0, 0); + assert!(!received.1); + + // Send 1025 elements into the channel. + for _ in 0..MAX_EVENTS_CHECK + 1 { + tx.send(()).unwrap(); + } + + event_loop + .dispatch(Some(std::time::Duration::ZERO), &mut received) + .unwrap(); + + assert_eq!(received.0, MAX_EVENTS_CHECK as u32); + assert!(!received.1); + + event_loop + .dispatch(Some(std::time::Duration::ZERO), &mut received) + .unwrap(); + + assert_eq!(received.0, (MAX_EVENTS_CHECK + 1) as u32); + assert!(!received.1); + } } diff --git a/src/sources/futures.rs b/src/sources/futures.rs index ea2b4e5f..553f131f 100644 --- a/src/sources/futures.rs +++ b/src/sources/futures.rs @@ -48,7 +48,10 @@ pub struct Executor { state: Rc>, /// Notifies us when the executor is woken up. - ping: PingSource, + source: PingSource, + + /// Used for when we need to wake ourselves up. + ping: Ping, } /// A scheduler to send futures to an executor @@ -274,7 +277,7 @@ pub fn executor() -> crate::Result<(Executor, Scheduler)> { active_tasks: RefCell::new(Some(Slab::new())), sender: Arc::new(Sender { sender: Mutex::new(sender), - wake_up, + wake_up: wake_up.clone(), notified: AtomicBool::new(false), }), }); @@ -282,7 +285,8 @@ pub fn executor() -> crate::Result<(Executor, Scheduler)> { Ok(( Executor { state: state.clone(), - ping, + source: ping, + ping: wake_up, }, Scheduler { state }, )) @@ -305,62 +309,66 @@ impl EventSource for Executor { { let state = &self.state; - let clear_readiness = { + // Set to the unnotified state. + state.sender.notified.store(false, Ordering::SeqCst); + + let (clear_readiness, action) = { let mut clear_readiness = false; - // Process runnables, but not too many at a time; better to move onto the next event quickly! - for _ in 0..1024 { - let runnable = match state.incoming.try_recv() { - Ok(runnable) => runnable, - Err(_) => { - // Make sure to clear the readiness if there are no more runnables. - clear_readiness = true; - break; - } - }; + let action = self + .source + .process_events(readiness, token, |(), &mut ()| { + // Process runnables, but not too many at a time; better to move onto the next event quickly! + for _ in 0..1024 { + let runnable = match state.incoming.try_recv() { + Ok(runnable) => runnable, + Err(_) => { + // Make sure to clear the readiness if there are no more runnables. + clear_readiness = true; + break; + } + }; - // Run the runnable. - let index = *runnable.metadata(); - runnable.run(); + // Run the runnable. + let index = *runnable.metadata(); + runnable.run(); - // If the runnable finished with a result, call the callback. - let mut active_guard = state.active_tasks.borrow_mut(); - let active_tasks = active_guard.as_mut().unwrap(); + // If the runnable finished with a result, call the callback. + let mut active_guard = state.active_tasks.borrow_mut(); + let active_tasks = active_guard.as_mut().unwrap(); - if let Some(state) = active_tasks.get(index) { - if state.is_finished() { - // Take out the state and provide it to the caller. - let result = match active_tasks.remove(index) { - Active::Finished(result) => result, - _ => unreachable!(), - }; + if let Some(state) = active_tasks.get(index) { + if state.is_finished() { + // Take out the state and provide it to the caller. + let result = match active_tasks.remove(index) { + Active::Finished(result) => result, + _ => unreachable!(), + }; - // Drop the guard since the callback may register another future to the scheduler. - drop(active_guard); + // Drop the guard since the callback may register another future to the scheduler. + drop(active_guard); - callback(result, &mut ()); + callback(result, &mut ()); + } + } } - } - } + }) + .map_err(ExecutorError::WakeError)?; - clear_readiness + (clear_readiness, action) }; - // Clear the readiness of the ping source if there are no more runnables. - if clear_readiness { - self.ping - .process_events(readiness, token, |(), &mut ()| {}) - .map_err(ExecutorError::WakeError)?; + // Re-ready the ping source if we need to re-run this handler. + if !clear_readiness { + self.ping.ping(); + Ok(PostAction::Continue) + } else { + Ok(action) } - - // Set to the unnotified state. - state.sender.notified.store(false, Ordering::SeqCst); - - Ok(PostAction::Continue) } fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { - self.ping.register(poll, token_factory)?; + self.source.register(poll, token_factory)?; Ok(()) } @@ -369,12 +377,12 @@ impl EventSource for Executor { poll: &mut Poll, token_factory: &mut TokenFactory, ) -> crate::Result<()> { - self.ping.reregister(poll, token_factory)?; + self.source.reregister(poll, token_factory)?; Ok(()) } fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { - self.ping.unregister(poll)?; + self.source.unregister(poll)?; Ok(()) } } @@ -405,6 +413,9 @@ impl std::error::Error for ExecutorError {} mod tests { use super::*; + use std::cell::RefCell; + use std::rc::Rc; + #[test] fn ready() { let mut event_loop = crate::EventLoop::::try_new().unwrap(); @@ -439,4 +450,35 @@ mod tests { // the future has run assert_eq!(got, 42); } + + #[test] + fn more_than_1024() { + let mut event_loop = crate::EventLoop::<()>::try_new().unwrap(); + let handle = event_loop.handle(); + + let (exec, sched) = executor::<()>().unwrap(); + handle.insert_source(exec, move |_, _, _| ()).unwrap(); + + let counter = Rc::new(RefCell::new(0)); + for _ in 0..1025 { + let counter = counter.clone(); + sched + .schedule(async move { + *counter.borrow_mut() += 1; + }) + .unwrap(); + } + + event_loop + .dispatch(Some(::std::time::Duration::ZERO), &mut ()) + .unwrap(); + + assert_eq!(*counter.borrow(), 1024); + + event_loop + .dispatch(Some(::std::time::Duration::ZERO), &mut ()) + .unwrap(); + + assert_eq!(*counter.borrow(), 1025); + } }