Skip to content

Commit 1474b11

Browse files
committed
Bug fix: make endpoint name 1-1 with task_queue
1 parent aa7c3d9 commit 1474b11

File tree

1 file changed

+28
-12
lines changed

1 file changed

+28
-12
lines changed

tests/worker/test_nexus.py

+28-12
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@
2222
from temporalio.service import RPCError, RPCStatusCode
2323
from temporalio.worker import UnsandboxedWorkflowRunner, Worker
2424

25-
NEXUS_ENDPOINT_NAME = "my-nexus-endpoint"
26-
2725

2826
# -----------------------------------------------------------------------------
2927
# Service interface
@@ -124,17 +122,28 @@ class MyCallerWorkflow:
124122
synchronously or asynchronously.
125123
"""
126124

127-
def __init__(self) -> None:
125+
@workflow.init
126+
def __init__(
127+
self,
128+
response_type: ResponseType,
129+
should_cancel: bool,
130+
task_queue: str,
131+
) -> None:
128132
self.nexus_service = workflow.NexusClient(
129133
service=MyService,
130-
endpoint=NEXUS_ENDPOINT_NAME,
134+
endpoint=make_nexus_endpoint_name(task_queue),
131135
schedule_to_close_timeout=timedelta(seconds=10),
132136
)
133137
self._nexus_operation_started = False
134138
self._proceed = False
135139

136140
@workflow.run
137-
async def run(self, response_type: ResponseType, should_cancel: bool) -> str:
141+
async def run(
142+
self,
143+
response_type: ResponseType,
144+
should_cancel: bool,
145+
task_queue: str,
146+
) -> str:
138147
op_handle = await self.nexus_service.start_operation(
139148
MyService.my_operation,
140149
MyInput(
@@ -193,10 +202,10 @@ async def test_sync_response(client: Client, should_attempt_cancel: bool):
193202
task_queue=task_queue,
194203
workflow_runner=UnsandboxedWorkflowRunner(),
195204
):
196-
await create_nexus_endpoint(NEXUS_ENDPOINT_NAME, task_queue, client)
205+
await create_nexus_endpoint(task_queue, client)
197206
wf_handle = await client.start_workflow(
198207
MyCallerWorkflow.run,
199-
args=[SyncResponse(), should_attempt_cancel],
208+
args=[SyncResponse(), should_attempt_cancel, task_queue],
200209
id=str(uuid.uuid4()),
201210
task_queue=task_queue,
202211
)
@@ -217,12 +226,12 @@ async def test_async_response(client: Client):
217226
):
218227
operation_workflow_id = str(uuid.uuid4())
219228
operation_workflow_handle = client.get_workflow_handle(operation_workflow_id)
220-
await create_nexus_endpoint(NEXUS_ENDPOINT_NAME, task_queue, client)
229+
await create_nexus_endpoint(task_queue, client)
221230

222231
# Start the caller workflow
223232
wf_handle = await client.start_workflow(
224233
MyCallerWorkflow.run,
225-
args=[AsyncResponse(operation_workflow_id), False],
234+
args=[AsyncResponse(operation_workflow_id), False, task_queue],
226235
id=str(uuid.uuid4()),
227236
task_queue=task_queue,
228237
)
@@ -256,12 +265,12 @@ async def test_cancellation_of_async_response(client: Client):
256265
):
257266
operation_workflow_id = str(uuid.uuid4())
258267
operation_workflow_handle = client.get_workflow_handle(operation_workflow_id)
259-
await create_nexus_endpoint(NEXUS_ENDPOINT_NAME, task_queue, client)
268+
await create_nexus_endpoint(task_queue, client)
260269

261270
# Start the caller workflow
262271
wf_handle = await client.start_workflow(
263272
MyCallerWorkflow.run,
264-
args=[AsyncResponse(operation_workflow_id), True],
273+
args=[AsyncResponse(operation_workflow_id), True, task_queue],
265274
id=str(uuid.uuid4()),
266275
task_queue=task_queue,
267276
)
@@ -286,7 +295,14 @@ async def test_cancellation_of_async_response(client: Client):
286295
assert wf_details.status == WorkflowExecutionStatus.CANCELED
287296

288297

289-
async def create_nexus_endpoint(name: str, task_queue: str, client: Client) -> None:
298+
def make_nexus_endpoint_name(task_queue: str) -> str:
299+
return f"nexus-endpoint-{task_queue}"
300+
301+
302+
async def create_nexus_endpoint(task_queue: str, client: Client) -> None:
303+
# In order to be able to create endpoints for different task queues without risk of
304+
# name collision.
305+
name = make_nexus_endpoint_name(task_queue)
290306
try:
291307
await client.operator_service.create_nexus_endpoint(
292308
temporalio.api.operatorservice.v1.CreateNexusEndpointRequest(

0 commit comments

Comments
 (0)