From e61d726d7ffc5905018f5efefc2aec8707f317ce Mon Sep 17 00:00:00 2001 From: Vikram Patki Date: Tue, 24 Oct 2023 11:20:10 -0400 Subject: [PATCH 1/2] Allowing re-execution of tasks based on a reprocessing the message --- dagger/tasks/task.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/dagger/tasks/task.py b/dagger/tasks/task.py index 30e4610..d4b0b08 100644 --- a/dagger/tasks/task.py +++ b/dagger/tasks/task.py @@ -325,12 +325,12 @@ async def start( if self.status.code in [ TaskStatusEnum.COMPLETED.name, TaskStatusEnum.SKIPPED.name, - ]: + ] and not self.reprocess_on_message: return await self.on_complete( status=self.status, workflow_instance=workflow_instance ) if ( - ignore_status or self.status.code == TaskStatusEnum.NOT_STARTED.name + ignore_status or self.status.code == TaskStatusEnum.NOT_STARTED.name or self.reprocess_on_message ) and workflow_instance: self.status = TaskStatus( code=TaskStatusEnum.EXECUTING.name, value=TaskStatusEnum.EXECUTING.value @@ -967,13 +967,14 @@ async def start(self, workflow_instance: Optional[ITemplateDAGInstance]) -> None if self.status.code in [ TaskStatusEnum.COMPLETED.name, TaskStatusEnum.SKIPPED.name, - ]: + ] and not self.reprocess_on_message: return await self.on_complete( status=self.status, workflow_instance=workflow_instance ) if ( self.status.code == TaskStatusEnum.NOT_STARTED.name or self.status.code == TaskStatusEnum.SUBMITTED.name + or self.reprocess_on_message ) and workflow_instance: await self.execute( runtime_parameters=workflow_instance.runtime_parameters, From 2f23f76d79f38b3d270440994707ad22d7a5db03 Mon Sep 17 00:00:00 2001 From: Vikram Patki Date: Wed, 25 Oct 2023 10:23:34 -0400 Subject: [PATCH 2/2] Allowing re-execution of tasks based on a reprocessing the message --- dagger/tasks/task.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/dagger/tasks/task.py b/dagger/tasks/task.py index d4b0b08..ce48790 100644 --- a/dagger/tasks/task.py +++ b/dagger/tasks/task.py @@ -322,15 +322,21 @@ async def start( ignore_status: bool = False, ) -> None: # pre-execute - if self.status.code in [ - TaskStatusEnum.COMPLETED.name, - TaskStatusEnum.SKIPPED.name, - ] and not self.reprocess_on_message: + if ( + self.status.code + in [ + TaskStatusEnum.COMPLETED.name, + TaskStatusEnum.SKIPPED.name, + ] + and not self.reprocess_on_message + ): return await self.on_complete( status=self.status, workflow_instance=workflow_instance ) if ( - ignore_status or self.status.code == TaskStatusEnum.NOT_STARTED.name or self.reprocess_on_message + ignore_status + or self.status.code == TaskStatusEnum.NOT_STARTED.name + or self.reprocess_on_message ) and workflow_instance: self.status = TaskStatus( code=TaskStatusEnum.EXECUTING.name, value=TaskStatusEnum.EXECUTING.value @@ -964,10 +970,14 @@ async def stop( ) async def start(self, workflow_instance: Optional[ITemplateDAGInstance]) -> None: - if self.status.code in [ - TaskStatusEnum.COMPLETED.name, - TaskStatusEnum.SKIPPED.name, - ] and not self.reprocess_on_message: + if ( + self.status.code + in [ + TaskStatusEnum.COMPLETED.name, + TaskStatusEnum.SKIPPED.name, + ] + and not self.reprocess_on_message + ): return await self.on_complete( status=self.status, workflow_instance=workflow_instance )