From e5f19b2a17302041d1be246fe57d174e6223c11b Mon Sep 17 00:00:00 2001 From: Sean Lawlor Date: Wed, 23 Aug 2023 13:43:38 -0400 Subject: [PATCH] Adding cron job example --- ractor_actors/Cargo.toml | 4 +- ractor_actors/examples/cron.rs | 113 +++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 ractor_actors/examples/cron.rs diff --git a/ractor_actors/Cargo.toml b/ractor_actors/Cargo.toml index 9d1ed3d..7a983cd 100644 --- a/ractor_actors/Cargo.toml +++ b/ractor_actors/Cargo.toml @@ -31,4 +31,6 @@ notify = { version = "5", optional = true } tokio-rustls = { version = "0.23", optional = true } [dev-dependencies] -tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread"] } \ No newline at end of file +tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread", "signal"] } +tracing-glog = "0.2" +tracing-subscriber = { version = "0.3", features = ["env-filter"]} \ No newline at end of file diff --git a/ractor_actors/examples/cron.rs b/ractor_actors/examples/cron.rs new file mode 100644 index 0000000..984456d --- /dev/null +++ b/ractor_actors/examples/cron.rs @@ -0,0 +1,113 @@ +// Copyright (c) Sean Lawlor +// +// This source code is licensed under both the MIT license found in the +// LICENSE-MIT file in the root directory of this source tree. + +//! A basic counting agent. Demonstrates remote procedure calls to interact +//! with the agent externally and safely acquire the "count" +//! +//! Execute with +//! +//! ```text +//! cargo run --example cron +//! ``` + +extern crate ractor_actors; +use std::str::FromStr; + +use cron::Schedule; +use ractor::{ + async_trait, + concurrency::{sleep, Duration, Instant}, + Actor, ActorProcessingErr, +}; +use ractor_actors::time::cron::{CronManager, CronManagerMessage, CronSettings, Job}; + +struct MyJob { + last: Option, +} + +#[async_trait] +impl Job for MyJob { + fn id<'a>(&self) -> &'a str { + "my_job" + } + + async fn work(&mut self) -> Result<(), ActorProcessingErr> { + let now = Instant::now(); + let delta = self.last.map(|ts| (now - ts).as_millis()); + + sleep(Duration::from_millis(500)).await; + + tracing::info!("Working hard for {:?} ms", delta); + + self.last = Some(now); + + Ok(()) + } +} + +fn init_logging() { + let dir = tracing_subscriber::filter::Directive::from(tracing::Level::DEBUG); + + use std::io::stderr; + use std::io::IsTerminal; + use tracing_glog::Glog; + use tracing_glog::GlogFields; + use tracing_subscriber::filter::EnvFilter; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::Registry; + + let fmt = tracing_subscriber::fmt::Layer::default() + .with_ansi(stderr().is_terminal()) + .with_writer(std::io::stderr) + .event_format(Glog::default().with_timer(tracing_glog::LocalTime::default())) + .fmt_fields(GlogFields); + + let filter = vec![dir] + .into_iter() + .fold(EnvFilter::from_default_env(), |filter, directive| { + filter.add_directive(directive) + }); + + let subscriber = Registry::default().with(filter).with(fmt); + tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber"); +} + +#[tokio::main] +async fn main() { + init_logging(); + + // every 5s + let schedule = "*/5 * * * * * *"; + let schedule = Schedule::from_str(schedule).expect("Failed to parse schedule"); + + let (manager, handle) = Actor::spawn(None, CronManager, ()) + .await + .expect("Failed to start cron manager"); + + manager + .call( + |prt| { + CronManagerMessage::Start( + CronSettings { + job: Box::new(MyJob { last: None }), + schedule, + }, + prt, + ) + }, + None, + ) + .await + .expect("Failed to contact cron manager") + .expect("Failed to send rpc reply") + .expect("Failed to start cron job"); + + // cleanup + tokio::signal::ctrl_c() + .await + .expect("Failed to wait for ctrl-c"); + manager.stop(None); + handle.await.unwrap(); +}