Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug] Workflow cancellation can be swallowed #620

Open
cretz opened this issue Aug 26, 2024 · 1 comment
Open

[Bug] Workflow cancellation can be swallowed #620

cretz opened this issue Aug 26, 2024 · 1 comment
Labels
bug Something isn't working

Comments

@cretz
Copy link
Member

cretz commented Aug 26, 2024

Describe the bug

In Python if a cancellation is requested of a workflow with a short-lived activity, and that activity completes during cancellation (unsure if you have to set cancellation type as wait on the activity), the cancellation is effectively swallowed. This occurs for any Python asyncio where cancellation is used, if the awaiting thing swallows cancellation it's like it never happened.

This is probably just existing, unfortunate Python behavior we have to document. We need to at least confirm if it is only for wait-cancel cancellation types or try-cancel too.

@cretz cretz added the bug Something isn't working label Aug 26, 2024
@lambyqq
Copy link

lambyqq commented Aug 26, 2024

Here's some test code that could reproduce this:

@activity.defn
async def short_activity_async():
   delay = random.uniform(0.05, 0.15)  # 50~150ms delay
   await asyncio.sleep(delay)


@activity.defn
def short_activity_sync():
   delay = random.uniform(0.05, 0.15)  # 50~150ms delay
   sleep(delay)


@workflow.defn
class ShortActivityWorkflow:
   @workflow.run
   async def run(self, total_seconds: float = 10.0):
       end = workflow.now() + timedelta(seconds=total_seconds)
       while True:
           await workflow.execute_activity(
               short_activity_async,
               schedule_to_close_timeout=timedelta(seconds=10))
           await workflow.execute_activity(
               short_activity_sync,
               schedule_to_close_timeout=timedelta(seconds=10))
           if workflow.now() > end:
               break


@pytest.mark.asyncio
async def test_workflow_cancellation():
   ...
   client: Client = ...
   async with Worker(
           client,
           task_queue='test-wf-cancellation-task-queue',
           workflows=[ShortActivityWorkflow],
           activities=[short_activity_async, short_activity_sync],
           activity_executor=ThreadPoolExecutor(max_workers=1)
   ):
       for i in range(10):
           wf_duration = random.uniform(5.0, 15.0)
           wf_handle = await client.start_workflow(
               ShortActivityWorkflow.run,
               id=f'short_activity_wf_id-{i}',
               args=[wf_duration],
               task_queue=test_worker_task_queue,
               execution_timeout=timedelta(minutes=1)
           )

           # Cancel wf
           await asyncio.sleep(1.0)
           await wf_handle.cancel()

           with pytest.raises(WorkflowFailureError) as err_info:
               await wf_handle.result()  # failed
           cause = err_info.value.cause
           assert isinstance(cause, CancelledError)
           assert cause.message == 'Workflow cancelled'

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

2 participants