diff --git a/rq_scheduler/scheduler.py b/rq_scheduler/scheduler.py index c6b3861..0e3cb4c 100644 --- a/rq_scheduler/scheduler.py +++ b/rq_scheduler/scheduler.py @@ -15,7 +15,7 @@ from redis import WatchError -from .utils import from_unix, to_unix, get_next_scheduled_time, rationalize_until +from .utils import from_unix, to_unix, get_next_scheduled_time, get_next_rrule_scheduled_time, rationalize_until logger = logging.getLogger(__name__) @@ -298,6 +298,36 @@ def cron(self, cron_string, func, args=None, kwargs=None, repeat=None, {job.id: to_unix(scheduled_time)}) return job + + def rrule(self, rrule_string, func, args=None, kwargs=None, repeat=None, + queue_name=None, result_ttl=-1, ttl=None, id=None, timeout=None, description=None, meta=None, use_local_timezone=False, + depends_on=None, on_success=None, on_failure=None, at_front: bool = False): + """ + Schedule a recurring job via RRule + """ + scheduled_time = get_next_rrule_scheduled_time(rrule_string, use_local_timezone=use_local_timezone) + + job = self._create_job(func, args=args, kwargs=kwargs, commit=False, + result_ttl=result_ttl, ttl=ttl, id=id, queue_name=queue_name, + description=description, timeout=timeout, meta=meta, depends_on=depends_on, + on_success=on_success, on_failure=on_failure) + + job.meta['rrule_string'] = rrule_string + job.meta['use_local_timezone'] = use_local_timezone + + if repeat is not None: + job.meta['repeat'] = int(repeat) + + if at_front: + job.enqueue_at_front = True + + job.save() + + self.connection.zadd(self.scheduled_jobs_key, + {job.id: to_unix(scheduled_time)}) + return job + + def cancel(self, job): """ Pulls a job from the scheduler queue. This function accepts either a @@ -415,6 +445,7 @@ def enqueue_job(self, job): interval = job.meta.get('interval', None) repeat = job.meta.get('repeat', None) cron_string = job.meta.get('cron_string', None) + rrule_string = job.meta.get('rrule_string', None) use_local_timezone = job.meta.get('use_local_timezone', None) # If job is a repeated job, decrement counter @@ -425,21 +456,21 @@ def enqueue_job(self, job): queue.enqueue_job(job, at_front=bool(job.enqueue_at_front)) self.connection.zrem(self.scheduled_jobs_key, job.id) + # If this is a repeat job and counter has reached 0, don't repeat + if repeat is not None: + if job.meta['repeat'] == 0: + return if interval: - # If this is a repeat job and counter has reached 0, don't repeat - if repeat is not None: - if job.meta['repeat'] == 0: - return self.connection.zadd(self.scheduled_jobs_key, {job.id: to_unix(datetime.utcnow()) + int(interval)}) elif cron_string: - # If this is a repeat job and counter has reached 0, don't repeat - if repeat is not None: - if job.meta['repeat'] == 0: - return next_scheduled_time = get_next_scheduled_time(cron_string, use_local_timezone=use_local_timezone) self.connection.zadd(self.scheduled_jobs_key, {job.id: to_unix(next_scheduled_time)}) + elif rrule_string: + next_scheduled_time = get_next_rrule_scheduled_time(rrule_string, use_local_timezone=use_local_timezone) + self.connection.zadd(self.scheduled_jobs_key, + {job.id: to_unix(next_scheduled_time)}) def enqueue_jobs(self): """ diff --git a/rq_scheduler/utils.py b/rq_scheduler/utils.py index c34ab02..1fe23a2 100644 --- a/rq_scheduler/utils.py +++ b/rq_scheduler/utils.py @@ -1,6 +1,8 @@ import calendar import crontab import dateutil.tz +import dateutil.rrule +import dateutil.tz from datetime import datetime, timedelta import logging @@ -30,6 +32,16 @@ def get_next_scheduled_time(cron_string, use_local_timezone=False): return next_time.astimezone(tz) +def get_next_rrule_scheduled_time(rrule_string, use_local_timezone=False): + """Calculate the next scheduled time by creating a rrule object + with a rrule string""" + now = datetime.now() + rrule = dateutil.rrule.rrulestr(rrule_string) + next_time = rrule.after(now) + tz = dateutil.tz.tzlocal() if use_local_timezone else dateutil.tz.UTC + return next_time.astimezone(tz) + + def setup_loghandlers(level='INFO'): logger = logging.getLogger('rq_scheduler.scheduler') if not logger.handlers: diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index e0ee785..9a34ebb 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -14,7 +14,7 @@ from rq_scheduler import Scheduler from rq_scheduler.utils import from_unix -from rq_scheduler.utils import get_next_scheduled_time +from rq_scheduler.utils import get_next_scheduled_time, get_next_rrule_scheduled_time from rq_scheduler.utils import to_unix from tests import RQTestCase @@ -874,3 +874,171 @@ def test_create_job_with_queue_class_name(self): job = self.scheduler._create_job(say_hello) job_from_queue = Job.fetch(job.id, connection=self.testconn) self.assertFalse(job_from_queue.meta.get("queue_class_name")) + + def test_rrule_persisted_correctly(self): + """ + Ensure that rrule attribute gets correctly saved in Redis. + """ + # create a job that runs one minute past each whole hour + job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job_from_queue.meta['rrule_string'], "RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0") + + # get the scheduled_time and convert it to a datetime object + unix_time = self.testconn.zscore(self.scheduler.scheduled_jobs_key, job.id) + datetime_time = from_unix(unix_time) + + # check that minute=1, seconds=0, and is within an hour + assert datetime_time.minute == 1 + assert datetime_time.second == 0 + assert datetime_time - datetime.utcnow() <= timedelta(hours=1), f"{datetime_time - datetime.utcnow()} is greater than 1 hour" + + def test_rrule_persisted_correctly_with_local_timezone(self): + """ + Ensure that rrule attribute gets correctly saved in Redis when using local TZ. + """ + # create a job that runs each day at 15:00 + job = self.scheduler.rrule("RRULE:FREQ=DAILY;WKST=MO;BYHOUR=15;BYMINUTE=0;BYSECOND=0", say_hello, use_local_timezone=True) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job_from_queue.meta['rrule_string'], "RRULE:FREQ=DAILY;WKST=MO;BYHOUR=15;BYMINUTE=0;BYSECOND=0") + + # get the scheduled_time and convert it to a datetime object + unix_time = self.testconn.zscore(self.scheduler.scheduled_jobs_key, job.id) + datetime_time = from_unix(unix_time) + + expected_datetime_in_local_tz = datetime.now(tzlocal()).replace(hour=15,minute=0,second=0,microsecond=0) + assert datetime_time.time() == expected_datetime_in_local_tz.astimezone(UTC).time() + + def test_rrule_rescheduled_correctly_with_local_timezone(self): + # Create a job that runs each day at 15:01 + job = self.scheduler.rrule("RRULE:FREQ=DAILY;WKST=MO;BYHOUR=15;BYMINUTE=1;BYSECOND=0", say_hello, use_local_timezone=True) + + # Change this job to run each day at 15:02 + job.meta['rrule_string'] = "RRULE:FREQ=DAILY;WKST=MO;BYHOUR=15;BYMINUTE=2;BYSECOND=0" + + # reenqueue the job + self.scheduler.enqueue_job(job) + + # get the scheduled_time and convert it to a datetime object + unix_time = self.testconn.zscore(self.scheduler.scheduled_jobs_key, job.id) + datetime_time = from_unix(unix_time) + + expected_datetime_in_local_tz = datetime.now(tzlocal()).replace(hour=15,minute=2,second=0,microsecond=0) + assert datetime_time.time() == expected_datetime_in_local_tz.astimezone(UTC).time() + + def test_rrule_schedules_correctly(self): + # Create a job with a rrulejob_string + now = datetime.now().replace(minute=0, hour=0, second=0, microsecond=0) + with freezegun.freeze_time(now): + job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=5;BYSECOND=0", say_hello) + + with mock.patch.object(self.scheduler, 'enqueue_job', wraps=self.scheduler.enqueue_job) as enqueue_job, \ + freezegun.freeze_time(now + timedelta(minutes=5)): + self.assertEqual(1, self.scheduler.count()) + self.scheduler.enqueue_jobs() + self.assertEqual(1, enqueue_job.call_count) + + (job, next_scheduled_time), = self.scheduler.get_jobs(with_times=True) + expected_scheduled_time = (now + timedelta(hours=1, minutes=5)).astimezone(UTC) + self.assertEqual(to_unix(expected_scheduled_time), to_unix(next_scheduled_time), f"{next_scheduled_time} should be {expected_scheduled_time}") + + def test_rrule_sets_timeout(self): + """ + Ensure that a job scheduled via rrule can be created with + a custom timeout. + """ + timeout = 13 + job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello, timeout=timeout) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job_from_queue.timeout, timeout) + + def test_rrule_sets_id(self): + """ + Ensure that a job scheduled via rrule can be created with + a custom id + """ + job_id = "hello-job-id" + job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello, id=job_id) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(job_id, job_from_queue.id) + + def test_rrule_sets_default_result_ttl(self): + """ + Ensure that a job scheduled via rrule gets proper default + result_ttl (-1) periodic tasks. + """ + job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(-1, job_from_queue.result_ttl) + + def test_rrule_sets_description(self): + """ + Ensure that a job scheduled via rrule can be created with + a custom description + """ + description = 'test description' + job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello, description=description) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(description, job_from_queue.description) + + def test_rrule_sets_default_result_ttl_to_minus_1(self): + """ + Ensure that a job scheduled via rrule sets the default result_ttl to -1 + """ + result_ttl = -1 + job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(result_ttl, job_from_queue.result_ttl) + + def test_rrule_sets_provided_result_ttl(self): + """ + Ensure that a job scheduled via rrule can be created with + a custom result_ttl + """ + result_ttl = 123 + job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello, result_ttl=result_ttl) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(result_ttl, job_from_queue.result_ttl) + + def test_rrule_sets_default_ttl_to_none(self): + """ + Ensure that a job scheduled via rrule sets the default result_ttl to -1 + """ + job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertIsNone(job_from_queue.ttl) + + def test_rrule_sets_provided_ttl(self): + """ + Ensure that a job scheduled via rrule can be created with + a custom result_ttl + """ + ttl = 123 + job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello, ttl=ttl) + job_from_queue = Job.fetch(job.id, connection=self.testconn) + self.assertEqual(ttl, job_from_queue.ttl) + + def test_job_with_rrule_get_rescheduled(self): + # Create a job with a rrule_string + job = self.scheduler.rrule("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=1;BYSECOND=0", say_hello) + + # current unix_time + old_next_scheduled_time = self.testconn.zscore(self.scheduler.scheduled_jobs_key, job.id) + + # change rrule_string + job.meta['rrule_string'] = "RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=2;BYSECOND=0" + + # enqueue the job + self.scheduler.enqueue_job(job) + + self.assertIn(job.id, + tl(self.testconn.zrange(self.scheduler.scheduled_jobs_key, 0, 1))) + + # check that next scheduled time has changed + self.assertNotEqual(old_next_scheduled_time, + self.testconn.zscore(self.scheduler.scheduled_jobs_key, job.id)) + + # check that new next scheduled time is set correctly + expected_next_scheduled_time = to_unix(get_next_rrule_scheduled_time("RRULE:FREQ=HOURLY;WKST=MO;BYMINUTE=2;BYSECOND=0")) + self.assertEqual(self.testconn.zscore(self.scheduler.scheduled_jobs_key, job.id), + expected_next_scheduled_time)