Skip to content

Commit

Permalink
Feature/celery results (#24)
Browse files Browse the repository at this point in the history
add ! django_celery_results
  • Loading branch information
vitali-yanushchyk-valor authored May 29, 2024
1 parent 82ed415 commit fdb25da
Show file tree
Hide file tree
Showing 19 changed files with 116 additions and 213 deletions.
1 change: 1 addition & 0 deletions compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ services:
azurite:
image: mcr.microsoft.com/azure-storage/azurite
command: "azurite -l /workspace -d /workspace/debug.log --blobPort 10000 --blobHost 0.0.0.0 --loose"
restart: always
ports:
- "10000:10000" # Blob service
volumes:
Expand Down
117 changes: 67 additions & 50 deletions pdm.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@ dependencies = [
"face-recognition>=1.3.0",
"opencv-python>=4.9.0.80",
"psycopg2-binary>=2.9.9",
"sentry-sdk",
"sentry-sdk[celery,django]>=2.2.1",
"social-auth-app-django",
"social-auth-core",
"unicef-security",
"uwsgi>=2.0.25.1",
"drf-nested-routers>=0.94.1",
"face-recognition>=1.3.0",
"opencv-python>=4.9.0.80",
"django-celery-results>=2.5.1",
]

[tool.pdm.build]
Expand Down
5 changes: 5 additions & 0 deletions src/hope_dedup_engine/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1,6 @@
from hope_dedup_engine.config.celery import app as celery_app


VERSION = __version__ = "0.1.0"

__all__ = ("celery_app",)
16 changes: 0 additions & 16 deletions src/hope_dedup_engine/apps/faces/admin.py

This file was deleted.

13 changes: 9 additions & 4 deletions src/hope_dedup_engine/apps/faces/celery_tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from celery import shared_task
import traceback

from celery import shared_task, states

from hope_dedup_engine.apps.faces.utils.celery_utils import task_lifecycle
from hope_dedup_engine.apps.faces.utils.duplication_detector import DuplicationDetector
Expand All @@ -8,6 +10,9 @@
@task_lifecycle(name="Deduplicate", ttl=1 * 60 * 60)
# TODO: Use DeduplicationSet objects as input to deduplication pipeline
def deduplicate(self, filename: str):
# deduplicate.delay(filename=filename)
dd = DuplicationDetector(filename)
return dd.find_duplicates()
try:
dd = DuplicationDetector(filename)
return dd.find_duplicates()
except Exception as e:
self.update_state(state=states.FAILURE, meta={"exc_message": str(e), "traceback": traceback.format_exc()})
raise e
49 changes: 0 additions & 49 deletions src/hope_dedup_engine/apps/faces/migrations/0001_initial.py

This file was deleted.

Empty file.
1 change: 0 additions & 1 deletion src/hope_dedup_engine/apps/faces/models/__init__.py

This file was deleted.

24 changes: 0 additions & 24 deletions src/hope_dedup_engine/apps/faces/models/task_model.py

This file was deleted.

19 changes: 2 additions & 17 deletions src/hope_dedup_engine/apps/faces/utils/celery_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,9 @@
from functools import wraps

from django.conf import settings
from django.utils import timezone

import redis

from hope_dedup_engine.apps.faces.models import TaskModel

redis_client = redis.Redis.from_url(settings.CELERY_BROKER_URL)


Expand All @@ -17,33 +14,21 @@ def decorator(func):
def wrapper(self, *args, **kwargs):
logger = logging.getLogger(func.__module__)
logger.info(f"{name} task started")
result = None

filename: str = args[0] if args else kwargs.get("filename")
lock_name: str = f"{name}_{filename}"
if not _acquire_lock(lock_name, ttl):
logger.info(f"Task {name} with brocker lock {lock_name} is already running.")
return None

task: TaskModel = None
result = None

try:
task = TaskModel.objects.create(name=name, celery_task_id=self.request.id)
result = func(self, *args, **kwargs)
task.status = TaskModel.StatusChoices.COMPLETED_SUCCESS
task.completed_at = timezone.now()
task.is_success = True
except Exception as e:
logger.exception(f"{name} task failed", exc_info=e)
if task:
task.status = TaskModel.StatusChoices.FAILED
task.completed_at = timezone.now()
task.is_success = False
task.error = str(e)
raise e
finally:
_release_lock(lock_name)
if task:
task.save(update_fields=["status", "completed_at", "is_success", "error"])
logger.info(f"{name} task ended")
return result

Expand Down
Loading

0 comments on commit fdb25da

Please sign in to comment.