From 15e2f86662ae4ab7b6f080bb3b1ed445fe94e0c3 Mon Sep 17 00:00:00 2001 From: Max Countryman Date: Sat, 26 Oct 2024 09:00:11 -0700 Subject: [PATCH] simplify scheduler somewhat (#35) Here we iterate the retrieved schedule directly, waiting until the next enqueue or we receive a shutdown signal, whichever happens first. If the schedule ends, then the scheduler will return and no more processing will occur until a new schedule is set and a new scheduler is running. In the future we may want to support updating schedules with a trigger a la the worker. --- src/scheduler.rs | 41 +++++++++++------------------------------ 1 file changed, 11 insertions(+), 30 deletions(-) diff --git a/src/scheduler.rs b/src/scheduler.rs index d55d882..c6faaca 100644 --- a/src/scheduler.rs +++ b/src/scheduler.rs @@ -1,6 +1,6 @@ use std::{result::Result as StdResult, str::FromStr, sync::Arc, time::Duration as StdDuration}; -use jiff::{tz::TimeZone, Span, ToSpan, Zoned}; +use jiff::{tz::TimeZone, Zoned}; use jiff_cron::Schedule; use sqlx::postgres::{PgAdvisoryLock, PgListener}; use tokio_util::sync::CancellationToken; @@ -63,21 +63,16 @@ impl Scheduler { } } - /// Runs the scheduler in a loop, sleeping one-second per iteration. + /// Loops over the configured schedule, enqueuing tasks as the duration + /// arrives. pub async fn run(&self) -> Result { - self.run_every(1.second()).await - } - - /// Runs the scheduler in a loop, sleeping for the given period per - /// iteration. - pub async fn run_every(&self, period: Span) -> Result { let conn = self.queue.pool.acquire().await?; let Some(_guard) = try_acquire_advisory_lock(conn, &self.queue_lock).await? else { tracing::debug!("Scheduler could not acquire lock, exiting"); return Ok(()); }; - let Some((mut zoned_schedule, input)) = self.queue.task_schedule(&self.queue.pool).await? + let Some((zoned_schedule, input)) = self.queue.task_schedule(&self.queue.pool).await? else { // No schedule configured, so we'll exit. return Ok(()); @@ -87,15 +82,15 @@ impl Scheduler { let mut shutdown_listener = PgListener::connect_with(&self.queue.pool).await?; shutdown_listener.listen(SHUTDOWN_CHANNEL).await?; - let mut polling_interval = tokio::time::interval(period.try_into()?); - loop { + // TODO: Handle updates to schedules? + + for until_next in zoned_schedule.into_iter() { tokio::select! { notify_shutdown = shutdown_listener.recv() => { match notify_shutdown { Ok(_) => { self.shutdown_token.cancel(); }, - Err(err) => { tracing::error!(%err, "Postgres shutdown notification error"); } @@ -106,8 +101,9 @@ impl Scheduler { break } - _ = polling_interval.tick() => { - self.trigger_schedule_processing(&mut zoned_schedule, &input).await? + _ = tokio::time::sleep(until_next) => { + tracing::debug!(?until_next, "Sleeping until next scheduled task enqueue"); + self.process_next_schedule(&input).await? } } } @@ -115,18 +111,6 @@ impl Scheduler { Ok(()) } - async fn trigger_schedule_processing( - &self, - zoned_schedule: &mut ZonedSchedule, - input: &T::Input, - ) -> Result { - if let Some(until_next) = zoned_schedule.into_iter().next() { - self.process_next_schedule(until_next, input).await?; - } - - Ok(()) - } - #[instrument( skip_all, fields( @@ -136,10 +120,7 @@ impl Scheduler { ), err )] - async fn process_next_schedule(&self, until_next: StdDuration, input: &T::Input) -> Result { - tracing::debug!(?until_next, "Sleeping until the next scheduled enqueue"); - tokio::time::sleep(until_next).await; - + async fn process_next_schedule(&self, input: &T::Input) -> Result { let task_id = self .queue .enqueue(&self.queue.pool, &self.task, input)