Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] [ch37815] etl task cancel feature #365

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion src/etools_datamart/apps/etl/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from etools_datamart.sentry import process_exception

from . import models
from .loader import RUN_QUEUED, RUN_UNKNOWN
from .loader import RUN_QUEUED, RUN_UNKNOWN, RUN_CANCELED

cache = caches["default"]

Expand All @@ -42,6 +42,13 @@ def queue(modeladmin, request, queryset):
modeladmin.message_user(request, "{0} task{1} queued".format(count, pluralize(count)), messages.SUCCESS)


def cancel(modeladmin, request, queryset):
count = len(queryset)
for obj in queryset:
modeladmin.cancel(modeladmin, request, pk=obj.pk)
modeladmin.message_user(request, "{0} task{1} canceled".format(count, pluralize(count)), messages.SUCCESS)


def unlock(modeladmin, request, queryset):
count = len(queryset)

Expand Down Expand Up @@ -138,6 +145,7 @@ class EtlTaskAdmin(ExtraButtonsMixin, admin.ModelAdmin):
"_last_failure",
"unlock_task",
"queue_task",
"cancel_task",
"data",
)
date_hierarchy = "last_run"
Expand Down Expand Up @@ -230,6 +238,16 @@ def queue_task(self, obj):

queue_task.verbose_name = "queue"

def cancel_task(self, obj):
opts = self.model._meta
# TODO: if tasks is still in the queue or running enable the link
# Otherwise no active link
url = reverse("admin:%s_%s_cancel" % (opts.app_label, opts.model_name), args=[obj.id])
# return format_html(f'<a href="{url}">cancel</a>')
return format_html("")

cancel_task.verbose_name = "cancel"

def unlock_task(self, obj):
if obj.content_type:
locked = obj.content_type.model_class().loader.is_locked
Expand Down Expand Up @@ -319,10 +337,35 @@ def _execute(self, request, pk, sync):
self.message_user(request, f"Cannot queue '{obj.task}': {e}", messages.ERROR)
return HttpResponseRedirect(reverse("admin:etl_etltask_changelist"))

def _cancel(self, request, pk, sync):
obj = self.get_object(request, pk)
try:
# TODO: Check the status just before if it is still cancelable

obj.status = "CANCELED"
obj.elapsed = None
obj.save()
task = app.tasks.get(obj.task)
# TODO: Alternative way to

if sync:
task(run_type=RUN_CANCELED)
else:
task.delay(run_type=RUN_CANCELED)
self.message_user(request, f"Task '{obj.task}' canceled", messages.SUCCESS)
except Exception as e: # pragma: no cover
process_exception(e)
self.message_user(request, f"Cannot queue '{obj.task}': {e}", messages.ERROR)
return HttpResponseRedirect(reverse("admin:etl_etltask_changelist"))

@button()
def queue(self, request, pk):
return self._execute(request, pk, sync=False)

@button()
def cancel(self, request, pk):
return self._cancel(request, pk, sync=False)

@button()
def run(self, request, pk):
return self._execute(request, pk, sync=True)
Expand Down
6 changes: 6 additions & 0 deletions src/etools_datamart/apps/etl/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@
RUN_SCHEDULE = 3
RUN_QUEUED = 4
RUN_AS_REQUIREMENT = 5
RUN_CANCELED = 6

RUN_TYPES = (
(RUN_UNKNOWN, ""),
(RUN_MANUAL, "Manual"),
(RUN_COMMAND, "cli"),
(RUN_SCHEDULE, "Celery"),
(RUN_QUEUED, "Forced queue"),
(RUN_AS_REQUIREMENT, "Required by task"),
(RUN_CANCELED, "Canceled"),
)


Expand Down Expand Up @@ -323,6 +326,8 @@ def is_record_changed(self, record, values):
return False

def process_record(self, filters, values):
# TODO: Check task cancel status

stdout = self.context["stdout"]
verbosity = self.context["verbosity"]
if stdout and verbosity > 2: # pragma: no cover
Expand Down Expand Up @@ -426,6 +431,7 @@ def unlock(self):
locks.delete(self.config.lock_key)
lock.release()
except LockError:
# TODO: Unlock failres should be reported
pass

@cached_property
Expand Down
1 change: 1 addition & 0 deletions src/etools_datamart/apps/mart/data/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def process_country(self):

paginator = DatamartPaginator(qs, batch_size)
for page_idx in paginator.page_range:
# TODO: Check cancel status of the task
page = paginator.page(page_idx)
for record in page.object_list:
filters = self.config.key(self, record)
Expand Down