diff --git a/src/etools_datamart/apps/etl/admin.py b/src/etools_datamart/apps/etl/admin.py index 9bcf9a03..5ab3d99c 100644 --- a/src/etools_datamart/apps/etl/admin.py +++ b/src/etools_datamart/apps/etl/admin.py @@ -30,7 +30,7 @@ from etools_datamart.sentry import process_exception from . import models -from .loader import RUN_QUEUED, RUN_UNKNOWN +from .loader import RUN_QUEUED, RUN_UNKNOWN, RUN_CANCELED cache = caches["default"] @@ -42,6 +42,13 @@ def queue(modeladmin, request, queryset): modeladmin.message_user(request, "{0} task{1} queued".format(count, pluralize(count)), messages.SUCCESS) +def cancel(modeladmin, request, queryset): + count = len(queryset) + for obj in queryset: + modeladmin.cancel(modeladmin, request, pk=obj.pk) + modeladmin.message_user(request, "{0} task{1} canceled".format(count, pluralize(count)), messages.SUCCESS) + + def unlock(modeladmin, request, queryset): count = len(queryset) @@ -138,6 +145,7 @@ class EtlTaskAdmin(ExtraButtonsMixin, admin.ModelAdmin): "_last_failure", "unlock_task", "queue_task", + "cancel_task", "data", ) date_hierarchy = "last_run" @@ -230,6 +238,16 @@ def queue_task(self, obj): queue_task.verbose_name = "queue" + def cancel_task(self, obj): + opts = self.model._meta + # TODO: if tasks is still in the queue or running enable the link + # Otherwise no active link + url = reverse("admin:%s_%s_cancel" % (opts.app_label, opts.model_name), args=[obj.id]) + # return format_html(f'cancel') + return format_html("") + + cancel_task.verbose_name = "cancel" + def unlock_task(self, obj): if obj.content_type: locked = obj.content_type.model_class().loader.is_locked @@ -319,10 +337,35 @@ def _execute(self, request, pk, sync): self.message_user(request, f"Cannot queue '{obj.task}': {e}", messages.ERROR) return HttpResponseRedirect(reverse("admin:etl_etltask_changelist")) + def _cancel(self, request, pk, sync): + obj = self.get_object(request, pk) + try: + # TODO: Check the status just before if it is still cancelable + + obj.status = "CANCELED" + obj.elapsed = None + obj.save() + task = app.tasks.get(obj.task) + # TODO: Alternative way to + + if sync: + task(run_type=RUN_CANCELED) + else: + task.delay(run_type=RUN_CANCELED) + self.message_user(request, f"Task '{obj.task}' canceled", messages.SUCCESS) + except Exception as e: # pragma: no cover + process_exception(e) + self.message_user(request, f"Cannot queue '{obj.task}': {e}", messages.ERROR) + return HttpResponseRedirect(reverse("admin:etl_etltask_changelist")) + @button() def queue(self, request, pk): return self._execute(request, pk, sync=False) + @button() + def cancel(self, request, pk): + return self._cancel(request, pk, sync=False) + @button() def run(self, request, pk): return self._execute(request, pk, sync=True) diff --git a/src/etools_datamart/apps/etl/loader.py b/src/etools_datamart/apps/etl/loader.py index f6d66fba..00a2ed7d 100644 --- a/src/etools_datamart/apps/etl/loader.py +++ b/src/etools_datamart/apps/etl/loader.py @@ -40,6 +40,8 @@ RUN_SCHEDULE = 3 RUN_QUEUED = 4 RUN_AS_REQUIREMENT = 5 +RUN_CANCELED = 6 + RUN_TYPES = ( (RUN_UNKNOWN, ""), (RUN_MANUAL, "Manual"), @@ -47,6 +49,7 @@ (RUN_SCHEDULE, "Celery"), (RUN_QUEUED, "Forced queue"), (RUN_AS_REQUIREMENT, "Required by task"), + (RUN_CANCELED, "Canceled"), ) @@ -323,6 +326,8 @@ def is_record_changed(self, record, values): return False def process_record(self, filters, values): + # TODO: Check task cancel status + stdout = self.context["stdout"] verbosity = self.context["verbosity"] if stdout and verbosity > 2: # pragma: no cover @@ -426,6 +431,7 @@ def unlock(self): locks.delete(self.config.lock_key) lock.release() except LockError: + # TODO: Unlock failres should be reported pass @cached_property diff --git a/src/etools_datamart/apps/mart/data/loader.py b/src/etools_datamart/apps/mart/data/loader.py index 2a436400..d5fa14b7 100644 --- a/src/etools_datamart/apps/mart/data/loader.py +++ b/src/etools_datamart/apps/mart/data/loader.py @@ -147,6 +147,7 @@ def process_country(self): paginator = DatamartPaginator(qs, batch_size) for page_idx in paginator.page_range: + # TODO: Check cancel status of the task page = paginator.page(page_idx) for record in page.object_list: filters = self.config.key(self, record)