@@ -297,43 +297,9 @@ async def test_async_response(
297
297
task_queue = task_queue ,
298
298
workflow_runner = UnsandboxedWorkflowRunner (),
299
299
):
300
- await create_nexus_endpoint (task_queue , client )
301
- operation_workflow_id = "default-workflow-id"
302
-
303
- # Start the caller workflow and wait until it confirms the Nexus operation has started.
304
- block_forever_waiting_for_cancellation = request_cancel
305
- start_op = WithStartWorkflowOperation (
306
- MyCallerWorkflow .run ,
307
- args = [
308
- MyInput (
309
- response_type = AsyncResponse (
310
- operation_workflow_id ,
311
- block_forever_waiting_for_cancellation ,
312
- use_shorthand_defined_operation ,
313
- ),
314
- start_options = nexusrpc .handler .StartOperationOptions (
315
- headers = {"my-header-key" : "my-header-value" },
316
- ),
317
- ),
318
- request_cancel ,
319
- task_queue ,
320
- ],
321
- id = str (uuid .uuid4 ()),
322
- task_queue = task_queue ,
323
- id_conflict_policy = WorkflowIDConflictPolicy .FAIL ,
300
+ caller_wf_handle , handler_wf_handle = await _start_wf_and_nexus_op (
301
+ client , task_queue , request_cancel , use_shorthand_defined_operation
324
302
)
325
-
326
- await client .execute_update_with_start_workflow (
327
- MyCallerWorkflow .wait_nexus_operation_started ,
328
- start_workflow_operation = start_op ,
329
- )
330
- caller_wf_handle = await start_op .workflow_handle ()
331
-
332
- # check that the operation-backing workflow now exists, and that (a) the handler
333
- # workflow accepted the link to the calling Nexus event, and that (b) the caller
334
- # workflow NexusOperationStarted event received in return a link to the
335
- # operation-backing workflow.
336
- handler_wf_handle = client .get_workflow_handle (operation_workflow_id )
337
303
# TODO(dan): race here? How do we know it hasn't been canceled already?
338
304
handler_wf_info = await handler_wf_handle .describe ()
339
305
assert handler_wf_info .status in [
@@ -379,6 +345,55 @@ async def test_async_response(
379
345
assert result .start_options_received_by_handler
380
346
381
347
348
+ async def _start_wf_and_nexus_op (
349
+ client : Client ,
350
+ task_queue : str ,
351
+ request_cancel : bool ,
352
+ use_shorthand_defined_operation : bool ,
353
+ ) -> tuple [WorkflowHandle , WorkflowHandle ]:
354
+ """
355
+ Start the caller workflow and wait until the Nexus operation has started.
356
+ """
357
+ await create_nexus_endpoint (task_queue , client )
358
+ operation_workflow_id = "default-workflow-id"
359
+
360
+ # Start the caller workflow and wait until it confirms the Nexus operation has started.
361
+ block_forever_waiting_for_cancellation = request_cancel
362
+ start_op = WithStartWorkflowOperation (
363
+ MyCallerWorkflow .run ,
364
+ args = [
365
+ MyInput (
366
+ response_type = AsyncResponse (
367
+ operation_workflow_id ,
368
+ block_forever_waiting_for_cancellation ,
369
+ use_shorthand_defined_operation ,
370
+ ),
371
+ start_options = nexusrpc .handler .StartOperationOptions (
372
+ headers = {"my-header-key" : "my-header-value" },
373
+ ),
374
+ ),
375
+ request_cancel ,
376
+ task_queue ,
377
+ ],
378
+ id = str (uuid .uuid4 ()),
379
+ task_queue = task_queue ,
380
+ id_conflict_policy = WorkflowIDConflictPolicy .FAIL ,
381
+ )
382
+
383
+ await client .execute_update_with_start_workflow (
384
+ MyCallerWorkflow .wait_nexus_operation_started ,
385
+ start_workflow_operation = start_op ,
386
+ )
387
+ caller_wf_handle = await start_op .workflow_handle ()
388
+
389
+ # check that the operation-backing workflow now exists, and that (a) the handler
390
+ # workflow accepted the link to the calling Nexus event, and that (b) the caller
391
+ # workflow NexusOperationStarted event received in return a link to the
392
+ # operation-backing workflow.
393
+ handler_wf_handle = client .get_workflow_handle (operation_workflow_id )
394
+ return caller_wf_handle , handler_wf_handle
395
+
396
+
382
397
def make_nexus_endpoint_name (task_queue : str ) -> str :
383
398
# Create endpoints for different task queues without name collisions.
384
399
return f"nexus-endpoint-{ task_queue } "
0 commit comments