|
35 | 35 | import temporalio.nexus
|
36 | 36 | from temporalio import workflow
|
37 | 37 | from temporalio.client import Client, WorkflowHandle
|
| 38 | +from temporalio.common import WorkflowIDReusePolicy |
38 | 39 | from temporalio.converter import FailureConverter, PayloadConverter
|
39 | 40 | from temporalio.exceptions import ApplicationError
|
40 | 41 | from temporalio.nexus import CancelOperationContext, StartOperationContext, logger
|
@@ -202,6 +203,7 @@ async def workflow_run_operation(
|
202 | 203 | input,
|
203 | 204 | id=test_context.workflow_id or str(uuid.uuid4()),
|
204 | 205 | task_queue=ctx.task_queue,
|
| 206 | + id_reuse_policy=WorkflowIDReusePolicy.REJECT_DUPLICATE, |
205 | 207 | )
|
206 | 208 |
|
207 | 209 | @nexusrpc.handler.sync_operation_handler
|
@@ -965,6 +967,64 @@ async def test_request_id_is_received_by_sync_operation_handler(
|
965 | 967 | assert resp.json() == {"value": f"request_id: {request_id}"}
|
966 | 968 |
|
967 | 969 |
|
| 970 | +async def test_request_id_becomes_start_workflow_request_id(env: WorkflowEnvironment): |
| 971 | + # We send two Nexus requests that would start a workflow with the same workflow ID, |
| 972 | + # using reuse_policy=REJECT_DUPLICATE. This would fail if they used different |
| 973 | + # request IDs. However, when we use the same request ID, it does not fail, |
| 974 | + # demonstrating that the Nexus Start Operation request ID has become the |
| 975 | + # StartWorkflow request ID. |
| 976 | + task_queue = str(uuid.uuid4()) |
| 977 | + endpoint = (await create_nexus_endpoint(task_queue, env.client)).endpoint.id |
| 978 | + service_client = ServiceClient( |
| 979 | + server_address=server_address(env), |
| 980 | + endpoint=endpoint, |
| 981 | + service=MyService.__name__, |
| 982 | + ) |
| 983 | + |
| 984 | + decorator = nexusrpc.handler.service_handler(service=MyService) |
| 985 | + service_handler = decorator(MyServiceHandler)() |
| 986 | + |
| 987 | + async def start_two_workflows_with_conflicting_workflow_ids( |
| 988 | + request_ids: tuple[tuple[str, int], tuple[str, int]], |
| 989 | + ): |
| 990 | + test_context.workflow_id = str(uuid.uuid4()) |
| 991 | + for request_id, status_code in request_ids: |
| 992 | + resp = await service_client.start_operation( |
| 993 | + "workflow_run_operation", |
| 994 | + dataclass_as_dict(Input("")), |
| 995 | + {"Nexus-Request-Id": request_id}, |
| 996 | + ) |
| 997 | + assert resp.status_code == status_code, ( |
| 998 | + f"expected status code {status_code} " |
| 999 | + f"but got {resp.status_code} for response content " |
| 1000 | + f"{pprint.pformat(resp.content.decode())}" |
| 1001 | + ) |
| 1002 | + if status_code == 201: |
| 1003 | + assert resp.json()["token"] |
| 1004 | + assert ( |
| 1005 | + resp.json()["state"] |
| 1006 | + == nexusrpc.handler.OperationState.RUNNING.value |
| 1007 | + ) |
| 1008 | + |
| 1009 | + async with Worker( |
| 1010 | + env.client, |
| 1011 | + task_queue=task_queue, |
| 1012 | + nexus_services=[service_handler], |
| 1013 | + nexus_task_executor=concurrent.futures.ThreadPoolExecutor(), |
| 1014 | + ): |
| 1015 | + request_id_1, request_id_2 = str(uuid.uuid4()), str(uuid.uuid4()) |
| 1016 | + # Reusing the same request ID does not fail |
| 1017 | + await start_two_workflows_with_conflicting_workflow_ids( |
| 1018 | + ((request_id_1, 201), (request_id_1, 201)) |
| 1019 | + ) |
| 1020 | + # Using a different request ID does fail |
| 1021 | + # TODO(nexus-prerelease) I think that this should be a 409 per the spec. Go and |
| 1022 | + # Java are not doing that. |
| 1023 | + await start_two_workflows_with_conflicting_workflow_ids( |
| 1024 | + ((request_id_1, 201), (request_id_2, 500)) |
| 1025 | + ) |
| 1026 | + |
| 1027 | + |
968 | 1028 | def server_address(env: WorkflowEnvironment) -> str:
|
969 | 1029 | http_port = getattr(env, "_http_port", 7243)
|
970 | 1030 | return f"http://127.0.0.1:{http_port}"
|
0 commit comments