diff --git a/durabletask/worker.py b/durabletask/worker.py index 5e23c47..bcc1a30 100644 --- a/durabletask/worker.py +++ b/durabletask/worker.py @@ -188,8 +188,8 @@ def stop(self): def _execute_orchestrator(self, req: pb.OrchestratorRequest, stub: stubs.TaskHubSidecarServiceStub): try: executor = _OrchestrationExecutor(self._registry, self._logger) - actions, custom_status = executor.execute(req.instanceId, req.pastEvents, req.newEvents) - res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=actions, customStatus=wrappers_pb2.StringValue(value=custom_status)) + result = executor.execute(req.instanceId, req.pastEvents, req.newEvents) + res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=result.actions, customStatus=wrappers_pb2.StringValue(value=result.custom_status)) except Exception as ex: self._logger.exception(f"An error occurred while trying to execute instance '{req.instanceId}': {ex}") failure_details = pbh.new_failure_details(ex) @@ -461,6 +461,14 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None: self.set_continued_as_new(new_input, save_events) +class ExecutionResults: + actions: List[pb.OrchestratorAction] + custom_status: str + + def __init__(self, actions: List[pb.OrchestratorAction], custom_status: str): + self.actions = actions + self.custom_status = custom_status + class _OrchestrationExecutor: _generator: Optional[task.Orchestrator] = None @@ -470,7 +478,7 @@ def __init__(self, registry: _Registry, logger: logging.Logger): self._is_suspended = False self._suspended_events: List[pb.HistoryEvent] = [] - def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> Tuple[List[pb.OrchestratorAction],str]: + def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> ExecutionResults: if not new_events: raise task.OrchestrationStateError("The new history event list must have at least one event in it.") @@ -505,7 +513,7 @@ def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_e actions = ctx.get_actions() if self._logger.level <= logging.DEBUG: self._logger.debug(f"{instance_id}: Returning {len(actions)} action(s): {_get_action_summary(actions)}") - return actions, ctx._custom_status + return ExecutionResults(actions=actions, custom_status=ctx._custom_status) def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEvent) -> None: if self._is_suspended and _is_suspendable(event): diff --git a/tests/test_orchestration_executor.py b/tests/test_orchestration_executor.py index 6d5fdac..95eab0b 100644 --- a/tests/test_orchestration_executor.py +++ b/tests/test_orchestration_executor.py @@ -38,7 +38,8 @@ def orchestrator(ctx: task.OrchestrationContext, my_input: int): helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=json.dumps(test_input)), ] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, [], new_events) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED @@ -59,7 +60,8 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _): new_events = [helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, [], new_events) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED @@ -73,7 +75,8 @@ def test_orchestrator_not_registered(): name = "Bogus" new_events = [helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, [], new_events) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED @@ -99,7 +102,8 @@ def delay_orchestrator(ctx: task.OrchestrationContext, _): helpers.new_orchestrator_started_event(start_time), helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input=None)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, [], new_events) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions assert actions is not None assert len(actions) == 1 @@ -131,7 +135,8 @@ def delay_orchestrator(ctx: task.OrchestrationContext, _): helpers.new_timer_fired_event(1, expected_fire_at)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED @@ -156,7 +161,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): helpers.new_orchestrator_started_event(), helpers.new_execution_started_event(name, TEST_INSTANCE_ID, encoded_input)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, [], new_events) + result = executor.execute(TEST_INSTANCE_ID, [], new_events) + actions = result.actions assert len(actions) == 1 assert type(actions[0]) is pb.OrchestratorAction @@ -188,7 +194,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): new_events = [helpers.new_task_completed_event(1, encoded_output)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED @@ -216,7 +223,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): new_events = [helpers.new_task_failed_event(1, ex)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED @@ -261,7 +269,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): helpers.new_orchestrator_started_event(timestamp=current_timestamp), helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 1 assert actions[0].HasField("createTimer") assert actions[0].createTimer.fireAt.ToDatetime() == expected_fire_at @@ -274,7 +283,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_timer_fired_event(2, current_timestamp)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 2 assert actions[1].HasField("scheduleTask") assert actions[1].id == 1 @@ -286,7 +296,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 3 assert actions[2].HasField("createTimer") assert actions[2].createTimer.fireAt.ToDatetime() == expected_fire_at @@ -299,7 +310,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_timer_fired_event(3, current_timestamp)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 3 assert actions[1].HasField("scheduleTask") assert actions[1].id == 1 @@ -311,7 +323,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 4 assert actions[3].HasField("createTimer") assert actions[3].createTimer.fireAt.ToDatetime() == expected_fire_at @@ -324,7 +337,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_timer_fired_event(4, current_timestamp)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 4 assert actions[1].HasField("scheduleTask") assert actions[1].id == 1 @@ -336,7 +350,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 5 assert actions[4].HasField("createTimer") assert actions[4].createTimer.fireAt.ToDatetime() == expected_fire_at @@ -349,7 +364,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_timer_fired_event(5, current_timestamp)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 5 assert actions[1].HasField("scheduleTask") assert actions[1].id == 1 @@ -362,7 +378,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 6 assert actions[5].HasField("createTimer") assert actions[5].createTimer.fireAt.ToDatetime() == expected_fire_at @@ -375,7 +392,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_timer_fired_event(6, current_timestamp)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 6 assert actions[1].HasField("scheduleTask") assert actions[1].id == 1 @@ -386,7 +404,8 @@ def orchestrator(ctx: task.OrchestrationContext, orchestrator_input): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 1 assert actions[0].completeOrchestration.failureDetails.errorMessage.__contains__("Activity task #1 failed: Kah-BOOOOM!!!") assert actions[0].id == 7 @@ -412,7 +431,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): new_events = [helpers.new_timer_fired_event(timer_id=1, fire_at=fire_at)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED @@ -439,7 +459,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): new_events = [helpers.new_task_completed_event(1)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED @@ -468,7 +489,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): new_events = [helpers.new_task_completed_event(1)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED @@ -498,7 +520,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): new_events = [helpers.new_task_completed_event(1)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED @@ -531,7 +554,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_sub_orchestration_completed_event(1, encoded_output="42")] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED @@ -560,7 +584,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): new_events = [helpers.new_sub_orchestration_failed_event(1, ex)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED @@ -590,7 +615,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_sub_orchestration_completed_event(1, encoded_output="42")] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED @@ -619,7 +645,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_sub_orchestration_completed_event(1, encoded_output="42")] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED @@ -645,7 +672,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): # Execute the orchestration until it is waiting for an external event. The result # should be an empty list of actions because the orchestration didn't schedule any work. executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 0 # Now send an external event to the orchestration and execute it again. This time @@ -653,7 +681,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): old_events = new_events new_events = [helpers.new_event_raised_event("my_event", encoded_input="42")] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED assert complete_action.result.value == "42" @@ -677,7 +706,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): # Execute the orchestration. It should be in a running state waiting for the timer to fire executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 1 assert actions[0].HasField("createTimer") @@ -687,7 +717,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): old_events = new_events + [helpers.new_timer_created_event(1, timer_due_time)] new_events = [helpers.new_timer_fired_event(1, timer_due_time)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED assert complete_action.result.value == "42" @@ -713,14 +744,16 @@ def orchestrator(ctx: task.OrchestrationContext, _): # Execute the orchestration. It should remain in a running state because it was suspended prior # to processing the event raised event. executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 0 # Resume the orchestration. It should complete successfully. old_events = old_events + new_events new_events = [helpers.new_resume_event()] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED assert complete_action.result.value == "42" @@ -745,7 +778,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): # Execute the orchestration. It should be in a running state waiting for an external event executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_TERMINATED assert complete_action.result.value == json.dumps("terminated!") @@ -773,7 +807,8 @@ def orchestrator(ctx: task.OrchestrationContext, input: int): # Execute the orchestration. It should be in a running state waiting for the timer to fire executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_CONTINUED_AS_NEW assert complete_action.result.value == json.dumps(2) @@ -808,7 +843,8 @@ def orchestrator(ctx: task.OrchestrationContext, count: int): helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input="10")] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions # The result should be 10 "taskScheduled" actions with inputs from 0 to 9 assert len(actions) == 10 @@ -849,12 +885,14 @@ def orchestrator(ctx: task.OrchestrationContext, _): # First, test with only the first 5 events. We expect the orchestration to be running # but return zero actions since its still waiting for the other 5 tasks to complete. executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events[:5]) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events[:5]) + actions = result.actions assert len(actions) == 0 # Now test with the full set of new events. We expect the orchestration to complete. executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED @@ -895,7 +933,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): # Now test with the full set of new events. We expect the orchestration to complete. executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED @@ -926,7 +965,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): old_events = [] new_events = [helpers.new_execution_started_event(orchestrator_name, TEST_INSTANCE_ID, encoded_input=None)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 2 assert actions[0].HasField('scheduleTask') assert actions[1].HasField('scheduleTask') @@ -942,7 +982,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): encoded_output = json.dumps(hello(None, "Tokyo")) new_events = [helpers.new_task_completed_event(1, encoded_output)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED assert complete_action.result.value == encoded_output @@ -951,7 +992,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): encoded_output = json.dumps(hello(None, "Seattle")) new_events = [helpers.new_task_completed_event(2, encoded_output)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED assert complete_action.result.value == encoded_output @@ -997,7 +1039,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_orchestrator_started_event(timestamp=current_timestamp), helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 1 assert actions[0].HasField("createTimer") assert actions[0].createTimer.fireAt.ToDatetime() == expected_fire_at @@ -1010,7 +1053,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_timer_fired_event(3, current_timestamp)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 2 assert actions[1].HasField("scheduleTask") assert actions[1].id == 1 @@ -1022,7 +1066,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 3 assert actions[2].HasField("createTimer") assert actions[2].createTimer.fireAt.ToDatetime() == expected_fire_at @@ -1032,7 +1077,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): encoded_output = json.dumps(dummy_activity(None, "Seattle")) new_events = [helpers.new_task_completed_event(2, encoded_output)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_COMPLETED assert complete_action.result.value == encoded_output @@ -1075,7 +1121,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_orchestrator_started_event(timestamp=current_timestamp), helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 1 assert actions[0].HasField("createTimer") assert actions[0].createTimer.fireAt.ToDatetime() == expected_fire_at @@ -1088,7 +1135,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_timer_fired_event(3, current_timestamp)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 2 assert actions[1].HasField("scheduleTask") assert actions[1].id == 1 @@ -1100,7 +1148,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 3 assert actions[2].HasField("createTimer") assert actions[2].createTimer.fireAt.ToDatetime() == expected_fire_at @@ -1113,7 +1162,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): new_events = [helpers.new_task_completed_event(2, encoded_output), helpers.new_timer_fired_event(4, current_timestamp)] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions assert len(actions) == 3 assert actions[1].HasField("scheduleTask") assert actions[1].id == 1 @@ -1126,7 +1176,8 @@ def orchestrator(ctx: task.OrchestrationContext, _): helpers.new_orchestrator_started_event(current_timestamp), helpers.new_task_failed_event(1, ValueError("Kah-BOOOOM!!!"))] executor = worker._OrchestrationExecutor(registry, TEST_LOGGER) - actions = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + result = executor.execute(TEST_INSTANCE_ID, old_events, new_events) + actions = result.actions complete_action = get_and_validate_single_complete_orchestration_action(actions) assert complete_action.orchestrationStatus == pb.ORCHESTRATION_STATUS_FAILED assert complete_action.failureDetails.errorType == 'TaskFailedError' # TODO: Should this be the specific error?