Skip to content

Commit

Permalink
fixes ModelAdmin.terminate reverse url
Browse files Browse the repository at this point in the history
  • Loading branch information
saxix committed Nov 27, 2024
1 parent 0d9cbbd commit 30c4dce
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/django_celery_boost/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def _celery_terminate(self, request: HttpRequest, pk: str) -> "HttpResponse": #
def doit(request: "HttpRequest") -> HttpResponseRedirect:
obj.terminate()
redirect_url = reverse(
"admin:%s_%s_change" % (obj._meta.app_label, obj._meta.model_name),
"%a:%s_%s_change" % (self.admin_site.name, obj._meta.app_label, obj._meta.model_name),
args=(obj.pk,),
current_app=self.admin_site.name,
)
Expand Down
22 changes: 15 additions & 7 deletions src/django_celery_boost/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
import json
import logging
from typing import TYPE_CHECKING, Any, Callable, Generator, Optional

from celery import states
Expand All @@ -19,6 +20,8 @@
from kombu.connection import Connection
from kombu.transport.redis import Channel

logger = logging.getLogger(__name__)


class CeleryManager:
pass
Expand Down Expand Up @@ -251,12 +254,15 @@ def task_handler(cls: "type[CeleryTaskModel]") -> "Callable[[Any], Any]":

def is_queued(self) -> bool:
"""Check if the job is queued"""
with self.celery_app.pool.acquire(block=True) as conn:
tasks = conn.default_channel.client.lrange(self.celery_task_queue, 0, -1)
for task in tasks:
j = json.loads(task)
if j["headers"]["id"] == self.curr_async_result_id:
return True
try:
with self.celery_app.pool.acquire(block=True) as conn:
tasks = conn.default_channel.client.lrange(self.celery_task_queue, 0, -1)
for task in tasks:
j = json.loads(task)
if j["headers"]["id"] == self.curr_async_result_id:
return True
except Exception as e:
logger.exception(e)
return False

def is_terminated(self) -> bool:
Expand Down Expand Up @@ -296,7 +302,9 @@ def queue(self, use_version: bool = True) -> str | None:

def revoke(self, wait=False, timeout=None) -> None:
if self.async_result:
self.async_result.revoke(terminate=False, signal="SIGTERM", wait=wait, timeout=timeout)
return self.async_result.revoke(terminate=False, signal="SIGTERM", wait=wait, timeout=timeout)
else:
return None

def terminate(self, wait=False, timeout=None) -> str:
"""Revoke the task. Does not need Running workers"""
Expand Down

0 comments on commit 30c4dce

Please sign in to comment.