From 03efa9c9c0a3cb6fe6d8dc81b8a431280e8615f4 Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Thu, 17 Oct 2024 11:07:50 -0700 Subject: [PATCH 1/2] run both scheduler and worker for jobs This changes the way the run method works on jobs: now both the scheduler and the worker will be spawned off into separate tasks and selected over. Additionally a new run_worker method is provided as a convenience, which is the complement to run_scheduler. By doing so, in-process task execution is simplified when using scheduled jobs. --- examples/graceful_shutdown/src/main.rs | 9 ++-- examples/scheduled/src/main.rs | 3 +- src/job.rs | 59 ++++++++++++++++++++++---- 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/examples/graceful_shutdown/src/main.rs b/examples/graceful_shutdown/src/main.rs index 2b9cc56..9e24cf2 100644 --- a/examples/graceful_shutdown/src/main.rs +++ b/examples/graceful_shutdown/src/main.rs @@ -75,13 +75,10 @@ async fn main() -> Result<(), Box> { let every_second = "* * * * * *[America/Los_Angeles]".parse()?; job.schedule(every_second, ()).await?; - // Run the scheduler and shutdown signal listener in a separate Tokio task. - tokio::spawn({ - let job = job.clone(); - async move { tokio::join!(job.run_scheduler(), shutdown_signal(&pool)) } - }); + // Await the shutdown signal handler in its own task. + tokio::spawn(async move { shutdown_signal(&pool).await }); - // The worker will run until the queue signals a shutdown. + // The job will run until the queue signals a shutdown. job.run().await?; Ok(()) diff --git a/examples/scheduled/src/main.rs b/examples/scheduled/src/main.rs index ababaeb..01106c0 100644 --- a/examples/scheduled/src/main.rs +++ b/examples/scheduled/src/main.rs @@ -30,8 +30,7 @@ async fn main() -> Result<(), Box> { let every_minute = "0 * * * * *[America/Los_Angeles]".parse()?; job.schedule(every_minute, ()).await?; - // Run the scheduler and worker concurrently. - let _ = tokio::join!(job.run_scheduler(), job.run()); + job.run().await?; Ok(()) } diff --git a/src/job.rs b/src/job.rs index 8d37a0b..af56d38 100644 --- a/src/job.rs +++ b/src/job.rs @@ -374,9 +374,9 @@ use sqlx::PgExecutor; use crate::{ queue::{Error as QueueError, Queue}, - scheduler::{Result as SchedulerResult, Scheduler, ZonedSchedule}, + scheduler::{Error as SchedulerError, Result as SchedulerResult, Scheduler, ZonedSchedule}, task::{Error as TaskError, Id as TaskId, Result as TaskResult, RetryPolicy, Task}, - worker::{Result as WorkerResult, Worker}, + worker::{Error as WorkerError, Result as WorkerResult, Worker}, }; type JobInput = as Task>::Input; @@ -406,17 +406,29 @@ pub enum Error { #[error(transparent)] Task(#[from] TaskError), + /// Error returned from worker operation. + #[error(transparent)] + Worker(#[from] WorkerError), + + /// Error returned from scheduler operation. + #[error(transparent)] + Scheduler(#[from] SchedulerError), + + /// Error returned from Tokio task joins. + #[error(transparent)] + Join(#[from] tokio::task::JoinError), + /// Error returned from database operations. #[error(transparent)] Database(#[from] sqlx::Error), } -/// An ergnomic implementation of the `Task` trait. +/// Ergnomic implementation of the `Task` trait. #[derive(Clone)] pub struct Job where Self: Task, - I: Clone, + I: Clone + DeserializeOwned + Serialize + Send + 'static, S: Clone + Send + Sync + 'static, { pub(crate) queue: Queue, @@ -549,7 +561,7 @@ where } /// Constructs a worker which then immediately runs task processing. - pub async fn run(&self) -> WorkerResult { + pub async fn run_worker(&self) -> WorkerResult { let worker = Worker::from(self); worker.run().await } @@ -559,6 +571,33 @@ where let scheduler = Scheduler::from(self); scheduler.run().await } + + /// Runs both a worker and scheduler for the job. + pub async fn run(&self) -> Result { + let worker = Worker::from(self); + let scheduler = Scheduler::from(self); + + let worker_task = tokio::spawn(async move { worker.run().await }); + let scheduler_task = tokio::spawn(async move { scheduler.run().await }); + + tokio::select! { + res = worker_task => { + match res { + Ok(inner_res) => inner_res?, + Err(join_err) => return Err(Error::from(join_err)), + } + }, + + res = scheduler_task => { + match res { + Ok(inner_res) => inner_res?, + Err(join_err) => return Err(Error::from(join_err)), + } + }, + } + + Ok(()) + } } mod builder_states { @@ -1083,6 +1122,7 @@ mod tests { use sqlx::PgPool; use super::*; + use crate::queue::graceful_shutdown; #[sqlx::test] async fn create_job(pool: PgPool) -> sqlx::Result<(), Error> { @@ -1261,20 +1301,21 @@ mod tests { *data = "bar".to_string(); Ok(()) }) - .queue(queue.clone()) + .queue(queue) .build(); job.enqueue(()).await?; - let job_handle = tokio::spawn(async move { job.run().await }); + tokio::spawn(async move { job.run().await }); // Wait for job to complete tokio::time::sleep(std::time::Duration::from_millis(100)).await; assert_eq!(*state.data.lock().unwrap(), "bar".to_string()); - // Ensure the test will exit - job_handle.abort(); + // Shutdown and wait for a bit to ensure the test can exit. + tokio::spawn(async move { graceful_shutdown(&pool).await }); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; Ok(()) } From ed97421443e76f7ad4aa509792998d436e28945f Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Thu, 17 Oct 2024 11:16:42 -0700 Subject: [PATCH 2/2] sleep a bit longer --- src/job.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/job.rs b/src/job.rs index af56d38..4988905 100644 --- a/src/job.rs +++ b/src/job.rs @@ -1309,7 +1309,7 @@ mod tests { tokio::spawn(async move { job.run().await }); // Wait for job to complete - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + tokio::time::sleep(std::time::Duration::from_millis(500)).await; assert_eq!(*state.data.lock().unwrap(), "bar".to_string());