Skip to content

Commit f71defd

Browse files
committed
Clean up existing code
1 parent 8449a35 commit f71defd

File tree

3 files changed

+22
-21
lines changed

3 files changed

+22
-21
lines changed

temporalio/worker/_interceptor.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -388,23 +388,23 @@ async def signal_external_workflow(
388388

389389
def start_activity(
390390
self, input: StartActivityInput
391-
) -> temporalio.workflow.ActivityHandle:
391+
) -> temporalio.workflow.ActivityHandle[Any]:
392392
"""Called for every :py:func:`temporalio.workflow.start_activity` and
393393
:py:func:`temporalio.workflow.execute_activity` call.
394394
"""
395395
return self.next.start_activity(input)
396396

397397
async def start_child_workflow(
398398
self, input: StartChildWorkflowInput
399-
) -> temporalio.workflow.ChildWorkflowHandle:
399+
) -> temporalio.workflow.ChildWorkflowHandle[Any, Any]:
400400
"""Called for every :py:func:`temporalio.workflow.start_child_workflow`
401401
and :py:func:`temporalio.workflow.execute_child_workflow` call.
402402
"""
403403
return await self.next.start_child_workflow(input)
404404

405405
def start_local_activity(
406406
self, input: StartLocalActivityInput
407-
) -> temporalio.workflow.ActivityHandle:
407+
) -> temporalio.workflow.ActivityHandle[Any]:
408408
"""Called for every :py:func:`temporalio.workflow.start_local_activity`
409409
and :py:func:`temporalio.workflow.execute_local_activity` call.
410410
"""

temporalio/worker/_workflow_instance.py

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1584,25 +1584,26 @@ def _outbound_schedule_activity(
15841584
self,
15851585
input: Union[StartActivityInput, StartLocalActivityInput],
15861586
) -> _ActivityHandle:
1587+
# A ScheduleActivityTask command always results in an ActivityTaskScheduled event,
1588+
# so this function returns the handle immediately. This is similar to nexus
1589+
# operation but differs from child workflow.
1590+
15871591
# Validate
15881592
if not input.start_to_close_timeout and not input.schedule_to_close_timeout:
15891593
raise ValueError(
15901594
"Activity must have start_to_close_timeout or schedule_to_close_timeout"
15911595
)
15921596

1593-
handle: Optional[_ActivityHandle] = None
1597+
handle: _ActivityHandle
15941598

15951599
# Function that runs in the handle
15961600
async def run_activity() -> Any:
1597-
nonlocal handle
1598-
assert handle
15991601
while True:
16001602
# Mark it as started each loop because backoff could cause it to
16011603
# be marked as unstarted
16021604
handle._started = True
16031605
try:
1604-
# We have to shield because we don't want the underlying
1605-
# result future to be cancelled
1606+
# Shield so that future itself is not cancelled
16061607
return await asyncio.shield(handle._result_fut)
16071608
except _ActivityDoBackoffError as err:
16081609
# We have to sleep then reschedule. Note this sleep can be
@@ -1662,12 +1663,16 @@ async def _outbound_signal_external_workflow(
16621663
async def _outbound_start_child_workflow(
16631664
self, input: StartChildWorkflowInput
16641665
) -> _ChildWorkflowHandle:
1665-
handle: Optional[_ChildWorkflowHandle] = None
1666+
# A StartChildWorkflowExecution command results in a
1667+
# StartChildWorkflowExecutionInitiated event, but the start may fail (e.g. due to
1668+
# workflow ID collision). Therefore this function does not return the handle until
1669+
# a future activation contains an event indicating start success / failure. This
1670+
# differs from activity and nexus operation.
1671+
1672+
handle: _ChildWorkflowHandle
16661673

16671674
# Common code for handling cancel for start and run
16681675
def apply_child_cancel_error() -> None:
1669-
nonlocal handle
1670-
assert handle
16711676
# Send a cancel request to the child
16721677
cancel_command = self._add_command()
16731678
handle._apply_cancel_command(cancel_command)
@@ -1685,12 +1690,9 @@ def apply_child_cancel_error() -> None:
16851690

16861691
# Function that runs in the handle
16871692
async def run_child() -> Any:
1688-
nonlocal handle
16891693
while True:
1690-
assert handle
16911694
try:
1692-
# We have to shield because we don't want the future itself
1693-
# to be cancelled
1695+
# Shield so that future itself is not cancelled
16941696
return await asyncio.shield(handle._result_fut)
16951697
except asyncio.CancelledError:
16961698
apply_child_cancel_error()
@@ -1705,8 +1707,7 @@ async def run_child() -> Any:
17051707
# Wait on start before returning
17061708
while True:
17071709
try:
1708-
# We have to shield because we don't want the future itself
1709-
# to be cancelled
1710+
# Shield so that future itself is not cancelled
17101711
await asyncio.shield(handle._start_fut)
17111712
return handle
17121713
except asyncio.CancelledError:
@@ -2438,17 +2439,17 @@ async def signal_external_workflow(
24382439

24392440
def start_activity(
24402441
self, input: StartActivityInput
2441-
) -> temporalio.workflow.ActivityHandle:
2442+
) -> temporalio.workflow.ActivityHandle[Any]:
24422443
return self._instance._outbound_schedule_activity(input)
24432444

24442445
async def start_child_workflow(
24452446
self, input: StartChildWorkflowInput
2446-
) -> temporalio.workflow.ChildWorkflowHandle:
2447+
) -> temporalio.workflow.ChildWorkflowHandle[Any, Any]:
24472448
return await self._instance._outbound_start_child_workflow(input)
24482449

24492450
def start_local_activity(
24502451
self, input: StartLocalActivityInput
2451-
) -> temporalio.workflow.ActivityHandle:
2452+
) -> temporalio.workflow.ActivityHandle[Any]:
24522453
return self._instance._outbound_schedule_activity(input)
24532454

24542455

tests/helpers/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import uuid
55
from contextlib import closing
66
from datetime import timedelta
7-
from typing import Any, Awaitable, Callable, Optional, Sequence, Type, TypeVar, Union
7+
from typing import Any, Awaitable, Callable, Optional, Sequence, Type, TypeVar
88

99
from temporalio.api.common.v1 import WorkflowExecution
1010
from temporalio.api.enums.v1 import IndexedValueType

0 commit comments

Comments
 (0)