diff --git a/dev-requirements.txt b/dev-requirements.txt index 15866725..769b1e1e 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -15,4 +15,4 @@ Flask>=1.1 # needed for auto fix ruff===0.2.2 # needed for dapr-ext-workflow -durabletask>=0.1.1a1 +durabletask-dapr >= 0.2.0a4 diff --git a/examples/workflow/monitor.py b/examples/workflow/monitor.py index a6da1c7d..6bdb6cc3 100644 --- a/examples/workflow/monitor.py +++ b/examples/workflow/monitor.py @@ -69,6 +69,7 @@ def send_alert(ctx, message: str): except Exception: pass if not status or status.runtime_status.name != 'RUNNING': + # TODO update to use reuse_id_policy instance_id = wf_client.schedule_new_workflow( workflow=status_monitor_workflow, input=JobStatus(job_id=job_id, is_healthy=True), diff --git a/examples/workflow/task_chaining.py b/examples/workflow/task_chaining.py index c24e340c..c67308d5 100644 --- a/examples/workflow/task_chaining.py +++ b/examples/workflow/task_chaining.py @@ -27,6 +27,7 @@ def task_chain_workflow(ctx: wf.DaprWorkflowContext, wf_input: int): except Exception as e: yield ctx.call_activity(error_handler, input=str(e)) raise + # TODO update to set custom status return [result1, result2, result3] diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py index 19f49981..b9865344 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_client.py @@ -17,7 +17,9 @@ from datetime import datetime from typing import Any, Optional, TypeVar + from durabletask import client +import durabletask.internal.orchestrator_service_pb2 as pb from dapr.ext.workflow.workflow_state import WorkflowState from dapr.ext.workflow.workflow_context import Workflow @@ -78,6 +80,7 @@ def schedule_new_workflow( input: Optional[TInput] = None, instance_id: Optional[str] = None, start_at: Optional[datetime] = None, + reuse_id_policy: Optional[pb.OrchestrationIdReusePolicy] = None, ) -> str: """Schedules a new workflow instance for execution. @@ -90,6 +93,8 @@ def schedule_new_workflow( start_at: The time when the workflow instance should start executing. If not specified or if a date-time in the past is specified, the workflow instance will be scheduled immediately. + reuse_id_policy: Optional policy to reuse the workflow id when there is a conflict with + an existing workflow instance. Returns: The ID of the scheduled workflow instance. @@ -100,9 +105,14 @@ def schedule_new_workflow( input=input, instance_id=instance_id, start_at=start_at, + reuse_id_policy=reuse_id_policy, ) return self.__obj.schedule_new_orchestration( - workflow.__name__, input=input, instance_id=instance_id, start_at=start_at + workflow.__name__, + input=input, + instance_id=instance_id, + start_at=start_at, + reuse_id_policy=reuse_id_policy, ) def get_workflow_state( @@ -208,7 +218,9 @@ def raise_workflow_event( """ return self.__obj.raise_orchestration_event(instance_id, event_name, data=data) - def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None): + def terminate_workflow( + self, instance_id: str, *, output: Optional[Any] = None, recursive: bool = True + ): """Terminates a running workflow instance and updates its runtime status to WorkflowRuntimeStatus.Terminated This method internally enqueues a "terminate" message in the task hub. When the task hub worker processes this message, it will update the runtime @@ -226,9 +238,10 @@ def terminate_workflow(self, instance_id: str, *, output: Optional[Any] = None): Args: instance_id: The ID of the workflow instance to terminate. output: The optional output to set for the terminated workflow instance. + recursive: The optional flag to terminate all child workflows. """ - return self.__obj.terminate_orchestration(instance_id, output=output) + return self.__obj.terminate_orchestration(instance_id, output=output, recursive=recursive) def pause_workflow(self, instance_id: str): """Suspends a workflow instance, halting processing of it until resume_workflow is used to @@ -246,3 +259,12 @@ def resume_workflow(self, instance_id: str): instance_id: The instance ID of the workflow to resume. """ return self.__obj.resume_orchestration(instance_id) + + def purge_workflow(self, instance_id: str, recursive: bool = True): + """Purge data from a workflow instance. + + Args: + instance_id: The instance ID of the workflow to purge. + recursive: The optional flag to also purge data from all child workflows. + """ + return self.__obj.purge_orchestration(instance_id, recursive) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py index dbcccd64..2dee46fe 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py @@ -53,6 +53,10 @@ def current_utc_datetime(self) -> datetime: def is_replaying(self) -> bool: return self.__obj.is_replaying + def set_custom_status(self, custom_status: str) -> None: + self._logger.debug(f'{self.instance_id}: Setting custom status to {custom_status}') + self.__obj.set_custom_status(custom_status) + def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task: self._logger.debug(f'{self.instance_id}: Creating timer to fire at {fire_at} time') return self.__obj.create_timer(fire_at) diff --git a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py index e0e3c736..b4c85f6a 100644 --- a/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py +++ b/ext/dapr-ext-workflow/dapr/ext/workflow/workflow_context.py @@ -84,6 +84,11 @@ def is_replaying(self) -> bool: """ pass + @abstractmethod + def set_custom_status(self, custom_status: str) -> None: + """Set the custom status.""" + pass + @abstractmethod def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task: """Create a Timer Task to fire after at the specified deadline. diff --git a/ext/dapr-ext-workflow/setup.cfg b/ext/dapr-ext-workflow/setup.cfg index 0cd3949a..e61b94ec 100644 --- a/ext/dapr-ext-workflow/setup.cfg +++ b/ext/dapr-ext-workflow/setup.cfg @@ -25,7 +25,7 @@ packages = find_namespace: include_package_data = True install_requires = dapr-dev >= 1.13.0rc1.dev - durabletask >= 0.1.1a1 + durabletask-dapr >= 0.2.0a4 [options.packages.find] include = diff --git a/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py b/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py index 6b3c9ad3..9fdfe044 100644 --- a/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py +++ b/ext/dapr-ext-workflow/tests/test_dapr_workflow_context.py @@ -25,11 +25,13 @@ mock_create_timer = 'create_timer' mock_call_activity = 'call_activity' mock_call_sub_orchestrator = 'call_sub_orchestrator' +mock_custom_status = 'custom_status' class FakeOrchestrationContext: def __init__(self): self.instance_id = mock_instance_id + self.custom_status = None def create_timer(self, fire_at): return mock_create_timer @@ -40,6 +42,9 @@ def call_activity(self, activity, input): def call_sub_orchestrator(self, orchestrator, input, instance_id): return mock_call_sub_orchestrator + def set_custom_status(self, custom_status): + self.custom_status = custom_status + class DaprWorkflowContextTest(unittest.TestCase): def mock_client_activity(ctx: WorkflowActivityContext, input): @@ -65,3 +70,6 @@ def test_workflow_context_functions(self): create_timer_result = dapr_wf_ctx.create_timer(mock_date_time) assert create_timer_result == mock_create_timer + + dapr_wf_ctx.set_custom_status(mock_custom_status) + assert fakeContext.custom_status == mock_custom_status diff --git a/ext/dapr-ext-workflow/tests/test_workflow_client.py b/ext/dapr-ext-workflow/tests/test_workflow_client.py index 4a7f93b9..e1c9b772 100644 --- a/ext/dapr-ext-workflow/tests/test_workflow_client.py +++ b/ext/dapr-ext-workflow/tests/test_workflow_client.py @@ -20,17 +20,26 @@ from unittest import mock from dapr.ext.workflow.dapr_workflow_client import DaprWorkflowClient from durabletask import client +import durabletask.internal.orchestrator_service_pb2 as pb mock_schedule_result = 'workflow001' mock_raise_event_result = 'event001' mock_terminate_result = 'terminate001' mock_suspend_result = 'suspend001' mock_resume_result = 'resume001' -mockInstanceId = 'instance001' +mock_purge_result = 'purge001' +mock_instance_id = 'instance001' class FakeTaskHubGrpcClient: - def schedule_new_orchestration(self, workflow, input, instance_id, start_at): + def schedule_new_orchestration( + self, + workflow, + input, + instance_id, + start_at, + reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None, + ): return mock_schedule_result def get_orchestration_state(self, instance_id, fetch_payloads): @@ -49,7 +58,9 @@ def raise_orchestration_event( ): return mock_raise_event_result - def terminate_orchestration(self, instance_id: str, *, output: Union[Any, None] = None): + def terminate_orchestration( + self, instance_id: str, *, output: Union[Any, None] = None, recursive: bool = True + ): return mock_terminate_result def suspend_orchestration(self, instance_id: str): @@ -58,6 +69,9 @@ def suspend_orchestration(self, instance_id: str): def resume_orchestration(self, instance_id: str): return mock_resume_result + def purge_orchestration(self, instance_id: str, recursive: bool = True): + return mock_purge_result + def _inner_get_orchestration_state(self, instance_id, state: client.OrchestrationStatus): return client.OrchestrationState( instance_id=instance_id, @@ -87,35 +101,38 @@ def test_client_functions(self): assert actual_schedule_result == mock_schedule_result actual_get_result = wfClient.get_workflow_state( - instance_id=mockInstanceId, fetch_payloads=True + instance_id=mock_instance_id, fetch_payloads=True ) assert actual_get_result.runtime_status.name == 'PENDING' - assert actual_get_result.instance_id == mockInstanceId + assert actual_get_result.instance_id == mock_instance_id actual_wait_start_result = wfClient.wait_for_workflow_start( - instance_id=mockInstanceId, timeout_in_seconds=30 + instance_id=mock_instance_id, timeout_in_seconds=30 ) assert actual_wait_start_result.runtime_status.name == 'RUNNING' - assert actual_wait_start_result.instance_id == mockInstanceId + assert actual_wait_start_result.instance_id == mock_instance_id actual_wait_completion_result = wfClient.wait_for_workflow_completion( - instance_id=mockInstanceId, timeout_in_seconds=30 + instance_id=mock_instance_id, timeout_in_seconds=30 ) assert actual_wait_completion_result.runtime_status.name == 'COMPLETED' - assert actual_wait_completion_result.instance_id == mockInstanceId + assert actual_wait_completion_result.instance_id == mock_instance_id actual_raise_event_result = wfClient.raise_workflow_event( - instance_id=mockInstanceId, event_name='test_event', data='test_data' + instance_id=mock_instance_id, event_name='test_event', data='test_data' ) assert actual_raise_event_result == mock_raise_event_result actual_terminate_result = wfClient.terminate_workflow( - instance_id=mockInstanceId, output='test_output' + instance_id=mock_instance_id, output='test_output' ) assert actual_terminate_result == mock_terminate_result - actual_suspend_result = wfClient.pause_workflow(instance_id=mockInstanceId) + actual_suspend_result = wfClient.pause_workflow(instance_id=mock_instance_id) assert actual_suspend_result == mock_suspend_result - actual_resume_result = wfClient.resume_workflow(instance_id=mockInstanceId) + actual_resume_result = wfClient.resume_workflow(instance_id=mock_instance_id) assert actual_resume_result == mock_resume_result + + actual_purge_result = wfClient.purge_workflow(instance_id=mock_instance_id) + assert actual_purge_result == mock_purge_result diff --git a/setup.cfg b/setup.cfg index dcb6ba8d..de5d53f4 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,7 @@ [metadata] url = https://dapr.io/ author = Dapr Authors -author_email = daprweb@microsoft.com +author_email = dapr@dapr.io license = Apache license_file = LICENSE classifiers =