Skip to content

Commit

Permalink
bump version
Browse files Browse the repository at this point in the history
  • Loading branch information
saxix committed Nov 28, 2024
1 parent 30c4dce commit 25708dd
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 115 deletions.
8 changes: 8 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
0.5
---
* rename `celery_result` as `local_status`
* add `datetime_queued`, `datetime_created`, `repeatable`, `owner`, `group_key`
* add signals
* add property verbose_status


0.4.1
---
* fixes deps version
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "django-celery-boost"
version = "0.4.1"
version = "0.5.0"
description = "Django Abstract Model to work with Celery"
readme = "README.md"
requires-python = ">=3.10"
Expand Down
2 changes: 1 addition & 1 deletion src/django_celery_boost/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = __version__ = "0.4.1"
VERSION = __version__ = "0.5.0"
20 changes: 13 additions & 7 deletions src/django_celery_boost/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ def celery_inspect(self, request: HttpRequest, pk: str) -> HttpResponse:
ctx = self.get_common_context(request, pk, title="Inspect Task")
return render(
request,
self.inspect_template or [
self.inspect_template
or [
"%s/%s/%s/inspect.html" % (self.admin_site.name, self.opts.app_label, self.opts.model_name),
"%s/%s/inspect.html" % (self.admin_site.name, self.opts.app_label),
"%s/celery_boost/inspect.html" % self.admin_site.name,
Expand Down Expand Up @@ -85,10 +86,11 @@ def doit(request: "HttpRequest") -> HttpResponseRedirect:
current_app=self.admin_site.name,
)
return HttpResponseRedirect(redirect_url)

if obj.is_queued():
self.message_user(request, "Task has already been queued.", messages.WARNING)
return
if obj.is_terminated():
if obj.is_terminated() and not obj.repeatable:
self.message_user(request, "Task is already terminated.", messages.WARNING)
return

Expand All @@ -100,7 +102,8 @@ def doit(request: "HttpRequest") -> HttpResponseRedirect:
"Queued",
extra_context=ctx,
description="",
template=self.queue_template or [
template=self.queue_template
or [
"%s/%s/%s/queue.html" % (self.admin_site.name, self.opts.app_label, self.opts.model_name),
"%s/%s/queue.html" % (self.admin_site.name, self.opts.app_label),
"%s/celery_boost/queue.html" % self.admin_site.name,
Expand Down Expand Up @@ -154,23 +157,26 @@ def _celery_terminate(self, request: HttpRequest, pk: str) -> "HttpResponse": #
ctx = self.get_common_context(request, pk, title=f"Confirm termination request for {obj}")

def doit(request: "HttpRequest") -> HttpResponseRedirect:
obj.terminate()
result = obj.terminate()
redirect_url = reverse(
"%a:%s_%s_change" % (self.admin_site.name, obj._meta.app_label, obj._meta.model_name),
"%s:%s_%s_change" % (self.admin_site.name, obj._meta.app_label, obj._meta.model_name),
args=(obj.pk,),
current_app=self.admin_site.name,
)
self.message_user(request, str(result), messages.SUCCESS)

return HttpResponseRedirect(redirect_url)

return confirm_action(
self,
request,
doit,
"Do you really want to terminate this task?",
"Terminated",
None,
extra_context=ctx,
description="",
template=self.terminate_template or [
template=self.terminate_template
or [
"%s/%s/%s/terminate.html" % (self.admin_site.name, self.opts.app_label, self.opts.model_name),
"%s/%s/terminate.html" % (self.admin_site.name, self.opts.app_label),
"%s/celery_boost/terminate.html" % self.admin_site.name,
Expand Down
55 changes: 41 additions & 14 deletions src/django_celery_boost/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
from django.conf import settings
from django.core import checks
from django.db import models
from django.utils import timezone
from django.utils.functional import classproperty
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _

from django_celery_boost.signals import task_queued, task_revoked, task_terminated

if TYPE_CHECKING:
import celery.app.control
from kombu.connection import Connection
Expand All @@ -28,12 +31,6 @@ class CeleryManager:


class CeleryTaskModel(models.Model):
"""
Attributes:
flight_speed The maximum speed that such a bird can attain.
nesting_grounds The locale where these birds congregate to reproduce.
"""

class Meta:
abstract = True
Expand Down Expand Up @@ -75,9 +72,26 @@ class Meta:
editable=False,
help_text="Latest executed AsyncResult is",
)
celery_history = models.JSONField(default=dict, blank=True, null=False, editable=False)
celery_result = models.CharField(max_length=100, default="", blank=True, null=True, editable=False)
datetime_created = models.DateTimeField(auto_now_add=True, help_text="Creation date and time")
datetime_queued = models.DateTimeField("Queued At", blank=True, null=True, help_text="Queueing date and time")
repeatable = models.BooleanField(default=False, blank=True, help_text="Indicate if the job can be repeated as-is")

celery_history = models.JSONField(default=dict, blank=True, null=False, editable=False)
local_status = models.CharField(max_length=100, default="", blank=True, null=True, editable=False)
owner = models.ForeignKey(
settings.AUTH_USER_MODEL,
related_name="%(app_label)s_%(class)s_jobs",
on_delete=models.CASCADE,
null=True,
blank=True,
)
group_key = models.CharField(
max_length=255,
blank=True,
editable=False,
null=True,
help_text="Tasks with the same group key will not run in parallel",
)
celery_task_name: str = ""
"FQN of the task processing this Model's instances"

Expand Down Expand Up @@ -269,6 +283,17 @@ def is_terminated(self) -> bool:
"""Check if the job is queued"""
return self.task_status and self.task_status in self.TERMINATED_STATUSES

def log_task_action(self, action, user):
self.celery_history[str(timezone.now())] = f"{action} by {user}"
self.save(update_fields=["celery_history"])

@property
def verbose_status(self) -> str:
status = self.task_status
if self.local_status:
return f"{status} ({self.local_status})"
return status

@property
def task_status(self) -> str:
"""Returns the task status querying Celery API"""
Expand Down Expand Up @@ -296,15 +321,16 @@ def queue(self, use_version: bool = True) -> str | None:
res = self.task_handler.delay(self.pk, self.version if use_version else None)
with concurrency_disable_increment(self):
self.curr_async_result_id = res.id
self.save(update_fields=["curr_async_result_id"])
self.datetime_queued = timezone.now()
self.save(update_fields=["curr_async_result_id", "datetime_queued"])
task_queued.send(sender=self.__class__, task=self)
return self.curr_async_result_id
return None

def revoke(self, wait=False, timeout=None) -> None:
if self.async_result:
return self.async_result.revoke(terminate=False, signal="SIGTERM", wait=wait, timeout=timeout)
else:
return None
self.async_result.revoke(terminate=False, signal="SIGTERM", wait=wait, timeout=timeout)
task_revoked.send(sender=self.__class__, task=self)

def terminate(self, wait=False, timeout=None) -> str:
"""Revoke the task. Does not need Running workers"""
Expand Down Expand Up @@ -334,8 +360,9 @@ def terminate(self, wait=False, timeout=None) -> str:
self.curr_async_result_id = None
st = self.UNKNOWN

self.celery_result = st
self.save(update_fields=["celery_result", "curr_async_result_id"])
self.local_status = st
self.save(update_fields=["local_status", "curr_async_result_id"])
task_terminated.send(sender=self.__class__, task=self)
return st

@classmethod
Expand Down
10 changes: 10 additions & 0 deletions src/django_celery_boost/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from django.dispatch import Signal

# actions
task_queued = Signal()
task_revoked = Signal()
task_terminated = Signal()

# events
task_complete = Signal()
task_start = Signal()
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
{% block object-tools %}
{{ block.super }}
<div>
{{ original.task_status }}
{{ original.verbose_status }}
</div>
{% endblock %}
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def pytest_configure(config):

settings.AUTHENTICATION_BACKENDS = ["django.contrib.auth.backends.ModelBackend"]
settings.CELERY_TASK_ALWAYS_EAGER = False
settings.CELERY_TASK_STORE_EAGER_RESULT = True
settings.CELERY_BROKER_URL = os.environ.get("CELERY_BROKER_URL")
settings.DEMOAPP_PATH = DEMOAPP_PATH
settings.MESSAGE_STORAGE = "demo.messages.PlainCookieStorage"
Expand Down
6 changes: 6 additions & 0 deletions tests/demoapp/demo/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class GroupFactory(DjangoModelFactory):
class Meta:
model = Group
django_get_or_create = ("name",)
skip_postgeneration_save = True

@factory.post_generation # type: ignore[misc]
def permissions(self, create: bool, extracted: list[str], **kwargs: Any) -> None:
Expand All @@ -41,6 +42,11 @@ def permissions(self, create: bool, extracted: list[str], **kwargs: Any) -> None
app, perm = perm_name.split(".")
self.permissions.add(Permission.objects.get(content_type__app_label=app, codename=perm))

@classmethod
def _after_postgeneration(cls, instance, create, results=None):
if create and results and not cls._meta.skip_postgeneration_save:
instance.save()


class user_grant_permission:
def __init__(self, user: User, permissions: Optional[list[str]] = None):
Expand Down
86 changes: 0 additions & 86 deletions tests/demoapp/demo/migrations/0001_initial.py

This file was deleted.

6 changes: 2 additions & 4 deletions tests/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,13 +98,11 @@ def test_celery_terminate(request, django_app, std_user, job):
res = django_app.get(url, user=std_user).follow()
msgs = res.context["messages"]
assert [m.message for m in msgs] == ["Task not queued."]

with mock.patch.object(Job, "is_queued") as m:
m.return_value = True
with mock.patch.object(Job, "is_queued", lambda s: True):
res = django_app.get(url, user=std_user)
res = res.forms[1].submit().follow()
msgs = res.context["messages"]
assert [m.message for m in msgs] == ["Terminated"]
assert [m.message for m in msgs] == ["UNKNOWN"]


def test_celery_revoke(request, django_app, std_user, job):
Expand Down
3 changes: 2 additions & 1 deletion tests/test_eager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

def test_celery_task_info_processed(settings):
settings.CELERY_TASK_ALWAYS_EAGER = True
settings.CELERY_TASK_STORE_EAGER_RESULT = False
job1: Job = JobFactory()
assert job1.queue() == job1.curr_async_result_id
assert job1.task_status == Job.MISSING
assert job1.task_status == Job.SUCCESS

0 comments on commit 25708dd

Please sign in to comment.