Skip to content

Commit

Permalink
Adding recursive option for terminate (#27)
Browse files Browse the repository at this point in the history
Signed-off-by: Shivam Kumar <[email protected]>
  • Loading branch information
shivamkm07 authored Jan 17, 2024
1 parent cbd07a8 commit 460f5ef
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
6 changes: 4 additions & 2 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,12 @@ def raise_orchestration_event(self, instance_id: str, event_name: str, *,
self._stub.RaiseEvent(req)

def terminate_orchestration(self, instance_id: str, *,
output: Union[Any, None] = None):
output: Union[Any, None] = None,
recursive: bool = True):
req = pb.TerminateRequest(
instanceId=instance_id,
output=wrappers_pb2.StringValue(value=shared.to_json(output)) if output else None)
output=wrappers_pb2.StringValue(value=shared.to_json(output)) if output else None,
recursive=recursive)

self._logger.info(f"Terminating instance '{instance_id}'.")
self._stub.TerminateInstance(req)
Expand Down
32 changes: 32 additions & 0 deletions tests/test_orchestration_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,38 @@ def orchestrator(ctx: task.OrchestrationContext, _):
assert state.runtime_status == client.OrchestrationStatus.TERMINATED
assert state.serialized_output == json.dumps("some reason for termination")

def test_terminate_recursive():
def root(ctx: task.OrchestrationContext, _):
result = yield ctx.call_sub_orchestrator(child)
return result
def child(ctx: task.OrchestrationContext, _):
result = yield ctx.wait_for_external_event("my_event")
return result

# Start a worker, which will connect to the sidecar in a background thread
with worker.TaskHubGrpcWorker() as w:
w.add_orchestrator(root)
w.add_orchestrator(child)
w.start()

task_hub_client = client.TaskHubGrpcClient()
id = task_hub_client.schedule_new_orchestration(root)
state = task_hub_client.wait_for_orchestration_start(id, timeout=30)
assert state is not None
assert state.runtime_status == client.OrchestrationStatus.RUNNING

# Terminate root orchestration(recursive set to True by default)
task_hub_client.terminate_orchestration(id, output="some reason for termination")
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
assert state is not None
assert state.runtime_status == client.OrchestrationStatus.TERMINATED

# Verify that child orchestration is also terminated
c = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
assert state is not None
assert state.runtime_status == client.OrchestrationStatus.TERMINATED



def test_continue_as_new():
all_results = []
Expand Down

0 comments on commit 460f5ef

Please sign in to comment.