Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

run both scheduler and worker for jobs #12

Merged
merged 2 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions examples/graceful_shutdown/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let every_second = "* * * * * *[America/Los_Angeles]".parse()?;
job.schedule(every_second, ()).await?;

// Run the scheduler and shutdown signal listener in a separate Tokio task.
tokio::spawn({
let job = job.clone();
async move { tokio::join!(job.run_scheduler(), shutdown_signal(&pool)) }
});
// Await the shutdown signal handler in its own task.
tokio::spawn(async move { shutdown_signal(&pool).await });

// The worker will run until the queue signals a shutdown.
// The job will run until the queue signals a shutdown.
job.run().await?;

Ok(())
Expand Down
3 changes: 1 addition & 2 deletions examples/scheduled/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let every_minute = "0 * * * * *[America/Los_Angeles]".parse()?;
job.schedule(every_minute, ()).await?;

// Run the scheduler and worker concurrently.
let _ = tokio::join!(job.run_scheduler(), job.run());
job.run().await?;

Ok(())
}
61 changes: 51 additions & 10 deletions src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,9 @@ use sqlx::PgExecutor;

use crate::{
queue::{Error as QueueError, Queue},
scheduler::{Result as SchedulerResult, Scheduler, ZonedSchedule},
scheduler::{Error as SchedulerError, Result as SchedulerResult, Scheduler, ZonedSchedule},
task::{Error as TaskError, Id as TaskId, Result as TaskResult, RetryPolicy, Task},
worker::{Result as WorkerResult, Worker},
worker::{Error as WorkerError, Result as WorkerResult, Worker},
};

type JobInput<I, S> = <Job<I, S> as Task>::Input;
Expand Down Expand Up @@ -406,17 +406,29 @@ pub enum Error {
#[error(transparent)]
Task(#[from] TaskError),

/// Error returned from worker operation.
#[error(transparent)]
Worker(#[from] WorkerError),

/// Error returned from scheduler operation.
#[error(transparent)]
Scheduler(#[from] SchedulerError),

/// Error returned from Tokio task joins.
#[error(transparent)]
Join(#[from] tokio::task::JoinError),

/// Error returned from database operations.
#[error(transparent)]
Database(#[from] sqlx::Error),
}

/// An ergnomic implementation of the `Task` trait.
/// Ergnomic implementation of the `Task` trait.
#[derive(Clone)]
pub struct Job<I, S = ()>
where
Self: Task,
I: Clone,
I: Clone + DeserializeOwned + Serialize + Send + 'static,
S: Clone + Send + Sync + 'static,
{
pub(crate) queue: Queue<Self>,
Expand Down Expand Up @@ -549,7 +561,7 @@ where
}

/// Constructs a worker which then immediately runs task processing.
pub async fn run(&self) -> WorkerResult {
pub async fn run_worker(&self) -> WorkerResult {
let worker = Worker::from(self);
worker.run().await
}
Expand All @@ -559,6 +571,33 @@ where
let scheduler = Scheduler::from(self);
scheduler.run().await
}

/// Runs both a worker and scheduler for the job.
pub async fn run(&self) -> Result {
let worker = Worker::from(self);
let scheduler = Scheduler::from(self);

let worker_task = tokio::spawn(async move { worker.run().await });
let scheduler_task = tokio::spawn(async move { scheduler.run().await });

tokio::select! {
res = worker_task => {
match res {
Ok(inner_res) => inner_res?,
Err(join_err) => return Err(Error::from(join_err)),
}
},

res = scheduler_task => {
match res {
Ok(inner_res) => inner_res?,
Err(join_err) => return Err(Error::from(join_err)),
}
},
}

Ok(())
}
}

mod builder_states {
Expand Down Expand Up @@ -1083,6 +1122,7 @@ mod tests {
use sqlx::PgPool;

use super::*;
use crate::queue::graceful_shutdown;

#[sqlx::test]
async fn create_job(pool: PgPool) -> sqlx::Result<(), Error> {
Expand Down Expand Up @@ -1261,20 +1301,21 @@ mod tests {
*data = "bar".to_string();
Ok(())
})
.queue(queue.clone())
.queue(queue)
.build();

job.enqueue(()).await?;

let job_handle = tokio::spawn(async move { job.run().await });
tokio::spawn(async move { job.run().await });

// Wait for job to complete
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(std::time::Duration::from_millis(500)).await;

assert_eq!(*state.data.lock().unwrap(), "bar".to_string());

// Ensure the test will exit
job_handle.abort();
// Shutdown and wait for a bit to ensure the test can exit.
tokio::spawn(async move { graceful_shutdown(&pool).await });
tokio::time::sleep(std::time::Duration::from_millis(10)).await;

Ok(())
}
Expand Down