Skip to content

Commit

Permalink
update durabletask protos, set custom status
Browse files Browse the repository at this point in the history
Signed-off-by: Fabian Martinez <[email protected]>
  • Loading branch information
famarting committed Oct 24, 2024
1 parent a51257f commit d7e30eb
Show file tree
Hide file tree
Showing 8 changed files with 1,240 additions and 874 deletions.
7 changes: 5 additions & 2 deletions durabletask/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ def __init__(self, *,
def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInput, TOutput], str], *,
input: Union[TInput, None] = None,
instance_id: Union[str, None] = None,
start_at: Union[datetime, None] = None) -> str:
start_at: Union[datetime, None] = None,
reuse_id_policy: Union[pb.OrchestrationIdReusePolicy, None] = None) -> str:

name = orchestrator if isinstance(orchestrator, str) else task.get_name(orchestrator)

Expand All @@ -113,7 +114,9 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
instanceId=instance_id if instance_id else uuid.uuid4().hex,
input=wrappers_pb2.StringValue(value=shared.to_json(input)) if input is not None else None,
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
version=wrappers_pb2.StringValue(value=""))
version=wrappers_pb2.StringValue(value=""),
orchestrationIdReusePolicy=reuse_id_policy,
)

self._logger.info(f"Starting new '{name}' instance with ID = '{req.instanceId}'.")
res: pb.CreateInstanceResponse = self._stub.StartInstance(req)
Expand Down
385 changes: 203 additions & 182 deletions durabletask/internal/orchestrator_service_pb2.py

Large diffs are not rendered by default.

1,244 changes: 650 additions & 594 deletions durabletask/internal/orchestrator_service_pb2.pyi

Large diffs are not rendered by default.

420 changes: 330 additions & 90 deletions durabletask/internal/orchestrator_service_pb2_grpc.py

Large diffs are not rendered by default.

14 changes: 14 additions & 0 deletions durabletask/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,20 @@ def is_replaying(self) -> bool:
"""
pass

@property
@abstractmethod
def custom_status(self) -> str:
"""Get the custom status.
"""
pass

@custom_status.setter
@abstractmethod
def custom_status(self, custom_status: str) -> None:
"""Set the custom status.
"""
pass

@abstractmethod
def create_timer(self, fire_at: Union[datetime, timedelta]) -> Task:
"""Create a Timer Task to fire after at the specified deadline.
Expand Down
19 changes: 14 additions & 5 deletions durabletask/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
TypeVar, Union)

import grpc
from google.protobuf import empty_pb2
from google.protobuf import empty_pb2, wrappers_pb2

import durabletask.internal.helpers as ph
import durabletask.internal.helpers as pbh
Expand Down Expand Up @@ -188,8 +188,8 @@ def stop(self):
def _execute_orchestrator(self, req: pb.OrchestratorRequest, stub: stubs.TaskHubSidecarServiceStub):
try:
executor = _OrchestrationExecutor(self._registry, self._logger)
actions = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=actions)
actions, custom_status = executor.execute(req.instanceId, req.pastEvents, req.newEvents)
res = pb.OrchestratorResponse(instanceId=req.instanceId, actions=actions, customStatus=wrappers_pb2.StringValue(value=custom_status))
except Exception as ex:
self._logger.exception(f"An error occurred while trying to execute instance '{req.instanceId}': {ex}")
failure_details = pbh.new_failure_details(ex)
Expand Down Expand Up @@ -242,6 +242,7 @@ def __init__(self, instance_id: str):
self._pending_events: Dict[str, List[task.CompletableTask]] = {}
self._new_input: Optional[Any] = None
self._save_events = False
self._custom_status: str = ""

def run(self, generator: Generator[task.Task, Any, Any]):
self._generator = generator
Expand Down Expand Up @@ -355,6 +356,14 @@ def is_replaying(self) -> bool:
def current_utc_datetime(self, value: datetime):
self._current_utc_datetime = value

@property
def custom_status(self) -> str:
return self._custom_status

@custom_status.setter
def custom_status(self, custom_status: str) -> None:
self._custom_status = custom_status

def create_timer(self, fire_at: Union[datetime, timedelta]) -> task.Task:
return self.create_timer_internal(fire_at)

Expand Down Expand Up @@ -466,7 +475,7 @@ def __init__(self, registry: _Registry, logger: logging.Logger):
self._is_suspended = False
self._suspended_events: List[pb.HistoryEvent] = []

def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> List[pb.OrchestratorAction]:
def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_events: Sequence[pb.HistoryEvent]) -> Tuple[List[pb.OrchestratorAction],str]:
if not new_events:
raise task.OrchestrationStateError("The new history event list must have at least one event in it.")

Expand Down Expand Up @@ -501,7 +510,7 @@ def execute(self, instance_id: str, old_events: Sequence[pb.HistoryEvent], new_e
actions = ctx.get_actions()
if self._logger.level <= logging.DEBUG:
self._logger.debug(f"{instance_id}: Returning {len(actions)} action(s): {_get_action_summary(actions)}")
return actions
return actions, ctx.custom_status

def process_event(self, ctx: _RuntimeOrchestrationContext, event: pb.HistoryEvent) -> None:
if self._is_suspended and _is_suspendable(event):
Expand Down
2 changes: 1 addition & 1 deletion submodules/durabletask-protobuf
23 changes: 23 additions & 0 deletions tests/test_orchestration_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,3 +441,26 @@ def throw_activity(ctx: task.ActivityContext, _):
assert state.failure_details.message.endswith("Activity task #1 failed: Kah-BOOOOM!!!")
assert state.failure_details.stack_trace is not None
assert throw_activity_counter == 4

def test_custom_status():

def empty_orchestrator(ctx: task.OrchestrationContext, _):
ctx.custom_status = "foobaz"

# Start a worker, which will connect to the sidecar in a background thread
with worker.TaskHubGrpcWorker() as w:
w.add_orchestrator(empty_orchestrator)
w.start()

c = client.TaskHubGrpcClient()
id = c.schedule_new_orchestration(empty_orchestrator)
state = c.wait_for_orchestration_completion(id, timeout=30)

assert state is not None
assert state.name == task.get_name(empty_orchestrator)
assert state.instance_id == id
assert state.failure_details is None
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
assert state.serialized_input is None
assert state.serialized_output is None
assert state.serialized_custom_status is "foobaz"

0 comments on commit d7e30eb

Please sign in to comment.