Skip to content

Commit f529ddf

Browse files
committed
Cleanup
1 parent f986597 commit f529ddf

File tree

5 files changed

+15
-13
lines changed

5 files changed

+15
-13
lines changed

temporalio/nexus/handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,8 +264,8 @@ def factory(
264264
f"expected {start_method} to be a function or callable instance."
265265
)
266266

267-
factory.__nexus_operation__ = nexusrpc.Operation._create(
268-
name=name,
267+
factory.__nexus_operation__ = nexusrpc.Operation(
268+
name=name or method_name,
269269
method_name=method_name,
270270
input_type=input_type,
271271
output_type=output_type,

temporalio/worker/_interceptor.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,7 @@ def __post_init__(self) -> None:
334334
def operation_name(self) -> str:
335335
return self._operation_name
336336

337+
# TODO(nexus-prerelease) contravariant type in output
337338
@property
338339
def input_type(self) -> Optional[Type[InputT]]:
339340
return self._input_type

temporalio/worker/_nexus.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ async def _handle_cancel_operation_task(
178178
temporalio.nexus.current_context.set(
179179
temporalio.nexus.Context(operation_context=ctx)
180180
)
181-
# TODO(nexus-prerelease): header
181+
# TODO(nexus-prerelease): headers
182182
try:
183183
await self._handler.cancel_operation(ctx, request.operation_token)
184184
except Exception as err:
@@ -219,6 +219,7 @@ async def _handle_start_operation_task(
219219

220220
try:
221221
start_response = await self._start_operation(start_request, headers)
222+
# TODO(nexus-prerelease): handle BrokenExecutor by failing the worker
222223
except BaseException as err:
223224
handler_err = _exception_to_handler_error(err)
224225
completion = temporalio.bridge.proto.nexus.NexusTaskCompletion(
@@ -235,7 +236,6 @@ async def _handle_start_operation_task(
235236

236237
try:
237238
await self._bridge_worker().complete_nexus_task(completion)
238-
# TODO(nexus-prerelease): handle BrokenExecutor by failing the worker
239239
except Exception:
240240
temporalio.nexus.logger.exception("Failed to send Nexus task completion")
241241
finally:
@@ -290,8 +290,8 @@ async def _start_operation(
290290
async_success=temporalio.api.nexus.v1.StartOperationResponse.Async(
291291
operation_token=result.token,
292292
links=[
293-
temporalio.api.nexus.v1.Link(url=l.url, type=l.type)
294-
for l in ctx.outbound_links
293+
temporalio.api.nexus.v1.Link(url=link.url, type=link.type)
294+
for link in ctx.outbound_links
295295
],
296296
)
297297
)
@@ -305,8 +305,9 @@ async def _start_operation(
305305
else:
306306
raise _exception_to_handler_error(
307307
TypeError(
308-
"Operation start method must return either nexusrpc.handler.StartOperationResultSync "
309-
"or nexusrpc.handler.StartOperationResultAsync"
308+
"Operation start method must return either "
309+
"nexusrpc.handler.StartOperationResultSync or "
310+
"nexusrpc.handler.StartOperationResultAsync."
310311
)
311312
)
312313
except nexusrpc.handler.OperationError as err:
@@ -321,7 +322,6 @@ async def _exception_to_failure_proto(
321322
api_failure = temporalio.api.failure.v1.Failure()
322323
await self._data_converter.encode_failure(err, api_failure)
323324
api_failure = google.protobuf.json_format.MessageToDict(api_failure)
324-
# TODO(nexus-prerelease): is metadata correct and playing intended role here?
325325
return temporalio.api.nexus.v1.Failure(
326326
message=api_failure.pop("message", ""),
327327
metadata={"type": "temporal.api.failure.v1.Failure"},
@@ -375,14 +375,14 @@ async def deserialize(
375375
[self.payload],
376376
type_hints=[as_type] if as_type else None,
377377
)
378+
return input
378379
except Exception as err:
379380
raise nexusrpc.handler.HandlerError(
380381
"Data converter failed to decode Nexus operation input",
381382
type=nexusrpc.handler.HandlerErrorType.BAD_REQUEST,
382383
cause=err,
383384
retryable=False,
384385
) from err
385-
return input
386386

387387

388388
# TODO(nexus-prerelease): tests for this function

tests/nexus/test_handler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ class OperationHandlerReturningUnwrappedResultError(_FailureTestCase):
573573
retryable_header=False,
574574
failure_message=(
575575
"Operation start method must return either "
576-
"nexusrpc.handler.StartOperationResultSync or nexusrpc.handler.StartOperationResultAsync"
576+
"nexusrpc.handler.StartOperationResultSync or nexusrpc.handler.StartOperationResultAsync."
577577
),
578578
)
579579

tests/nexus/test_handler_operation_definitions.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ async def workflow_run_operation_handler(
3838
) -> WorkflowHandle[Any, Output]: ...
3939

4040
expected_operations = {
41-
"workflow_run_operation_handler": nexusrpc.Operation._create(
41+
"workflow_run_operation_handler": nexusrpc.Operation(
42+
name="workflow_run_operation_handler",
4243
method_name="workflow_run_operation_handler",
4344
input_type=Input,
4445
output_type=Output,
@@ -66,7 +67,7 @@ async def workflow_run_operation_with_name_override(
6667
) -> WorkflowHandle[Any, Output]: ...
6768

6869
expected_operations = {
69-
"workflow_run_operation_with_name_override": nexusrpc.Operation._create(
70+
"workflow_run_operation_with_name_override": nexusrpc.Operation(
7071
name="operation-name",
7172
method_name="workflow_run_operation_with_name_override",
7273
input_type=Input,

0 commit comments

Comments
 (0)