Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Martinez <[email protected]>
  • Loading branch information
famarting committed Oct 28, 2024
1 parent 01b1eff commit 97bfd31
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 55 deletions.
16 changes: 12 additions & 4 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,8 @@ def stop(self):
def _execute_orchestrator(self, req: pb.OrchestratorRequest, stub: stubs.TaskHubSidecarServiceStub):
try:
executor = _OrchestrationExecutor(self._registry, self._logger)
actions, custom_status = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=actions, customStatus=wrappers_pb2.StringValue(value=custom_status))
result = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=result.actions, customStatus=wrappers_pb2.StringValue(value=result.custom_status))
except Exception as ex:
self._logger.exception(f"An error occurred while trying to execute instance '{req.instanceId}': {ex}")
failure_details = pbh.new_failure_details(ex)
Expand Down Expand Up @@ -461,6 +461,14 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None:
self.set_continued_as_new(new_input, save_events)


class ExecutionResults:
actions: List[pb.OrchestratorAction]
custom_status: str

def __init__(self, actions: List[pb.OrchestratorAction], custom_status: str):
self.actions = actions
self.custom_status = custom_status

class _OrchestrationExecutor:
_generator: Optional[task.Orchestrator] = None

Expand All @@ -470,7 +478,7 @@ def __init__(self, registry: _Registry, logger: logging.Logger):
self._is_suspended = False
self._suspended_events: List[pb.HistoryEvent] = []

def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> Tuple[List[pb.OrchestratorAction],str]:
def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> ExecutionResults:
if not new_events:
raise task.OrchestrationStateError("The new history event list must have at least one event in it.")

Expand Down Expand Up @@ -505,7 +513,7 @@ def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_e
actions = ctx.get_actions()
if self._logger.level <= logging.DEBUG:
self._logger.debug(f"{instance_id}: Returning {len(actions)} action(s): {_get_action_summary(actions)}")
return actions, ctx._custom_status
return ExecutionResults(actions=actions, custom_status=ctx._custom_status)

def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEvent) -> None:
if self._is_suspended and _is_suspendable(event):
Expand Down
Loading

0 comments on commit 97bfd31

Please sign in to comment.