Skip to content

Commit

Permalink
Update durabletask protos, set custom status (#31)
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Martinez <[email protected]>
  • Loading branch information
famarting authored Oct 28, 2024
1 parent a51257f commit 8598f6b
Show file tree
Hide file tree
Showing 10 changed files with 1,347 additions and 923 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,16 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## v0.2.0 (Unreleased)

### New

- Support for orchestration custom status ([#31](https://github.com/microsoft/durabletask-python/pull/31)) - contributed by [@famarting](https://github.com/famarting)

### Updates

- Updated `durabletask-protobuf` submodule reference to latest

## v0.1.1a1

### New
Expand Down
7 changes: 5 additions & 2 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ def __init__(self, *,
def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
input: Union[TInput, None] = None,
instance_id: Union[str, None] = None,
start_at: Union[datetime, None] = None) -> str:
start_at: Union[datetime, None] = None,
reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None) -> str:

name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)

Expand All @@ -113,7 +114,9 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
instanceId=instance_id if instance_id else uuid.uuid4().hex,
input=wrappers_pb2.StringValue(value=shared.to_json(input)) if input is not None else None,
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=wrappers_pb2.StringValue(value=""))
version=wrappers_pb2.StringValue(value=""),
orchestrationIdReusePolicy=reuse_id_policy,
)

self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
Expand Down
385 changes: 203 additions & 182 deletions durabletask/internal/orchestrator_service_pb2.py

Large diffs are not rendered by default.

1,244 changes: 650 additions & 594 deletions durabletask/internal/orchestrator_service_pb2.pyi

Large diffs are not rendered by default.

418 changes: 330 additions & 88 deletions durabletask/internal/orchestrator_service_pb2_grpc.py

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ def is_replaying(self) -> bool:
"""
pass

@abstractmethod
def set_custom_status(self, custom_status: str) -> None:
"""Set the custom status.
"""
pass

@abstractmethod
def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task:
"""Create a Timer Task to fire after at the specified deadline.
Expand Down
22 changes: 17 additions & 5 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
TypeVar, Union)

import grpc
from google.protobuf import empty_pb2
from google.protobuf import empty_pb2, wrappers_pb2

import durabletask.internal.helpers as ph
import durabletask.internal.helpers as pbh
Expand Down Expand Up @@ -188,8 +188,8 @@ def stop(self):
def _execute_orchestrator(self, req: pb.OrchestratorRequest, stub: stubs.TaskHubSidecarServiceStub):
try:
executor = _OrchestrationExecutor(self._registry, self._logger)
actions = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=actions)
result = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=result.actions, customStatus=wrappers_pb2.StringValue(value=result.custom_status))
except Exception as ex:
self._logger.exception(f"An error occurred while trying to execute instance '{req.instanceId}': {ex}")
failure_details = pbh.new_failure_details(ex)
Expand Down Expand Up @@ -242,6 +242,7 @@ def __init__(self, instance_id: str):
self._pending_events: Dict[str, List[task.CompletableTask]] = {}
self._new_input: Optional[Any] = None
self._save_events = False
self._custom_status: str = ""

def run(self, generator: Generator[task.Task, Any, Any]):
self._generator = generator
Expand Down Expand Up @@ -355,6 +356,9 @@ def is_replaying(self) -> bool:
def current_utc_datetime(self, value: datetime):
self._current_utc_datetime = value

def set_custom_status(self, custom_status: str) -> None:
self._custom_status = custom_status

def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
return self.create_timer_internal(fire_at)

Expand Down Expand Up @@ -457,6 +461,14 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None:
self.set_continued_as_new(new_input, save_events)


class ExecutionResults:
actions: List[pb.OrchestratorAction]
custom_status: str

def __init__(self, actions: List[pb.OrchestratorAction], custom_status: str):
self.actions = actions
self.custom_status = custom_status

class _OrchestrationExecutor:
_generator: Optional[task.Orchestrator] = None

Expand All @@ -466,7 +478,7 @@ def __init__(self, registry: _Registry, logger: logging.Logger):
self._is_suspended = False
self._suspended_events: List[pb.HistoryEvent] = []

def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> List[pb.OrchestratorAction]:
def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> ExecutionResults:
if not new_events:
raise task.OrchestrationStateError("The new history event list must have at least one event in it.")

Expand Down Expand Up @@ -501,7 +513,7 @@ def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_e
actions = ctx.get_actions()
if self._logger.level <= logging.DEBUG:
self._logger.debug(f"{instance_id}: Returning {len(actions)} action(s): {_get_action_summary(actions)}")
return actions
return ExecutionResults(actions=actions, custom_status=ctx._custom_status)

def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEvent) -> None:
if self._is_suspended and _is_suspendable(event):
Expand Down
2 changes: 1 addition & 1 deletion submodules/durabletask-protobuf
23 changes: 23 additions & 0 deletions tests/test_orchestration_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,3 +441,26 @@ def throw_activity(ctx: task.ActivityContext, _):
assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
assert state.failure_details.stack_trace is not None
assert throw_activity_counter == 4

def test_custom_status():

def empty_orchestrator(ctx: task.OrchestrationContext, _):
ctx.set_custom_status("foobaz")

# Start a worker, which will connect to the sidecar in a background thread
with worker.TaskHubGrpcWorker() as w:
w.add_orchestrator(empty_orchestrator)
w.start()

c = client.TaskHubGrpcClient()
id = c.schedule_new_orchestration(empty_orchestrator)
state = c.wait_for_orchestration_completion(id, timeout=30)

assert state is not None
assert state.name == task.get_name(empty_orchestrator)
assert state.instance_id == id
assert state.failure_details is None
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
assert state.serialized_input is None
assert state.serialized_output is None
assert state.serialized_custom_status is "\"foobaz\""
Loading

0 comments on commit 8598f6b

Please sign in to comment.