Skip to content

Commit 7fe6868

Browse files
committed
Nexus
1 parent 64584f4 commit 7fe6868

19 files changed

+3956
-563
lines changed

README.md

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,8 @@ informal introduction to the features and their implementation.
9696
- [Heartbeating and Cancellation](#heartbeating-and-cancellation)
9797
- [Worker Shutdown](#worker-shutdown)
9898
- [Testing](#testing-1)
99+
- [Nexus](#nexus)
100+
- [hello](#hello)
99101
- [Workflow Replay](#workflow-replay)
100102
- [Observability](#observability)
101103
- [Metrics](#metrics)
@@ -1314,6 +1316,71 @@ affect calls activity code might make to functions on the `temporalio.activity`
13141316
* `cancel()` can be invoked to simulate a cancellation of the activity
13151317
* `worker_shutdown()` can be invoked to simulate a worker shutdown during execution of the activity
13161318

1319+
1320+
### Nexus
1321+
1322+
See [docs.temporal.io/nexus](https://docs.temporal.io/nexus).
1323+
1324+
#### Service Interface Definition
1325+
1326+
A Nexus Service interface definition is a set of named operations, where each operation is an
1327+
`(input_type, output_type)` pair:
1328+
1329+
```python
1330+
@nexusrpc.service
1331+
class MyNexusService:
1332+
my_operation: nexusrpc.Operation[MyOpInput, MyOpOutput]
1333+
```
1334+
1335+
### Operation implementation
1336+
1337+
```python
1338+
@nexusrpc.service(interface.MyNexusService)
1339+
class MyNexusService:
1340+
1341+
@nexusrpc.sync_operation
1342+
def echo(self, input: EchoInput) -> EchoOutput:
1343+
return EchoOutput(message=input.message)
1344+
```
1345+
1346+
```python
1347+
@nexusrpc.service(interface.MyNexusService)
1348+
class MyNexusService:
1349+
1350+
@temporalio.nexus.workflow_operation
1351+
async def hello(
1352+
self, input: HelloInput
1353+
) -> AsyncWorkflowOperationResult[HelloOutput]:
1354+
return await temporalio.nexus.handler.start_workflow(HelloWorkflow.run, input)
1355+
```
1356+
1357+
1358+
### Request options
1359+
1360+
```python
1361+
@dataclass
1362+
class OperationOptions:
1363+
"""Options passed by the Nexus caller when starting an operation."""
1364+
1365+
# A callback URL is required to deliver the completion of an async operation. This URL should be
1366+
# called by a handler upon completion if the started operation is async.
1367+
callback_url: Optional[str] = None
1368+
1369+
# Optional header fields set by the caller to be attached to the callback request when an
1370+
# asynchronous operation completes.
1371+
callback_header: dict[str, str] = field(default_factory=dict)
1372+
1373+
# Request ID that may be used by the server handler to dedupe a start request.
1374+
# By default a v4 UUID will be generated by the client.
1375+
request_id: Optional[str] = None
1376+
1377+
# Links contain arbitrary caller information. Handlers may use these links as
1378+
# metadata on resources associated with an operation.
1379+
links: list[Link] = field(default_factory=list)
1380+
```
1381+
1382+
1383+
13171384
### Workflow Replay
13181385

13191386
Given a workflow's history, it can be replayed locally to check for things like non-determinism errors. For example,

pyproject.toml

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ keywords = [
1111
"workflow",
1212
]
1313
dependencies = [
14+
"hyperlinked",
15+
"nexus-rpc",
16+
"pdbpp>=0.11.6",
1417
"protobuf>=3.20",
1518
"python-dateutil>=2.8.2,<3 ; python_version < '3.11'",
19+
"temporalio-xray",
1620
"types-protobuf>=3.20",
1721
"typing-extensions>=4.2.0,<5",
1822
]
@@ -40,7 +44,7 @@ dev = [
4044
"psutil>=5.9.3,<6",
4145
"pydocstyle>=6.3.0,<7",
4246
"pydoctor>=24.11.1,<25",
43-
"pyright==1.1.377",
47+
"pyright==1.1.400",
4448
"pytest~=7.4",
4549
"pytest-asyncio>=0.21,<0.22",
4650
"pytest-timeout~=2.2",
@@ -49,6 +53,7 @@ dev = [
4953
"twine>=4.0.1,<5",
5054
"ruff>=0.5.0,<0.6",
5155
"maturin>=1.8.2",
56+
"pytest-cov>=6.1.1",
5257
]
5358

5459
[tool.poe.tasks]
@@ -60,8 +65,8 @@ gen-protos = "uv run python scripts/gen_protos.py"
6065
lint = [
6166
{cmd = "uv run ruff check --select I"},
6267
{cmd = "uv run ruff format --check"},
63-
{ref = "lint-types"},
6468
{cmd = "uv run pyright"},
69+
{ref = "lint-types"},
6570
{ref = "lint-docs"},
6671
]
6772
bridge-lint = { cmd = "cargo clippy -- -D warnings", cwd = "temporalio/bridge" }
@@ -70,7 +75,7 @@ bridge-lint = { cmd = "cargo clippy -- -D warnings", cwd = "temporalio/bridge" }
7075
lint-docs = "uv run pydocstyle --ignore-decorators=overload"
7176
lint-types = "uv run mypy --namespace-packages --check-untyped-defs ."
7277
run-bench = "uv run python scripts/run_bench.py"
73-
test = "uv run pytest"
78+
test = "uv run pytest --cov temporalio --cov-report xml"
7479

7580

7681
[tool.pytest.ini_options]
@@ -83,8 +88,6 @@ testpaths = ["tests"]
8388
timeout = 600
8489
timeout_func_only = true
8590
filterwarnings = [
86-
"error::temporalio.workflow.UnfinishedUpdateHandlersWarning",
87-
"error::temporalio.workflow.UnfinishedSignalHandlersWarning",
8891
"ignore::pytest.PytestDeprecationWarning",
8992
"ignore::DeprecationWarning",
9093
]
@@ -157,6 +160,7 @@ exclude = [
157160
"tests/worker/workflow_sandbox/testmodules/proto",
158161
"temporalio/bridge/worker.py",
159162
"temporalio/contrib/opentelemetry.py",
163+
"temporalio/contrib/pydantic.py",
160164
"temporalio/converter.py",
161165
"temporalio/testing/_workflow.py",
162166
"temporalio/worker/_activity.py",
@@ -168,6 +172,10 @@ exclude = [
168172
"tests/api/test_grpc_stub.py",
169173
"tests/conftest.py",
170174
"tests/contrib/test_opentelemetry.py",
175+
"tests/contrib/pydantic/models.py",
176+
"tests/contrib/pydantic/models_2.py",
177+
"tests/contrib/pydantic/test_pydantic.py",
178+
"tests/contrib/pydantic/workflows.py",
171179
"tests/test_converter.py",
172180
"tests/test_service.py",
173181
"tests/test_workflow.py",
@@ -186,6 +194,9 @@ exclude = [
186194
[tool.ruff]
187195
target-version = "py39"
188196

197+
[tool.ruff.lint]
198+
extend-ignore = ["E741"] # Allow single-letter variable names like I, O
199+
189200
[build-system]
190201
requires = ["maturin>=1.0,<2.0"]
191202
build-backend = "maturin"
@@ -202,3 +213,8 @@ exclude = [
202213
[tool.uv]
203214
# Prevent uv commands from building the package by default
204215
package = false
216+
217+
[tool.uv.sources]
218+
nexus-rpc = { path = "../nexus-sdk-python", editable = true }
219+
temporalio-xray = { path = "../xray/sdks/python", editable = true }
220+
hyperlinked = { path = "../../hyperlinked/python", editable = true }

temporalio/bridge/src/worker.rs

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use temporal_sdk_core_api::worker::{
2020
};
2121
use temporal_sdk_core_api::Worker;
2222
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
23-
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion};
23+
use temporal_sdk_core_protos::coresdk::{ActivityHeartbeat, ActivityTaskCompletion, nexus::NexusTaskCompletion};
2424
use temporal_sdk_core_protos::temporal::api::history::v1::History;
2525
use tokio::sync::mpsc::{channel, Sender};
2626
use tokio_stream::wrappers::ReceiverStream;
@@ -570,6 +570,19 @@ impl WorkerRef {
570570
})
571571
}
572572

573+
fn poll_nexus_task<'p>(&self, py: Python<'p>) -> PyResult<&'p PyAny> {
574+
let worker = self.worker.as_ref().unwrap().clone();
575+
self.runtime.future_into_py(py, async move {
576+
let bytes = match worker.poll_nexus_task().await {
577+
Ok(task) => task.encode_to_vec(),
578+
Err(PollError::ShutDown) => return Err(PollShutdownError::new_err(())),
579+
Err(err) => return Err(PyRuntimeError::new_err(format!("Poll failure: {}", err))),
580+
};
581+
let bytes: &[u8] = &bytes;
582+
Ok(Python::with_gil(|py| bytes.into_py(py)))
583+
})
584+
}
585+
573586
fn complete_workflow_activation<'p>(
574587
&self,
575588
py: Python<'p>,
@@ -600,6 +613,19 @@ impl WorkerRef {
600613
})
601614
}
602615

616+
fn complete_nexus_task<'p>(&self, py: Python<'p>, proto: &PyBytes) -> PyResult<&'p PyAny> {
617+
let worker = self.worker.as_ref().unwrap().clone();
618+
let completion = NexusTaskCompletion::decode(proto.as_bytes())
619+
.map_err(|err| PyValueError::new_err(format!("Invalid proto: {}", err)))?;
620+
self.runtime.future_into_py(py, async move {
621+
worker
622+
.complete_nexus_task(completion)
623+
.await
624+
.context("Completion failure")
625+
.map_err(Into::into)
626+
})
627+
}
628+
603629
fn record_activity_heartbeat(&self, proto: &PyBytes) -> PyResult<()> {
604630
enter_sync!(self.runtime);
605631
let heartbeat = ActivityHeartbeat::decode(proto.as_bytes())

temporalio/bridge/worker.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import temporalio.bridge.client
2727
import temporalio.bridge.proto
2828
import temporalio.bridge.proto.activity_task
29+
import temporalio.bridge.proto.nexus
2930
import temporalio.bridge.proto.workflow_activation
3031
import temporalio.bridge.proto.workflow_completion
3132
import temporalio.bridge.runtime
@@ -35,7 +36,7 @@
3536
from temporalio.bridge.temporal_sdk_bridge import (
3637
CustomSlotSupplier as BridgeCustomSlotSupplier,
3738
)
38-
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError
39+
from temporalio.bridge.temporal_sdk_bridge import PollShutdownError # type: ignore
3940

4041

4142
@dataclass
@@ -216,6 +217,14 @@ async def poll_activity_task(
216217
await self._ref.poll_activity_task()
217218
)
218219

220+
async def poll_nexus_task(
221+
self,
222+
) -> temporalio.bridge.proto.nexus.NexusTask:
223+
"""Poll for a nexus task."""
224+
return temporalio.bridge.proto.nexus.NexusTask.FromString(
225+
await self._ref.poll_nexus_task()
226+
)
227+
219228
async def complete_workflow_activation(
220229
self,
221230
comp: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion,
@@ -229,6 +238,12 @@ async def complete_activity_task(
229238
"""Complete an activity task."""
230239
await self._ref.complete_activity_task(comp.SerializeToString())
231240

241+
async def complete_nexus_task(
242+
self, comp: temporalio.bridge.proto.nexus.NexusTaskCompletion
243+
) -> None:
244+
"""Complete a nexus task."""
245+
await self._ref.complete_nexus_task(comp.SerializeToString())
246+
232247
def record_activity_heartbeat(
233248
self, comp: temporalio.bridge.proto.ActivityHeartbeat
234249
) -> None:

temporalio/client.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -459,8 +459,15 @@ async def start_workflow(
459459
rpc_metadata: Mapping[str, str] = {},
460460
rpc_timeout: Optional[timedelta] = None,
461461
request_eager_start: bool = False,
462-
stack_level: int = 2,
463462
priority: temporalio.common.Priority = temporalio.common.Priority.default,
463+
# The following options are deliberately not exposed in overloads
464+
stack_level: int = 2,
465+
nexus_completion_callbacks: Sequence[
466+
temporalio.common.NexusCompletionCallback
467+
] = [],
468+
workflow_event_links: Sequence[
469+
temporalio.api.common.v1.Link.WorkflowEvent
470+
] = [],
464471
) -> WorkflowHandle[Any, Any]:
465472
"""Start a workflow and return its handle.
466473
@@ -523,6 +530,11 @@ async def start_workflow(
523530
temporalio.workflow._Definition.get_name_and_result_type(workflow)
524531
)
525532

533+
for l in workflow_event_links:
534+
print(
535+
f"🌈@@ worker starting workflow with link: {google.protobuf.json_format.MessageToJson(l)}"
536+
)
537+
526538
return await self._impl.start_workflow(
527539
StartWorkflowInput(
528540
workflow=name,
@@ -549,6 +561,8 @@ async def start_workflow(
549561
rpc_timeout=rpc_timeout,
550562
request_eager_start=request_eager_start,
551563
priority=priority,
564+
nexus_completion_callbacks=nexus_completion_callbacks,
565+
workflow_event_links=workflow_event_links,
552566
)
553567
)
554568

@@ -5156,6 +5170,8 @@ class StartWorkflowInput:
51565170
rpc_timeout: Optional[timedelta]
51575171
request_eager_start: bool
51585172
priority: temporalio.common.Priority
5173+
nexus_completion_callbacks: Sequence[temporalio.common.NexusCompletionCallback]
5174+
workflow_event_links: Sequence[temporalio.api.common.v1.Link.WorkflowEvent]
51595175

51605176

51615177
@dataclass
@@ -5770,6 +5786,16 @@ async def _build_start_workflow_execution_request(
57705786
req = temporalio.api.workflowservice.v1.StartWorkflowExecutionRequest()
57715787
req.request_eager_execution = input.request_eager_start
57725788
await self._populate_start_workflow_execution_request(req, input)
5789+
for callback in input.nexus_completion_callbacks:
5790+
c = temporalio.api.common.v1.Callback()
5791+
c.nexus.url = callback.url
5792+
c.nexus.header.update(callback.header)
5793+
req.completion_callbacks.append(c)
5794+
5795+
req.links.extend(
5796+
temporalio.api.common.v1.Link(workflow_event=link)
5797+
for link in input.workflow_event_links
5798+
)
57735799
return req
57745800

57755801
async def _build_signal_with_start_workflow_execution_request(

temporalio/common.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from abc import ABC, abstractmethod
99
from dataclasses import dataclass
1010
from datetime import datetime, timedelta
11-
from enum import Enum, IntEnum
11+
from enum import IntEnum
1212
from typing import (
1313
Any,
1414
Callable,
@@ -195,6 +195,37 @@ def __setstate__(self, state: object) -> None:
195195
)
196196

197197

198+
@dataclass(frozen=True)
199+
class NexusCompletionCallback:
200+
"""Nexus callback to attach to events such as workflow completion."""
201+
202+
url: str
203+
"""Callback URL."""
204+
205+
header: Mapping[str, str]
206+
"""Header to attach to callback request."""
207+
208+
209+
@dataclass(frozen=True)
210+
class WorkflowEventLink:
211+
"""A link to a history event that can be attached to a different history event."""
212+
213+
namespace: str
214+
"""Namespace of the workflow to link to."""
215+
216+
workflow_id: str
217+
"""ID of the workflow to link to."""
218+
219+
run_id: str
220+
"""Run ID of the workflow to link to."""
221+
222+
event_type: temporalio.api.enums.v1.EventType
223+
"""Type of the event to link to."""
224+
225+
event_id: int
226+
"""ID of the event to link to."""
227+
228+
198229
# We choose to make this a list instead of an sequence so we can catch if people
199230
# are not sending lists each time but maybe accidentally sending a string (which
200231
# is a sequence)

0 commit comments

Comments
 (0)