Skip to content

Commit

Permalink
simplify scheduler somewhat (#35)
Browse files Browse the repository at this point in the history
Here we iterate the retrieved schedule directly, waiting until the next
enqueue or we receive a shutdown signal, whichever happens first.

If the schedule ends, then the scheduler will return and no more
processing will occur until a new schedule is set and a new scheduler is
running.

In the future we may want to support updating schedules with a trigger a
la the worker.
  • Loading branch information
maxcountryman authored Oct 26, 2024
1 parent 6d07fb4 commit 15e2f86
Showing 1 changed file with 11 additions and 30 deletions.
41 changes: 11 additions & 30 deletions src/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{result::Result as StdResult, str::FromStr, sync::Arc, time::Duration as StdDuration};

use jiff::{tz::TimeZone, Span, ToSpan, Zoned};
use jiff::{tz::TimeZone, Zoned};
use jiff_cron::Schedule;
use sqlx::postgres::{PgAdvisoryLock, PgListener};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -63,21 +63,16 @@ impl<T: Task> Scheduler<T> {
}
}

/// Runs the scheduler in a loop, sleeping one-second per iteration.
/// Loops over the configured schedule, enqueuing tasks as the duration
/// arrives.
pub async fn run(&self) -> Result {
self.run_every(1.second()).await
}

/// Runs the scheduler in a loop, sleeping for the given period per
/// iteration.
pub async fn run_every(&self, period: Span) -> Result {
let conn = self.queue.pool.acquire().await?;
let Some(_guard) = try_acquire_advisory_lock(conn, &self.queue_lock).await? else {
tracing::debug!("Scheduler could not acquire lock, exiting");
return Ok(());
};

let Some((mut zoned_schedule, input)) = self.queue.task_schedule(&self.queue.pool).await?
let Some((zoned_schedule, input)) = self.queue.task_schedule(&self.queue.pool).await?
else {
// No schedule configured, so we'll exit.
return Ok(());
Expand All @@ -87,15 +82,15 @@ impl<T: Task> Scheduler<T> {
let mut shutdown_listener = PgListener::connect_with(&self.queue.pool).await?;
shutdown_listener.listen(SHUTDOWN_CHANNEL).await?;

let mut polling_interval = tokio::time::interval(period.try_into()?);
loop {
// TODO: Handle updates to schedules?

for until_next in zoned_schedule.into_iter() {
tokio::select! {
notify_shutdown = shutdown_listener.recv() => {
match notify_shutdown {
Ok(_) => {
self.shutdown_token.cancel();
},

Err(err) => {
tracing::error!(%err, "Postgres shutdown notification error");
}
Expand All @@ -106,27 +101,16 @@ impl<T: Task> Scheduler<T> {
break
}

_ = polling_interval.tick() => {
self.trigger_schedule_processing(&mut zoned_schedule, &input).await?
_ = tokio::time::sleep(until_next) => {
tracing::debug!(?until_next, "Sleeping until next scheduled task enqueue");
self.process_next_schedule(&input).await?
}
}
}

Ok(())
}

async fn trigger_schedule_processing(
&self,
zoned_schedule: &mut ZonedSchedule,
input: &T::Input,
) -> Result {
if let Some(until_next) = zoned_schedule.into_iter().next() {
self.process_next_schedule(until_next, input).await?;
}

Ok(())
}

#[instrument(
skip_all,
fields(
Expand All @@ -136,10 +120,7 @@ impl<T: Task> Scheduler<T> {
),
err
)]
async fn process_next_schedule(&self, until_next: StdDuration, input: &T::Input) -> Result {
tracing::debug!(?until_next, "Sleeping until the next scheduled enqueue");
tokio::time::sleep(until_next).await;

async fn process_next_schedule(&self, input: &T::Input) -> Result {
let task_id = self
.queue
.enqueue(&self.queue.pool, &self.task, input)
Expand Down

0 comments on commit 15e2f86

Please sign in to comment.