Skip to content

Commit

Permalink
add abstract model
Browse files Browse the repository at this point in the history
  • Loading branch information
domdinicola committed Jan 7, 2025
1 parent 8558e46 commit 7b0c988
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 10 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ jobs:
strategy:
max-parallel: 1
matrix:
python-version: [ "3.11", "3.12" ]
python-version: [ "3.11", "3.12", "3.13" ]
django-version: [ "4.2", "5.1" ]
fail-fast: true
needs: [ changes ]
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ dependencies = [
"django-admin-extra-buttons>=1.5.8",
"django-concurrency>=2.6",
"django>=4",
"sentry-sdk",
]

[project.optional-dependencies]
Expand Down
3 changes: 2 additions & 1 deletion pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ addopts =
-rs
--tb=short
--capture=sys
--maxfail=5
--echo-version django
--cov-config=tests/.coveragerc
--cov-report html
--cov-report xml:coverage.xml

--cov=django_celery_boost

markers =
admin
Expand Down
54 changes: 53 additions & 1 deletion src/django_celery_boost/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import logging
from typing import TYPE_CHECKING, Any, Callable, Generator, Optional

import sentry_sdk
from celery import states
from celery.app.base import Celery
from celery.result import AsyncResult
from concurrency.api import concurrency_disable_increment
from concurrency.fields import AutoIncVersionField
from django.apps import apps
from django.conf import settings
from django.core import checks
from django.db import models
Expand Down Expand Up @@ -134,7 +136,7 @@ def celery_app(cls) -> "celery.app.base.Celery":
return cls._celery_app

@property
def celery_task_name(self):
def celery_task_name(self): # pragma: no cover
return self.default_celery_task_name

@classmethod
Expand Down Expand Up @@ -419,3 +421,53 @@ def purge(cls: "type[CeleryTaskModel]") -> None:
with cls.celery_app.pool.acquire(block=True) as conn:
conn.default_channel.client.delete(cls.celery_task_queue)
conn.default_channel.client.delete(cls.celery_task_revoked_queue)


class AsyncJobModel(CeleryTaskModel):
class JobType(models.TextChoices):
STANDARD_TASK = "STANDARD_TASK", "Standard Task"
JOB_TASK = "JOB_TASK", "Job Task"

type = models.CharField(max_length=50, choices=JobType.choices)
config = models.JSONField(default=dict, blank=True)
action = models.CharField(max_length=500, blank=True, null=True)
description = models.CharField(max_length=255, blank=True, null=True)
sentry_id = models.CharField(max_length=255, blank=True, null=True)

class Meta:
abstract = True
permissions = (("debug_job", "Can debug background jobs"),)

def __str__(self):
return self.description or f"Background Job #{self.pk}"

@property
def queue_position(self) -> int:
try:
return super().queue_position
except Exception:
return 0

@property
def started(self) -> str:
try:
return self.task_info["started_at"]
except Exception:
return "="

def execute(self):
sid = None
try:
func = import_string(self.action)
match self.type:
case AsyncJobModel.JobType.STANDARD_TASK:
return func(**self.config)
case AsyncJobModel.JobType.JOB_TASK:
return func(self)
except Exception as e:
sid = sentry_sdk.capture_exception(e)
raise e
finally:
if sid:
self.sentry_id = sid
self.save(update_fields=["sentry_id"])
10 changes: 9 additions & 1 deletion tests/demoapp/demo/factories.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Any, Optional

import factory
from demo.models import Job
from demo.models import Job, MultipleJob
from django.contrib.auth.models import Group, Permission, User
from factory.django import DjangoModelFactory
from factory.faker import Faker
Expand All @@ -18,6 +18,14 @@ class Meta:
model = Job


class AsyncJobModelFactory(DjangoModelFactory):
curr_async_result_id = None
last_async_result_id = None

class Meta:
model = MultipleJob


class UserFactory(DjangoModelFactory):
class Meta:
model = User
Expand Down
120 changes: 119 additions & 1 deletion tests/demoapp/demo/migrations/0001_initial.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Generated by Django 5.1.3 on 2024-11-28 14:38
# Generated by Django 5.1.1 on 2024-12-17 16:42

import concurrency.fields
import django.db.models.deletion
Expand Down Expand Up @@ -253,4 +253,122 @@ class Migration(migrations.Migration):
),
},
),
migrations.CreateModel(
name="MultipleJob",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"version",
concurrency.fields.AutoIncVersionField(
default=0, help_text="record revision number"
),
),
(
"curr_async_result_id",
models.CharField(
blank=True,
editable=False,
help_text="Current (active) AsyncResult is",
max_length=36,
null=True,
),
),
(
"last_async_result_id",
models.CharField(
blank=True,
editable=False,
help_text="Latest executed AsyncResult is",
max_length=36,
null=True,
),
),
(
"datetime_created",
models.DateTimeField(
auto_now_add=True, help_text="Creation date and time"
),
),
(
"datetime_queued",
models.DateTimeField(
blank=True,
help_text="Queueing date and time",
null=True,
verbose_name="Queued At",
),
),
(
"repeatable",
models.BooleanField(
blank=True,
default=False,
help_text="Indicate if the job can be repeated as-is",
),
),
(
"celery_history",
models.JSONField(blank=True, default=dict, editable=False),
),
(
"local_status",
models.CharField(
blank=True,
default="",
editable=False,
max_length=100,
null=True,
),
),
(
"group_key",
models.CharField(
blank=True,
editable=False,
help_text="Tasks with the same group key will not run in parallel",
max_length=255,
null=True,
),
),
(
"type",
models.CharField(
choices=[
("STANDARD_TASK", "Standard Task"),
("JOB_TASK", "Job Task"),
],
max_length=50,
),
),
("config", models.JSONField(blank=True, default=dict)),
("action", models.CharField(blank=True, max_length=500, null=True)),
(
"description",
models.CharField(blank=True, max_length=255, null=True),
),
("sentry_id", models.CharField(blank=True, max_length=255, null=True)),
(
"owner",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
related_name="%(app_label)s_%(class)s_jobs",
to=settings.AUTH_USER_MODEL,
),
),
],
options={
"permissions": (("test_multiplejob", "Can test MultipleJob"),),
"abstract": False,
},
),
]
9 changes: 8 additions & 1 deletion tests/demoapp/demo/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from django.db import models

from django_celery_boost.models import CeleryTaskModel
from django_celery_boost.models import AsyncJobModel, CeleryTaskModel


class Job(CeleryTaskModel, models.Model):
Expand Down Expand Up @@ -31,3 +31,10 @@ class Meta(CeleryTaskModel.Meta):
permissions = (("test_alternativejob", "Can test AlternativeJob"),)

celery_task_name = "demo.tasks.process_job"


class MultipleJob(AsyncJobModel):
class Meta(AsyncJobModel.Meta):
permissions = (("test_multiplejob", "Can test MultipleJob"),)

celery_task_name = "demo.tasks.process_job"
9 changes: 8 additions & 1 deletion tests/demoapp/demo/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from celery import shared_task
from concurrency.exceptions import RecordModifiedError
from django.core.cache import cache


@shared_task(bind=True)
Expand Down Expand Up @@ -40,5 +41,11 @@ def process_job(self, pk, version=None):


@shared_task()
def echo(value):
def echo(value="echo"):
return value


@shared_task()
def cache_store(key, value):
cache.set(key, value)
return value
34 changes: 32 additions & 2 deletions tests/test_celery.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import os
from time import sleep

from django.contrib.auth.models import Group
import pytest
from demo.factories import JobFactory
from demo.models import Job
from demo.factories import JobFactory, GroupFactory
from demo.models import Job, MultipleJob

from django_celery_boost.models import AsyncJobModel
from django.core.cache import cache
from tests.demoapp.demo.factories import AsyncJobModelFactory
from unittest.mock import patch

pytest_plugins = ("celery.contrib.pytest",)

Expand Down Expand Up @@ -181,3 +187,27 @@ def test_revoke(transactional_db, celery_app, celery_worker, reset_queue):
job1.queue()
job1.revoke()
assert job1.task_status == Job.MISSING


def test_async_job_standard(transactional_db, celery_app, celery_worker, reset_queue):
async_job: MultipleJob = AsyncJobModelFactory(
type=AsyncJobModel.JobType.STANDARD_TASK,
config={"key": "key", "value": "value"},
action="demo.tasks.cache_store",
)

assert cache.get("key") is None
async_job.execute()
assert cache.get("key") == "value"


@patch("demo.tasks.echo")
def test_async_job_task(
mocked_value, transactional_db, celery_app, celery_worker, reset_queue
):
async_job: MultipleJob = AsyncJobModelFactory(
type=AsyncJobModel.JobType.JOB_TASK, action="demo.tasks.echo"
)

async_job.execute()
assert mocked_value.call_count == 1
4 changes: 4 additions & 0 deletions tests/test_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ def test_model_initialize_new(db):
"revoked": 0,
"size": 0,
}
assert job.celery_task_name == "demo.tasks.process_job"


def test_model_queue(db):
Expand Down Expand Up @@ -67,6 +68,8 @@ def test_model_disallow_multiple_queue(db):

def test_model_get_celery_queue_position(db):
job1: Job = JobFactory()
assert job1.queue_position == 0

job1.queue()
assert job1.queue_position == 1

Expand Down Expand Up @@ -161,6 +164,7 @@ def test_terminate(db):
m.return_value = Job.PROGRESS
assert job1.terminate() == job1.REVOKED
assert not job1.is_queued()
assert job1.queue_position == 0

job1.queue()
with mock.patch("demo.models.Job.task_status", new_callable=PropertyMock) as m:
Expand Down
3 changes: 3 additions & 0 deletions tests/test_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,6 @@ def test_permissions_created(db):
assert Permission.objects.filter(
content_type__app_label="demo", codename="queue_alternativejob"
).exists()
assert Permission.objects.filter(
content_type__app_label="demo", codename="test_multiplejob"
).exists()
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[tox]
envlist = d{42,51}-py{311,312}
envlist = d{42,51}-py{311,312,313}
skip_missing_interpreters = true

[testenv]
Expand Down

0 comments on commit 7b0c988

Please sign in to comment.