Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Timed infinite loop fix #80

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 61 additions & 64 deletions src/timed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
//! To apply this to a given (receiving) actor:
//! * Use [`TimedContext<Self::Message>`] as [`Actor::Context`] associated type.
//! * Such actors cannot be spawned unless wrapped, making it impossible to forget wrapping it.
//! * Wrapped actor's `Error` must implement [`From<SendError>`].
//! * Wrap the actor in [`Timed`] before spawning.
//!
//! The wrapped actor will accept [`TimedMessage<M>`] with convenience conversion from `M`.
//! [`RecipientExt`] becomes available for [`Recipient<TimedMessage<M>>`]s which provides methods like
//! `send_delayed()`, `send_recurring()`.
//!
//! Once accepted by the actor, delayed and recurring messages do not occupy place in actor's
//! channel inbox, they are placed to internal queue instead. Due to the design, delayed and
//! recurring messages have always lower priority than instant messages when the actor is
//! saturated.
//! channel inbox, they are placed to internal queue instead. When delayed/recurring message become
//! due, they go through the actor's regular inboxes (subject to prioritization).
//!
//! See `delay_actor.rs` example for usage.

Expand All @@ -24,11 +24,11 @@ use std::{
time::{Duration, Instant},
};

/// A message that can be delivered now, at certain time and optionally repeatedly.
/// A message that can be enqueued now, at certain time and optionally repeatedly.
pub enum TimedMessage<M> {
Instant { message: M },
Delayed { message: M, fire_at: Instant },
Recurring { factory: Box<dyn FnMut() -> M + Send>, fire_at: Instant, interval: Duration },
Delayed { message: M, enqueue_at: Instant },
Recurring { factory: Box<dyn FnMut() -> M + Send>, enqueue_at: Instant, interval: Duration },
}

/// This implementation allows sending direct unwrapped messages to wrapped actors.
Expand All @@ -43,19 +43,19 @@ pub trait RecipientExt<M> {
/// Send a `message` now. Convenience to wrap message in [`TimedMessage::Instant`].
fn send_now(&self, message: M) -> Result<(), SendError>;

/// Send a `message` to be delivered later at a certain instant.
fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError>;
/// Send a `message` to be enqueued later at a certain instant.
fn send_timed(&self, message: M, enqueue_at: Instant) -> Result<(), SendError>;

/// Send a `message` to be delivered later after some time from now.
/// Send a `message` to be enqueued later after some time from now.
fn send_delayed(&self, message: M, delay: Duration) -> Result<(), SendError> {
self.send_timed(message, Instant::now() + delay)
}

/// Schedule sending of message at `fire_at` plus at regular `interval`s from that point on.
/// Schedule sending of message at `enqueue_at` plus at regular `interval`s from that point on.
fn send_recurring(
&self,
factory: impl FnMut() -> M + Send + 'static,
fire_at: Instant,
enqueue_at: Instant,
interval: Duration,
) -> Result<(), SendError>;
}
Expand All @@ -65,17 +65,17 @@ impl<M> RecipientExt<M> for Recipient<TimedMessage<M>> {
self.send(TimedMessage::Instant { message })
}

fn send_timed(&self, message: M, fire_at: Instant) -> Result<(), SendError> {
self.send(TimedMessage::Delayed { message, fire_at })
fn send_timed(&self, message: M, enqueue_at: Instant) -> Result<(), SendError> {
self.send(TimedMessage::Delayed { message, enqueue_at })
}

fn send_recurring(
&self,
factory: impl FnMut() -> M + Send + 'static,
fire_at: Instant,
enqueue_at: Instant,
interval: Duration,
) -> Result<(), SendError> {
self.send(TimedMessage::Recurring { factory: Box::new(factory), fire_at, interval })
self.send(TimedMessage::Recurring { factory: Box::new(factory), enqueue_at, interval })
}
}

Expand Down Expand Up @@ -111,50 +111,52 @@ pub struct Timed<A: Actor> {
queue: BinaryHeap<QueueItem<A::Message>>,
}

impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Timed<A> {
impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Timed<A>
where
<A as Actor>::Error: From<SendError>,
{
pub fn new(inner: A) -> Self {
Self { inner, queue: Default::default() }
}

/// Process any pending messages in the internal queue, calling wrapped actor's `handle()`.
fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), A::Error> {
// Handle all messages that should have been handled by now.
let now = Instant::now();
while self.queue.peek().map(|m| m.fire_at <= now).unwrap_or(false) {
fn process_queue(&mut self, context: &mut <Self as Actor>::Context) -> Result<(), SendError> {
// If the message on top of the queue is due, send it to the regular actor queue.
// No problem if there are multiple such messages, it's handle() will call process_queue()
// again.
strohel marked this conversation as resolved.
Show resolved Hide resolved
if self.queue.peek().map(|m| m.enqueue_at <= Instant::now()).unwrap_or(false) {
let item = self.queue.pop().expect("heap is non-empty, we have just peeked");

let message = match item.payload {
Payload::Delayed { message } => message,
Payload::Recurring { mut factory, interval } => {
let message = factory();
self.queue.push(QueueItem {
fire_at: item.fire_at + interval,
enqueue_at: item.enqueue_at + interval,
payload: Payload::Recurring { factory, interval },
});
message
},
};

// Let inner actor do its job.
//
// Alternatively, we could send an `Instant` message to ourselves.
// - The advantage would be that it would go into the queue with proper priority. But it
// is unclear what should be handled first: normal-priority message that should have
// been processed a while ago, or a high-priority message that was delivered now.
// - Disadvantage is we could easily overflow the queue if many messages fire at once.
self.inner.handle(&mut TimedContext::from_context(context), message)?;
// Enqueue an immediate message to process. Alternative would be to call inner handle(),
// but we don't want to effectively call child handle() twice in the parent handle().
context.myself.send_now(message)?;
}

Ok(())
}

fn schedule_timeout(&self, context: &mut <Self as Actor>::Context) {
// Schedule next timeout if the queue is not empty.
context.set_deadline(self.queue.peek().map(|earliest| earliest.fire_at));
context.set_deadline(self.queue.peek().map(|earliest| earliest.enqueue_at));
}
}

impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor for Timed<A> {
impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor for Timed<A>
where
<A as Actor>::Error: From<SendError>,
{
type Context = Context<Self::Message>;
type Error = A::Error;
type Message = TimedMessage<M>;
Expand All @@ -171,12 +173,14 @@ impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor
TimedMessage::Instant { message } => {
self.inner.handle(&mut TimedContext::from_context(context), message)?;
},
TimedMessage::Delayed { message, fire_at } => {
self.queue.push(QueueItem { fire_at, payload: Payload::Delayed { message } });
TimedMessage::Delayed { message, enqueue_at } => {
self.queue.push(QueueItem { enqueue_at, payload: Payload::Delayed { message } });
},
TimedMessage::Recurring { factory, fire_at, interval } => {
self.queue
.push(QueueItem { fire_at, payload: Payload::Recurring { factory, interval } });
TimedMessage::Recurring { factory, enqueue_at, interval } => {
self.queue.push(QueueItem {
enqueue_at,
payload: Payload::Recurring { factory, interval },
});
},
};

Expand All @@ -195,14 +199,11 @@ impl<M: Send + 'static, A: Actor<Context = TimedContext<M>, Message = M>> Actor

fn priority(message: &Self::Message) -> Priority {
match message {
// Use underlying message priority if we can reference it.
TimedMessage::Instant { message } | TimedMessage::Delayed { message, .. } => {
A::priority(message)
},
// Recurring message is only received once, the recurring instances go through the
// internal queue (and not actor's channel). Assign high priority to the request to
// set-up the recurrent sending.
TimedMessage::Recurring { .. } => Priority::High,
// Use underlying message priority for instant messages.
TimedMessage::Instant { message } => A::priority(message),
// Recurring and Delayed messages are only added to the queue when handled, and then go
// through actors priority inboxes again when actually enqueued.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm having trouble visualizing the message flow based on this comment.

only added to the queue when handled

In my mind, "handled" means the actor has run the handle() function on the message, so it's already out of the queue.

Is it possible to be more specific in the language here about the flow of the message from when send_delayed is called to when it actually ends up in the actor handle() function?

Copy link
Member

@goodhoko goodhoko Jan 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. What about

Suggested change
// Recurring and Delayed messages are only added to the queue when handled, and then go
// through actors priority inboxes again when actually enqueued.
// These priorities apply to the set-up of Delayed and Recurring messages and we want to handle that pronto.
// The resulting inner message then comes back as `Instant` and is prioritized per its internal priority.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@goodhoko you suggestion reads very good to me using it (with very small tweaks)

TimedMessage::Recurring { .. } | TimedMessage::Delayed { .. } => Priority::High,
}
}

Expand Down Expand Up @@ -236,13 +237,13 @@ impl<A: Actor> Deref for Timed<A> {

/// Implementation detail, element of message queue ordered by time to fire at.
struct QueueItem<M> {
fire_at: Instant,
enqueue_at: Instant,
payload: Payload<M>,
}

impl<M> PartialEq for QueueItem<M> {
fn eq(&self, other: &Self) -> bool {
self.fire_at == other.fire_at
self.enqueue_at == other.enqueue_at
}
}

Expand All @@ -257,8 +258,8 @@ impl<M> PartialOrd for QueueItem<M> {

impl<M> Ord for QueueItem<M> {
fn cmp(&self, other: &Self) -> Ordering {
// Reverse because [BinaryHeap] is a *max* heap, but we want pop() to return lowest `fire_at`.
self.fire_at.cmp(&other.fire_at).reverse()
// Reverse because [BinaryHeap] is a *max* heap, but we want pop() to return lowest `enqueue_at`.
self.enqueue_at.cmp(&other.enqueue_at).reverse()
}
}

Expand All @@ -283,14 +284,14 @@ mod tests {

impl Actor for TimedTestActor {
type Context = TimedContext<Self::Message>;
type Error = ();
type Error = SendError;
type Message = usize;

fn name() -> &'static str {
"TimedTestActor"
}

fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), ()> {
fn handle(&mut self, context: &mut Self::Context, message: usize) -> Result<(), SendError> {
{
let mut guard = self.received.lock().unwrap();
guard.push(message);
Expand Down Expand Up @@ -338,14 +339,12 @@ mod tests {

// The timeline (order of messages received) is:
// at 0 ms: 1 (initial message, takes 100 ms to handle),
// at 100 ms: 2 (first recurring scheduled message, delivered 50 ms late),
// at 100 ms: 3 (first self-sent message, 100 ms to handle),
// at 200 ms: 2 (second recurring message, 50 ms late, in the same ^^^ handle() invocation)
// at 200 ms: 2 (first recurring scheduled message, delivered 150 ms late),
// at 200 ms: 3 (second self-sent message, 100 ms to handle)
// at 225 ms: (control message to shut down the actor sent)
// at 300 ms: 2 (third recurring message, 50 ms late, in the same ^^^ handle() invocation)
// at 300 ms: (control signal to shut down finally delivered to the actor)
assert_eq!(*received.lock().unwrap(), vec![1, 2, 3, 2, 3, 2]);
assert_eq!(*received.lock().unwrap(), vec![1, 3, 2, 3]);
}

/// Test that actors with recurring messages that take longer to handle than what the recurring
Expand All @@ -369,15 +368,13 @@ mod tests {
system.shutdown().unwrap();

// The timeline (order of messages received) is:
// at 0 ms: 2 (deadline_passed() handles the first recurring message, takes 100 ms)
// at 100 ms: 2 ten times (deadline_passed() gradually handles 10 recurring messages the should have
// fired by the time it started, takes 1 full second)
// at 150 ms: (message "4" enqueued from the main thread)
// at 275 ms: (control message to shut down the actor sent)
// at 1100 ms: (actor loop finally kicks in again, gets control message, shuts down)
//
// Notice the message "4" is never received even though the actor had 125 ms to handle it
// (more time than needed to handle one recurring message). That's issue #79.
assert_eq!(*received.lock().unwrap(), vec![2; 11]);
// at 0 ms: 2 (first recurring message, 100 ms to handle)
// at 100 ms: 2 (second recurring message, 90 ms late, 100 ms to handle)
// at 150 ms: (message "4" sent to the actor from the main thread)
// at 200 ms: 4 (actor wakes up, processes message 4 that was sent before the recurring one)
// at 200 ms: 2 (third recurring message, 180 ms late, 100 ms to handle)
// at 275 ms: (control message to shut down actor sent)
// at 300 ms: (control message to shut down received at highest priority)
assert_eq!(*received.lock().unwrap(), vec![2, 2, 4, 2]);
}
}
Loading