Skip to content

Commit

Permalink
Adding cron subscription notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
slawlor committed Aug 25, 2023
1 parent e5f19b2 commit 9981727
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 4 deletions.
2 changes: 1 addition & 1 deletion ractor_actors/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ractor_actors"
version = "0.1.0"
version = "0.1.1"
authors = ["Sean Lawlor"]
description = "Helpful actors built with Ractor"
documentation = "https://docs.rs/ractor_actors"
Expand Down
147 changes: 144 additions & 3 deletions ractor_actors/src/time/cron/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
use std::collections::HashMap;

use cron::Schedule;
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort, State, SupervisionEvent};
use ractor::{Actor, ActorId, ActorProcessingErr, ActorRef, RpcReplyPort, State, SupervisionEvent};

mod worker;
use worker::{Cron, CronMessage};
Expand All @@ -87,6 +87,19 @@ pub trait Job: State {
async fn work(&mut self) -> Result<(), ActorProcessingErr>;
}

/// Represents a dynamic subscription to [CronManager] events which
/// denote cron job startup/failure/stoppage
pub trait CronEventSubscriber: State {
/// A job started
fn started(&self, job: String);

/// A job was stopped, with the optional stop reason
fn stopped(&self, job: String, reason: Option<String>);

/// A job failed, including the failure reason
fn failed(&self, job: String, reason: String);
}

/// The settings for a singular cron job
pub struct CronSettings {
/// This cron job's schedule
Expand All @@ -103,6 +116,7 @@ pub struct CronManager;
/// jobs to schedule
pub struct CronManagerState {
jobs: HashMap<String, (Schedule, ActorRef<CronMessage>)>,
subs: HashMap<ActorId, Box<dyn CronEventSubscriber>>,
}

/// Messages that the [CronManager] actor supports
Expand All @@ -117,6 +131,10 @@ pub enum CronManagerMessage {
Start(CronSettings, RpcReplyPort<Result<(), ActorProcessingErr>>),
/// Stop a specific cron job
Stop(String),
/// Subscribe to cron job events
Subscribe(ActorId, Box<dyn CronEventSubscriber>),
/// Unsubscriber from cron job events
Unsubscribe(ActorId),
}

#[ractor::async_trait]
Expand All @@ -132,6 +150,7 @@ impl Actor for CronManager {
) -> Result<Self::State, ActorProcessingErr> {
Ok(CronManagerState {
jobs: HashMap::new(),
subs: HashMap::new(),
})
}

Expand Down Expand Up @@ -177,6 +196,9 @@ impl Actor for CronManager {
CronManagerMessage::Stop(who) => {
if let Some(actor) = state.jobs.remove(&who) {
actor.1.stop(None);
for (_, sub) in state.subs.iter() {
sub.stopped(who.clone(), None);
}
}
}
CronManagerMessage::SetSchedule(who, schedule) => {
Expand All @@ -200,6 +222,12 @@ impl Actor for CronManager {
let _ = reply.send(None);
}
}
CronManagerMessage::Subscribe(who, processor) => {
state.subs.insert(who, processor);
}
CronManagerMessage::Unsubscribe(who) => {
state.subs.remove(&who);
}
}
Ok(())
}
Expand All @@ -219,20 +247,38 @@ impl Actor for CronManager {
.map(|(id, _)| id.clone());
if let Some(name) = job {
tracing::error!("Cron job {name} panicked with error {what}.");
for (_, sub) in state.subs.iter() {
sub.failed(name.clone(), what.to_string());
}
state.jobs.remove(&name);
}
}
SupervisionEvent::ActorTerminated(who, _, _) => {
SupervisionEvent::ActorTerminated(who, _, what) => {
// just cleanup if it's still hanging around
let job = state
.jobs
.iter()
.find(|(_, v)| v.1.get_id() == who.get_id())
.map(|(id, _)| id.clone());
if let Some(name) = job {
for (_, sub) in state.subs.iter() {
sub.stopped(name.clone(), what.clone());
}
state.jobs.remove(&name);
}
}
SupervisionEvent::ActorStarted(who) => {
let job = state
.jobs
.iter()
.find(|(_, v)| v.1.get_id() == who.get_id())
.map(|(id, _)| id.clone());
if let Some(name) = job {
for (_, sub) in state.subs.iter() {
sub.started(name.clone());
}
}
}
_ => {
// ignore all other supervision events (spawn, etc)
}
Expand Down Expand Up @@ -371,10 +417,105 @@ mod tests {
// job failed on first execution so it should be removed now
let result = ractor::call_t!(manager, CronManagerMessage::ListJobs, 100)
.expect("Failed to query jobs list");
assert!(!result.contains_key("counter_job"));
assert!(!result.contains_key("bad_job"));

// Cleanup
manager.stop(None);
mhandle.await.unwrap();
}

#[ractor::concurrency::test]
async fn test_cron_event_subscription() {
// Setup
let schedule = " */1 * * * * * *";
let schedule = Schedule::from_str(schedule).expect("Failed to parse schedule");
let start_counter = Arc::new(AtomicU16::new(0));
let stop_counter = Arc::new(AtomicU16::new(0));
let fail_counter = Arc::new(AtomicU16::new(0));
let counter = Arc::new(AtomicU16::new(0));

struct Subscriber {
starts: Arc<AtomicU16>,
stops: Arc<AtomicU16>,
fails: Arc<AtomicU16>,
}

impl CronEventSubscriber for Subscriber {
fn started(&self, _: String) {
self.starts.fetch_add(1, Ordering::Relaxed);
}
fn stopped(&self, _: String, _: Option<String>) {
self.stops.fetch_add(1, Ordering::Relaxed);
}
fn failed(&self, _: String, _: String) {
self.fails.fetch_add(1, Ordering::Relaxed);
}
}

let (manager, mhandle) = Actor::spawn(None, CronManager, ())
.await
.expect("Failed to spawn cron manager");

// Act & Verify
manager
.cast(CronManagerMessage::Subscribe(
ActorId::Local(123),
Box::new(Subscriber {
fails: fail_counter.clone(),
starts: start_counter.clone(),
stops: stop_counter.clone(),
}),
))
.expect("Failed to send message to manager");

manager
.call(
|prt| {
CronManagerMessage::Start(
CronSettings {
schedule: schedule.clone(),
job: Box::new(CounterJob { counter }),
},
prt,
)
},
None,
)
.await
.expect("Failed to send start message")
.expect("Cron send timed out")
.expect("Failed to start cron job with error");

manager
.call(
|prt| {
CronManagerMessage::Start(
CronSettings {
schedule,
job: Box::new(BadJob),
},
prt,
)
},
Some(Duration::from_millis(100)),
)
.await
.expect("Failed to send start message")
.expect("Cron send timed out")
.expect("Failed to start cron job with error");

sleep(Duration::from_secs(2)).await;
manager
.cast(CronManagerMessage::Stop("counter_job".to_string()))
.expect("Failed to send stop command");
sleep(Duration::from_millis(500)).await;
// cleanup cron manager
manager.stop(None);
mhandle.await.unwrap();

// Assert
assert_eq!(2, start_counter.load(Ordering::Relaxed));
assert_eq!(1, fail_counter.load(Ordering::Relaxed));
assert_eq!(1, stop_counter.load(Ordering::Relaxed));
}
}

0 comments on commit 9981727

Please sign in to comment.