diff --git a/src/timed.rs b/src/timed.rs index 0911783..a32d538 100644 --- a/src/timed.rs +++ b/src/timed.rs @@ -3,6 +3,7 @@ //! To apply this to a given (receiving) actor: //! * Use [`TimedContext`] as [`Actor::Context`] associated type. //! * Such actors cannot be spawned unless wrapped, making it impossible to forget wrapping it. +//! * Wrapped actor's `Error` must implement [`From`]. //! * Wrap the actor in [`Timed`] before spawning. //! //! The wrapped actor will accept [`TimedMessage`] with convenience conversion from `M`. @@ -10,9 +11,8 @@ //! `send_delayed()`, `send_recurring()`. //! //! Once accepted by the actor, delayed and recurring messages do not occupy place in actor's -//! channel inbox, they are placed to internal queue instead. Due to the design, delayed and -//! recurring messages have always lower priority than instant messages when the actor is -//! saturated. +//! channel inbox, they are placed to internal queue instead. When delayed/recurring message become +//! due, they go through the actor's regular inboxes (subject to prioritization). //! //! See `delay_actor.rs` example for usage. @@ -24,11 +24,16 @@ use std::{ time::{Duration, Instant}, }; -/// A message that can be delivered now, at certain time and optionally repeatedly. +/// A message that can be enqueued now, at certain time and optionally repeatedly. pub enum TimedMessage { + /// Instant message `handle()`d by the wrapped actor right away. Instant { message: M }, - Delayed { message: M, fire_at: Instant }, - Recurring { factory: Box M + Send>, fire_at: Instant, interval: Duration }, + /// Request to setup a delayed message. Goes to internal [`Timed`] wrapper queue and then gets + /// sent to ourselves as an `Instant` message at the specified time. + Delayed { message: M, enqueue_at: Instant }, + /// Request to setup a recurring message. Goes to internal [`Timed`] wrapper queue and then gets + /// sent to ourselves as an `Instant` message regularly at the specified pace. + Recurring { factory: Box M + Send>, enqueue_at: Instant, interval: Duration }, } /// This implementation allows sending direct unwrapped messages to wrapped actors. @@ -43,19 +48,19 @@ pub trait RecipientExt { /// Send a `message` now. Convenience to wrap message in [`TimedMessage::Instant`]. fn send_now(&self, message: M) -> Result<(), SendError>; - /// Send a `message` to be delivered later at a certain instant. - fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError>; + /// Send a `message` to be enqueued later at a certain instant. + fn send_timed(&self, message: M, enqueue_at: Instant) -> Result<(), SendError>; - /// Send a `message` to be delivered later after some time from now. + /// Send a `message` to be enqueued later after some time from now. fn send_delayed(&self, message: M, delay: Duration) -> Result<(), SendError> { self.send_timed(message, Instant::now() + delay) } - /// Schedule sending of message at `fire_at` plus at regular `interval`s from that point on. + /// Schedule sending of message at `enqueue_at` plus at regular `interval`s from that point on. fn send_recurring( &self, factory: impl FnMut() -> M + Send + 'static, - fire_at: Instant, + enqueue_at: Instant, interval: Duration, ) -> Result<(), SendError>; } @@ -65,17 +70,17 @@ impl RecipientExt for Recipient> { self.send(TimedMessage::Instant { message }) } - fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError> { - self.send(TimedMessage::Delayed { message, fire_at }) + fn send_timed(&self, message: M, enqueue_at: Instant) -> Result<(), SendError> { + self.send(TimedMessage::Delayed { message, enqueue_at }) } fn send_recurring( &self, factory: impl FnMut() -> M + Send + 'static, - fire_at: Instant, + enqueue_at: Instant, interval: Duration, ) -> Result<(), SendError> { - self.send(TimedMessage::Recurring { factory: Box::new(factory), fire_at, interval }) + self.send(TimedMessage::Recurring { factory: Box::new(factory), enqueue_at, interval }) } } @@ -111,16 +116,21 @@ pub struct Timed { queue: BinaryHeap>, } -impl, Message = M>> Timed { +impl, Message = M>> Timed +where + ::Error: From, +{ pub fn new(inner: A) -> Self { Self { inner, queue: Default::default() } } /// Process any pending messages in the internal queue, calling wrapped actor's `handle()`. - fn process_queue(&mut self, context: &mut ::Context) -> Result<(), A::Error> { - // Handle all messages that should have been handled by now. - let now = Instant::now(); - while self.queue.peek().map(|m| m.fire_at <= now).unwrap_or(false) { + fn process_queue(&mut self, context: &mut ::Context) -> Result<(), SendError> { + // If the message on top of the queue is due, send it to ourselves as `Instant` to enqueue + // it in the regular actor queue. + // No problem if there are multiple such messages, the next Timed::handle() will call + // process_queue() again. + if self.queue.peek().map(|m| m.enqueue_at <= Instant::now()).unwrap_or(false) { let item = self.queue.pop().expect("heap is non-empty, we have just peeked"); let message = match item.payload { @@ -128,21 +138,16 @@ impl, Message = M>> Timed< Payload::Recurring { mut factory, interval } => { let message = factory(); self.queue.push(QueueItem { - fire_at: item.fire_at + interval, + enqueue_at: item.enqueue_at + interval, payload: Payload::Recurring { factory, interval }, }); message }, }; - // Let inner actor do its job. - // - // Alternatively, we could send an `Instant` message to ourselves. - // - The advantage would be that it would go into the queue with proper priority. But it - // is unclear what should be handled first: normal-priority message that should have - // been processed a while ago, or a high-priority message that was delivered now. - // - Disadvantage is we could easily overflow the queue if many messages fire at once. - self.inner.handle(&mut TimedContext::from_context(context), message)?; + // Enqueue an immediate message to process. Alternative would be to call inner handle(), + // but we don't want to effectively call child handle() twice in the parent handle(). + context.myself.send_now(message)?; } Ok(()) @@ -150,11 +155,14 @@ impl, Message = M>> Timed< fn schedule_timeout(&self, context: &mut ::Context) { // Schedule next timeout if the queue is not empty. - context.set_deadline(self.queue.peek().map(|earliest| earliest.fire_at)); + context.set_deadline(self.queue.peek().map(|earliest| earliest.enqueue_at)); } } -impl, Message = M>> Actor for Timed { +impl, Message = M>> Actor for Timed +where + ::Error: From, +{ type Context = Context; type Error = A::Error; type Message = TimedMessage; @@ -171,12 +179,14 @@ impl, Message = M>> Actor TimedMessage::Instant { message } => { self.inner.handle(&mut TimedContext::from_context(context), message)?; }, - TimedMessage::Delayed { message, fire_at } => { - self.queue.push(QueueItem { fire_at, payload: Payload::Delayed { message } }); + TimedMessage::Delayed { message, enqueue_at } => { + self.queue.push(QueueItem { enqueue_at, payload: Payload::Delayed { message } }); }, - TimedMessage::Recurring { factory, fire_at, interval } => { - self.queue - .push(QueueItem { fire_at, payload: Payload::Recurring { factory, interval } }); + TimedMessage::Recurring { factory, enqueue_at, interval } => { + self.queue.push(QueueItem { + enqueue_at, + payload: Payload::Recurring { factory, interval }, + }); }, }; @@ -195,14 +205,13 @@ impl, Message = M>> Actor fn priority(message: &Self::Message) -> Priority { match message { - // Use underlying message priority if we can reference it. - TimedMessage::Instant { message } | TimedMessage::Delayed { message, .. } => { - A::priority(message) - }, - // Recurring message is only received once, the recurring instances go through the - // internal queue (and not actor's channel). Assign high priority to the request to - // set-up the recurrent sending. - TimedMessage::Recurring { .. } => Priority::High, + // Use underlying message priority for instant messages. + TimedMessage::Instant { message } => A::priority(message), + // These priorities apply to the *set-up* of Delayed and Recurring messages and we + // want to handle that pronto. + // The resulting inner message then comes back as `Instant` and is prioritized per its + // underlying priority. + TimedMessage::Recurring { .. } | TimedMessage::Delayed { .. } => Priority::High, } } @@ -236,13 +245,13 @@ impl Deref for Timed { /// Implementation detail, element of message queue ordered by time to fire at. struct QueueItem { - fire_at: Instant, + enqueue_at: Instant, payload: Payload, } impl PartialEq for QueueItem { fn eq(&self, other: &Self) -> bool { - self.fire_at == other.fire_at + self.enqueue_at == other.enqueue_at } } @@ -257,8 +266,8 @@ impl PartialOrd for QueueItem { impl Ord for QueueItem { fn cmp(&self, other: &Self) -> Ordering { - // Reverse because [BinaryHeap] is a *max* heap, but we want pop() to return lowest `fire_at`. - self.fire_at.cmp(&other.fire_at).reverse() + // Reverse because [BinaryHeap] is a *max* heap, but we want pop() to return lowest `enqueue_at`. + self.enqueue_at.cmp(&other.enqueue_at).reverse() } } @@ -277,19 +286,20 @@ mod tests { }; struct TimedTestActor { + recurring_message_sleep: Duration, received: Arc>>, } impl Actor for TimedTestActor { type Context = TimedContext; - type Error = (); + type Error = SendError; type Message = usize; fn name() -> &'static str { "TimedTestActor" } - fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), ()> { + fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), SendError> { { let mut guard = self.received.lock().unwrap(); guard.push(message); @@ -301,38 +311,78 @@ mod tests { context.myself.send_now(3).unwrap(); } - Ok(()) - } + // Message 2 is a recurring one, sleep based on a parameter. + if message == 2 { + thread::sleep(self.recurring_message_sleep); + } - fn started(&mut self, context: &mut Self::Context) { - context - .myself - .send_recurring( - || 2, - Instant::now() + Duration::from_millis(50), - Duration::from_millis(100), - ) - .unwrap() + Ok(()) } } + /// Tests that recurring messages still get in for actors that have one "tick" message type that + /// does `block_for_some_time(); myself.send_now(Tick);` in its handle(). #[test] - fn recurring_messages_for_busy_actors() { + fn recurring_messages_for_self_looping_actors() { let received = Arc::new(Mutex::new(Vec::new())); let mut system = System::new("timed test"); - let address = - system.spawn(Timed::new(TimedTestActor { received: Arc::clone(&received) })).unwrap(); + let address = system + .spawn(Timed::new(TimedTestActor { + recurring_message_sleep: Duration::ZERO, + received: Arc::clone(&received), + })) + .unwrap(); + address + .send_recurring( + || 2, + Instant::now() + Duration::from_millis(50), + Duration::from_millis(100), + ) + .unwrap(); + address.send_now(1).unwrap(); thread::sleep(Duration::from_millis(225)); + system.shutdown().unwrap(); - // The order of messages should be: - // 1 (initial message), - // 2 (first recurring scheduled message), - // 3 (first self-sent message), - // 2 (second recurring message) - // 3 (second self-sent message) - assert_eq!(*received.lock().unwrap(), vec![1, 2, 3, 2, 3]); + // The timeline (order of messages received) is: + // at 0 ms: 1 (initial message, takes 100 ms to handle), + // at 100 ms: 3 (first self-sent message, 100 ms to handle), + // at 200 ms: 2 (first recurring scheduled message, delivered 150 ms late), + // at 200 ms: 3 (second self-sent message, 100 ms to handle) + // at 225 ms: (control message to shut down the actor sent) + // at 300 ms: (control signal to shut down finally delivered to the actor) + assert_eq!(*received.lock().unwrap(), vec![1, 3, 2, 3]); + } + + /// Test that actors with recurring messages that take longer to handle than what the recurring + /// delay is still get other and control messages. + #[test] + fn recurring_messages_handled_slower_than_generated() { + let received = Arc::new(Mutex::new(Vec::new())); + + let mut system = System::new("timed test"); + let address = system + .spawn(Timed::new(TimedTestActor { + recurring_message_sleep: Duration::from_millis(100), + received: Arc::clone(&received), + })) + .unwrap(); + address.send_recurring(|| 2, Instant::now(), Duration::from_millis(10)).unwrap(); + + thread::sleep(Duration::from_millis(150)); + address.send_now(4).unwrap(); + thread::sleep(Duration::from_millis(125)); system.shutdown().unwrap(); + + // The timeline (order of messages received) is: + // at 0 ms: 2 (first recurring message, 100 ms to handle) + // at 100 ms: 2 (second recurring message, 90 ms late, 100 ms to handle) + // at 150 ms: (message "4" sent to the actor from the main thread) + // at 200 ms: 4 (actor wakes up, processes message 4 that was sent before the recurring one) + // at 200 ms: 2 (third recurring message, 180 ms late, 100 ms to handle) + // at 275 ms: (control message to shut down actor sent) + // at 300 ms: (control message to shut down received at highest priority) + assert_eq!(*received.lock().unwrap(), vec![2, 2, 4, 2]); } }