diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 62f3802..d5d3067 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -61,7 +61,7 @@ jobs: strategy: max-parallel: 1 matrix: - python-version: [ "3.11", "3.12" ] + python-version: [ "3.11", "3.12", "3.13" ] django-version: [ "4.2", "5.1" ] fail-fast: true needs: [ changes ] diff --git a/pyproject.toml b/pyproject.toml index d4a63c6..1abad83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,6 +9,7 @@ dependencies = [ "django-admin-extra-buttons>=1.5.8", "django-concurrency>=2.6", "django>=4", + "sentry-sdk", ] [project.optional-dependencies] diff --git a/pytest.ini b/pytest.ini index 7ad064e..d5a90ce 100644 --- a/pytest.ini +++ b/pytest.ini @@ -19,11 +19,12 @@ addopts = -rs --tb=short --capture=sys + --maxfail=5 --echo-version django --cov-config=tests/.coveragerc --cov-report html --cov-report xml:coverage.xml - + --cov=django_celery_boost markers = admin diff --git a/src/django_celery_boost/models.py b/src/django_celery_boost/models.py index c8979d3..f26b869 100644 --- a/src/django_celery_boost/models.py +++ b/src/django_celery_boost/models.py @@ -3,6 +3,7 @@ import logging from typing import TYPE_CHECKING, Any, Callable, Generator, Optional +import sentry_sdk from celery import states from celery.app.base import Celery from celery.result import AsyncResult @@ -134,7 +135,7 @@ def celery_app(cls) -> "celery.app.base.Celery": return cls._celery_app @property - def celery_task_name(self): + def celery_task_name(self): # pragma: no cover return self.default_celery_task_name @classmethod @@ -419,3 +420,53 @@ def purge(cls: "type[CeleryTaskModel]") -> None: with cls.celery_app.pool.acquire(block=True) as conn: conn.default_channel.client.delete(cls.celery_task_queue) conn.default_channel.client.delete(cls.celery_task_revoked_queue) + + +class AsyncJobModel(CeleryTaskModel): + class JobType(models.TextChoices): + STANDARD_TASK = "STANDARD_TASK", "Standard Task" + JOB_TASK = "JOB_TASK", "Job Task" + + type = models.CharField(max_length=50, choices=JobType.choices) + config = models.JSONField(default=dict, blank=True) + action = models.CharField(max_length=500, blank=True, null=True) + description = models.CharField(max_length=255, blank=True, null=True) + sentry_id = models.CharField(max_length=255, blank=True, null=True) + + class Meta: + abstract = True + permissions = (("debug_job", "Can debug background jobs"),) + + def __str__(self): + return self.description or f"Background Job #{self.pk}" + + @property + def queue_position(self) -> int: + try: + return super().queue_position + except Exception: + return 0 + + @property + def started(self) -> str: + try: + return self.task_info["started_at"] + except Exception: + return "=" + + def execute(self): + sid = None + try: + func = import_string(self.action) + match self.type: + case AsyncJobModel.JobType.STANDARD_TASK: + return func(**self.config) + case AsyncJobModel.JobType.JOB_TASK: + return func(self) + except Exception as e: + sid = sentry_sdk.capture_exception(e) + raise e + finally: + if sid: + self.sentry_id = sid + self.save(update_fields=["sentry_id"]) diff --git a/tests/demoapp/demo/factories.py b/tests/demoapp/demo/factories.py index 553f533..f693c84 100644 --- a/tests/demoapp/demo/factories.py +++ b/tests/demoapp/demo/factories.py @@ -1,7 +1,7 @@ from typing import Any, Optional import factory -from demo.models import Job +from demo.models import Job, MultipleJob from django.contrib.auth.models import Group, Permission, User from factory.django import DjangoModelFactory from factory.faker import Faker @@ -18,6 +18,14 @@ class Meta: model = Job +class AsyncJobModelFactory(DjangoModelFactory): + curr_async_result_id = None + last_async_result_id = None + + class Meta: + model = MultipleJob + + class UserFactory(DjangoModelFactory): class Meta: model = User diff --git a/tests/demoapp/demo/migrations/0001_initial.py b/tests/demoapp/demo/migrations/0001_initial.py index 799c13c..f336d86 100644 --- a/tests/demoapp/demo/migrations/0001_initial.py +++ b/tests/demoapp/demo/migrations/0001_initial.py @@ -1,4 +1,4 @@ -# Generated by Django 5.1.3 on 2024-11-28 14:38 +# Generated by Django 5.1.1 on 2024-12-17 16:42 import concurrency.fields import django.db.models.deletion @@ -253,4 +253,122 @@ class Migration(migrations.Migration): ), }, ), + migrations.CreateModel( + name="MultipleJob", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "version", + concurrency.fields.AutoIncVersionField( + default=0, help_text="record revision number" + ), + ), + ( + "curr_async_result_id", + models.CharField( + blank=True, + editable=False, + help_text="Current (active) AsyncResult is", + max_length=36, + null=True, + ), + ), + ( + "last_async_result_id", + models.CharField( + blank=True, + editable=False, + help_text="Latest executed AsyncResult is", + max_length=36, + null=True, + ), + ), + ( + "datetime_created", + models.DateTimeField( + auto_now_add=True, help_text="Creation date and time" + ), + ), + ( + "datetime_queued", + models.DateTimeField( + blank=True, + help_text="Queueing date and time", + null=True, + verbose_name="Queued At", + ), + ), + ( + "repeatable", + models.BooleanField( + blank=True, + default=False, + help_text="Indicate if the job can be repeated as-is", + ), + ), + ( + "celery_history", + models.JSONField(blank=True, default=dict, editable=False), + ), + ( + "local_status", + models.CharField( + blank=True, + default="", + editable=False, + max_length=100, + null=True, + ), + ), + ( + "group_key", + models.CharField( + blank=True, + editable=False, + help_text="Tasks with the same group key will not run in parallel", + max_length=255, + null=True, + ), + ), + ( + "type", + models.CharField( + choices=[ + ("STANDARD_TASK", "Standard Task"), + ("JOB_TASK", "Job Task"), + ], + max_length=50, + ), + ), + ("config", models.JSONField(blank=True, default=dict)), + ("action", models.CharField(blank=True, max_length=500, null=True)), + ( + "description", + models.CharField(blank=True, max_length=255, null=True), + ), + ("sentry_id", models.CharField(blank=True, max_length=255, null=True)), + ( + "owner", + models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="%(app_label)s_%(class)s_jobs", + to=settings.AUTH_USER_MODEL, + ), + ), + ], + options={ + "permissions": (("test_multiplejob", "Can test MultipleJob"),), + "abstract": False, + }, + ), ] diff --git a/tests/demoapp/demo/models.py b/tests/demoapp/demo/models.py index d7e2348..01726b9 100644 --- a/tests/demoapp/demo/models.py +++ b/tests/demoapp/demo/models.py @@ -1,6 +1,6 @@ from django.db import models -from django_celery_boost.models import CeleryTaskModel +from django_celery_boost.models import AsyncJobModel, CeleryTaskModel class Job(CeleryTaskModel, models.Model): @@ -31,3 +31,10 @@ class Meta(CeleryTaskModel.Meta): permissions = (("test_alternativejob", "Can test AlternativeJob"),) celery_task_name = "demo.tasks.process_job" + + +class MultipleJob(AsyncJobModel): + class Meta(AsyncJobModel.Meta): + permissions = (("test_multiplejob", "Can test MultipleJob"),) + + celery_task_name = "demo.tasks.process_job" diff --git a/tests/demoapp/demo/tasks.py b/tests/demoapp/demo/tasks.py index 36168a9..b104760 100644 --- a/tests/demoapp/demo/tasks.py +++ b/tests/demoapp/demo/tasks.py @@ -2,6 +2,7 @@ from celery import shared_task from concurrency.exceptions import RecordModifiedError +from django.core.cache import cache @shared_task(bind=True) @@ -40,5 +41,11 @@ def process_job(self, pk, version=None): @shared_task() -def echo(value): +def echo(value="echo"): + return value + + +@shared_task() +def cache_store(key, value): + cache.set(key, value) return value diff --git a/tests/test_celery.py b/tests/test_celery.py index 5811502..b6b9893 100644 --- a/tests/test_celery.py +++ b/tests/test_celery.py @@ -1,9 +1,15 @@ import os from time import sleep +from django.contrib.auth.models import Group import pytest -from demo.factories import JobFactory -from demo.models import Job +from demo.factories import JobFactory, GroupFactory +from demo.models import Job, MultipleJob + +from django_celery_boost.models import AsyncJobModel +from django.core.cache import cache +from tests.demoapp.demo.factories import AsyncJobModelFactory +from unittest.mock import patch pytest_plugins = ("celery.contrib.pytest",) @@ -181,3 +187,27 @@ def test_revoke(transactional_db, celery_app, celery_worker, reset_queue): job1.queue() job1.revoke() assert job1.task_status == Job.MISSING + + +def test_async_job_standard(transactional_db, celery_app, celery_worker, reset_queue): + async_job: MultipleJob = AsyncJobModelFactory( + type=AsyncJobModel.JobType.STANDARD_TASK, + config={"key": "key", "value": "value"}, + action="demo.tasks.cache_store", + ) + + assert cache.get("key") is None + async_job.execute() + assert cache.get("key") == "value" + + +@patch("demo.tasks.echo") +def test_async_job_task( + mocked_value, transactional_db, celery_app, celery_worker, reset_queue +): + async_job: MultipleJob = AsyncJobModelFactory( + type=AsyncJobModel.JobType.JOB_TASK, action="demo.tasks.echo" + ) + + async_job.execute() + assert mocked_value.call_count == 1 diff --git a/tests/test_model.py b/tests/test_model.py index 48db413..219a34a 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -26,6 +26,7 @@ def test_model_initialize_new(db): "revoked": 0, "size": 0, } + assert job.celery_task_name == "demo.tasks.process_job" def test_model_queue(db): @@ -67,6 +68,8 @@ def test_model_disallow_multiple_queue(db): def test_model_get_celery_queue_position(db): job1: Job = JobFactory() + assert job1.queue_position == 0 + job1.queue() assert job1.queue_position == 1 @@ -161,6 +164,7 @@ def test_terminate(db): m.return_value = Job.PROGRESS assert job1.terminate() == job1.REVOKED assert not job1.is_queued() + assert job1.queue_position == 0 job1.queue() with mock.patch("demo.models.Job.task_status", new_callable=PropertyMock) as m: diff --git a/tests/test_permissions.py b/tests/test_permissions.py index f8b3107..84e2993 100644 --- a/tests/test_permissions.py +++ b/tests/test_permissions.py @@ -12,3 +12,6 @@ def test_permissions_created(db): assert Permission.objects.filter( content_type__app_label="demo", codename="queue_alternativejob" ).exists() + assert Permission.objects.filter( + content_type__app_label="demo", codename="test_multiplejob" + ).exists() diff --git a/tox.ini b/tox.ini index 4599b17..f6c68b1 100644 --- a/tox.ini +++ b/tox.ini @@ -1,5 +1,5 @@ [tox] -envlist = d{42,51}-py{311,312} +envlist = d{42,51}-py{311,312,313} skip_missing_interpreters = true [testenv]