Skip to content

Commit

Permalink
Adding cron job example
Browse files Browse the repository at this point in the history
  • Loading branch information
slawlor committed Aug 24, 2023
1 parent 1816e6b commit e5f19b2
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 1 deletion.
4 changes: 3 additions & 1 deletion ractor_actors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,6 @@ notify = { version = "5", optional = true }
tokio-rustls = { version = "0.23", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread"] }
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread", "signal"] }
tracing-glog = "0.2"
tracing-subscriber = { version = "0.3", features = ["env-filter"]}
113 changes: 113 additions & 0 deletions ractor_actors/examples/cron.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! A basic counting agent. Demonstrates remote procedure calls to interact
//! with the agent externally and safely acquire the "count"
//!
//! Execute with
//!
//! ```text
//! cargo run --example cron
//! ```

extern crate ractor_actors;
use std::str::FromStr;

use cron::Schedule;
use ractor::{
async_trait,
concurrency::{sleep, Duration, Instant},
Actor, ActorProcessingErr,
};
use ractor_actors::time::cron::{CronManager, CronManagerMessage, CronSettings, Job};

struct MyJob {
last: Option<Instant>,
}

#[async_trait]
impl Job for MyJob {
fn id<'a>(&self) -> &'a str {
"my_job"
}

async fn work(&mut self) -> Result<(), ActorProcessingErr> {
let now = Instant::now();
let delta = self.last.map(|ts| (now - ts).as_millis());

sleep(Duration::from_millis(500)).await;

tracing::info!("Working hard for {:?} ms", delta);

self.last = Some(now);

Ok(())
}
}

fn init_logging() {
let dir = tracing_subscriber::filter::Directive::from(tracing::Level::DEBUG);

use std::io::stderr;
use std::io::IsTerminal;
use tracing_glog::Glog;
use tracing_glog::GlogFields;
use tracing_subscriber::filter::EnvFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Registry;

let fmt = tracing_subscriber::fmt::Layer::default()
.with_ansi(stderr().is_terminal())
.with_writer(std::io::stderr)
.event_format(Glog::default().with_timer(tracing_glog::LocalTime::default()))
.fmt_fields(GlogFields);

let filter = vec![dir]
.into_iter()
.fold(EnvFilter::from_default_env(), |filter, directive| {
filter.add_directive(directive)
});

let subscriber = Registry::default().with(filter).with(fmt);
tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber");
}

#[tokio::main]
async fn main() {
init_logging();

// every 5s
let schedule = "*/5 * * * * * *";
let schedule = Schedule::from_str(schedule).expect("Failed to parse schedule");

let (manager, handle) = Actor::spawn(None, CronManager, ())
.await
.expect("Failed to start cron manager");

manager
.call(
|prt| {
CronManagerMessage::Start(
CronSettings {
job: Box::new(MyJob { last: None }),
schedule,
},
prt,
)
},
None,
)
.await
.expect("Failed to contact cron manager")
.expect("Failed to send rpc reply")
.expect("Failed to start cron job");

// cleanup
tokio::signal::ctrl_c()
.await
.expect("Failed to wait for ctrl-c");
manager.stop(None);
handle.await.unwrap();
}

0 comments on commit e5f19b2

Please sign in to comment.