Skip to content

Commit

Permalink
feat(subs): child workflows + docs (#867)
Browse files Browse the repository at this point in the history
Support for child workflows.

Solves MET-689 and MET-668.

#### Migration notes

Previously
```python
    sub = SubstantialRuntime(backend)
    hello = sub.deno(file="workflow.ts", name="sayHello", deps=[])

    g.expose(
      # each function start, stop, result, ... holds a copy of the workflow data
       start_hello = hello.start(...),
       stop_hello = hello.stop()
    )
```
This approach relied on workflow files being referenced in each
materializer, but the constructs were too restrictive to support
something like `mutation { results(name: "nameManuallyGiven") }`.

We now have instead
```python
    file = (
       WorkflowFile
           .deno(file="workflow.ts", deps=[])
           .import_(["sayHello"])
           .build()
    )

    # workflow data are refered only once
    sub = SubstantialRuntime(backend, [file])
    g.expose(
      start_hello = sub.start(...).reduce({ "name": "sayHello" }),
      stop = sub.stop()
    )
```

- [x] The change comes with new or modified tests
- [ ] Hard-to-understand functions have explanatory comments
- [ ] End-user documentation is updated to reflect the change
  • Loading branch information
michael-0acf4 authored Oct 18, 2024
1 parent defae4c commit 1a2b58d
Show file tree
Hide file tree
Showing 41 changed files with 1,858 additions and 539 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

227 changes: 227 additions & 0 deletions docs/metatype.dev/docs/reference/runtimes/substantial/index.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
import SDKTabs from "@site/src/components/SDKTabs";
import TabItem from "@theme/TabItem";

# Substantial

## Substantial runtime

The Substantial runtime enables the execution of durable workflows in one or accross multiple typegates.

Why use it?

- **Long-running "processes"**: Durable tasks that need to run over extended periods (days, weeks or months), handling **retries** and **restarts** seamlessly.
- **Fault-tolerant execution**: Ensure reliable execution of tasks, even upon failures, by maintaining a durable state of the latest run.
- **Task orchestration**: Coordinate complex sequences of workflows (analogous to microservices interactions).

For example, the workflow bellow will continue running until a `confirmation` event is sent to the **associated run**.

```typescript
export async function sendEmail(ctx: Context) {
// 1. A workflow can receive parameters whose type is defined on the typegraph
const { to } = ctx.kwargs;

// 2. When a function call produces effects, we can make it durable
const info = await ctx.save(() => sendSubscriptionEmail(to));
const timeSent = await ctx.save(() => new Date().toJSON());

const confirmation = ctx.receive<boolean>("confirmation");
if (!confirmation) {
throw new Error(`${to} has denied the subscription sent at ${timeSent}`);
}

return `${to} confirmed (${info})`;
}

```

Additionally, if we were to shut down the Typegate node executing it and then restart it, the state **will be preserved**. This means that if the subscription email was already sent, upon relaunch, it will not be sent again, same thing for the value of `timeSent`.

## Key Concepts

### Workflows

A special type of function with **durable state** and an execution mechanism directly tied to time. A workflow can also trigger other workflows (child workflows).

### Backend

This abstraction implements a set of atomic operations that allows Typegate to persist and recover the workflow state. Currently, we have the **Redis** backend available, along with others like **fs** and **memory**, which are primarily intended for development or testing purposes.

### Run

When a workflow is started, a run is created and Substantial will provide you a `run_id` to uniquely identify it.

You can send an event or abort an ongoing run from its `run_id`.


## Child workflows

Child workflows are like any other workflows, they are just run by another workflow (parent).

If a workflow parent is explicitly stopped or aborted, all of its descendants will also be aborted.

For example, suppose you want to write a workflow that sends a subscription request to a list of emails and then receive a notification for each confirmation or denial, but only during your work hours.

You can easily translate that logic as if you were writing generic sequential code using Substantial workflows.

```typescript
import { nextTimeWhenAdminIsAvailable, sendSubscriptionEmail, notifyAdmin } from "./utils.ts";

export async function sendEmail(ctx: Context) {
// 1. A workflow can receive parameters whose type is defined on the typegraph
const { to } = ctx.kwargs;

// 2. When a function call produces effects, we can make it durable
const info = await ctx.save(() => sendSubscriptionEmail(to));
const timeSent = await ctx.save(() => new Date());

const confirmation = ctx.receive<boolean>("confirmation");
if (!confirmation) {
throw new Error(`${to} has denied the subscription sent at ${timeSent}`);
}

// 3. In this scenario, we use a durable sleep to wait until the admin
// is available
const duration = await ctx.save(() => nextTimeWhenAdminIsAvailable(new Date()));
ctx.sleep(duration);

const _ = await ctx.save(() => notifyAdmin(info), {
retry: {
minBackoffMs: 1000,
maxBackoffMs: 5000,
maxRetries: 4,
}
});

return `${to} confirmed`;
}

export async function sendMultipleEmails(ctx: Context) {
const { emails } = ctx.kwargs;

// 1. Persist the state of the child workflows
const handlersDef = await ctx.save(async () => {
const handlersDef = [];
for (const email of emails) {
const handleDef = await ctx.startChildWorkflow(sendEmail, {
to: email,
});
handlersDef.push(handleDef);
}

return handlersDef;
});

// 2. Create handles for your child workflows
const handles = handlersDef.map((def) => ctx.createWorkflowHandle(def));

// 3. In this example, we wait on all child workflows to complete
await ctx.ensure(async () => {
for (const handle of handles) {
if (!(await handle.hasStopped())) {
return false;
}
}
return true;
});

const ret = await ctx.save(async () => {
const ret = [];
for (const handle of handles) {
const childResult = await handle.result<string>();
ret.push(childResult);
}

return ret;
});

return ret;
}
```

In your typegraph, you will have:

<SDKTabs>
<TabItem value="python">

```python
from typegraph import typegraph, t, Graph
from typegraph.policy import Policy
from typegraph.runtimes.substantial import SubstantialRuntime, WorkflowFile
from typegraph.runtimes.substantial import Backend


@typegraph()
def substantial_example(g: Graph):
pub = Policy.public()

backend = Backend.redis("REDIS_SECRET")
file = (
WorkflowFile.deno(file="my_workflow.ts", deps=["shared/types.ts"])
.import_(["sendEmail", "sendMultipleEmails"])
.build()
)

sub = SubstantialRuntime(backend, [file])

g.expose(
pub,
stop=sub.stop(),
send_multiple_emails=sub.start(t.struct({ "emails": t.list(t.email()) })).reduce(
{ "name": "sendMultipleEmails"}
),
send_single_email=sub.start(t.struct({"to": t.email()})).reduce(
{"name": "sendEmail"}
),
results_raw=sub.query_results_raw(),
workers=sub.query_resources(),
**sub.internals(), # Required for child workflows
)

```

</TabItem>
<TabItem value="typescript">

```typescript
import { Policy, t, typegraph } from "@typegraph/sdk/index.ts";
import {
SubstantialRuntime,
Backend,
WorkflowFile,
} from "@typegraph/sdk/runtimes/substantial.ts";

typegraph(
{
name: "substantial-example",
},
(g) => {
const pub = Policy.public();
const backend = Backend.redis("REDIS_SECRET");
const file = WorkflowFile.deno("my_workflow.ts", [])
.import(["sendEmail", "sendMultipleEmails"])
.build();

const sub = new SubstantialRuntime(backend, [file]);

g.expose(
{
stop: sub.stop(),
send_multiple_emails: sub
.start(t.struct({ emails: t.list(t.email()) }))
.reduce({ name: "sendMultipleEmails" }),
send_single_email: sub
.start(t.struct({ to: t.email() }))
.reduce({ name: "sendEmail" }),
results_raw: sub.queryResultsRaw(),
workers: sub.queryResources(),
...sub.internals(), // Required for child workflows
},
pub
);
}
);
```

</TabItem>

</SDKTabs>
5 changes: 3 additions & 2 deletions docs/metatype.dev/docs/reference/typegate/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,6 @@ The following environment variables can be used to configure the typegate. `SYNC
| SYNC_S3_SECRET_KEY | Access key secret for the S3 store credentials. | **Required (sync mode)** | password |
| SYNC_S3_PATH_STYLE | `true` or `false`, force path style if `true`. | `false` | `true` |
| SYNC_S3_BUCKET | The bucket to be used for the system (dedicated). | **Required (sync mode)** | `mybucket` |
| SUBSTANTIAL_POLL_INTERVAL_SEC | Rate at which new schedules are read. | **Required (sync mode)** | `mybucket` |
| SUBSTANTIAL_LEASE_LIFESPAN_SEC | Lease duration associated to a workflow run | **Required (sync mode)** | `mybucket` |
| SUBSTANTIAL_POLL_INTERVAL_SEC | Rate at which new schedules are read. | 1.0 | 0.6 |
| SUBSTANTIAL_LEASE_LIFESPAN_SEC | Lease duration associated to a workflow run | 2.0 | 6 |
| SUBSTANTIAL_MAX_ACQUIRE_PER_TICK | Max amount of new acquired replay requests per tick | 3 | 5 |
4 changes: 4 additions & 0 deletions examples/metatype.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ typegates:
temporal:
HOST: "http://localhost:7233"
NAMESPACE: "default"
substantial:
SUB_REDIS: "redis://:password@localhost:6380/0"
substantial-child-workflow:
SUB_REDIS: "redis://:password@localhost:6380/0"

prd:
url: https://demo.metatype.dev
Expand Down
17 changes: 9 additions & 8 deletions src/common/src/typegraph/runtimes/substantial.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,18 @@ pub enum SubstantialBackend {
Redis(RedisConfig),
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct WorkflowFileDescription {
pub imports: Vec<String>,
pub kind: WorkflowKind,
pub file: PathBuf,
pub deps: Vec<PathBuf>,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct SubstantialRuntimeData {
pub backend: SubstantialBackend,
pub workflows: Vec<WorkflowFileDescription>,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
Expand All @@ -30,14 +39,6 @@ pub enum WorkflowKind {
Deno,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct WorkflowMatData {
pub name: String,
pub file: PathBuf,
pub kind: WorkflowKind,
pub deps: Vec<PathBuf>,
}

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ModuleMatData {
Expand Down
1 change: 0 additions & 1 deletion src/substantial/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ anyhow.workspace = true
chrono.workspace = true
serde.workspace = true
serde_json.workspace = true
lazy_static.workspace = true
uuid.workspace = true

protobuf = "3.5.1"
Expand Down
49 changes: 47 additions & 2 deletions src/substantial/src/backends/key_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,13 +266,13 @@ impl BackendMetadataWriter for KeyValueBackend {
}

fn write_workflow_link(&self, workflow_name: String, run_id: String) -> Result<()> {
let key = format!("links/{}/{}", workflow_name, run_id);
let key = format!("links/runs/{}/{}", workflow_name, run_id);
self.store.put(key.as_str(), Item::new(run_id.into_bytes()))
}

fn read_workflow_links(&self, workflow_name: String) -> Result<Vec<String>> {
let mut ret = Vec::new();
let base_key = format!("links/{}/", workflow_name);
let base_key = format!("links/runs/{}/", workflow_name);

for key in self.store.keys()? {
let value = self.store.get(&key)?.unwrap();
Expand All @@ -283,6 +283,51 @@ impl BackendMetadataWriter for KeyValueBackend {

Ok(ret)
}

fn write_parent_child_link(&self, parent_run_id: String, child_run_id: String) -> Result<()> {
let key = format!("links/children/{}/{}", parent_run_id, child_run_id);
self.store
.put(key.as_str(), Item::new(child_run_id.into_bytes()))
}

fn read_direct_children(&self, parent_run_id: String) -> Result<Vec<String>> {
let mut ret = Vec::new();
let base_key = format!("links/children/{}/", parent_run_id);

for key in self.store.keys()? {
let value = self
.store
.get(&key)?
.with_context(|| format!("Get known key {:?}", key))
.unwrap();

if key.starts_with(&base_key) {
ret.push(String::from_utf8(value.data)?);
}
}

Ok(ret)
}

fn enumerate_all_children(&self, parent_run_id: String) -> Result<Vec<String>> {
let mut visited = Vec::new();
let mut stack = vec![parent_run_id.clone()];
let mut result = Vec::new();

while let Some(run_id) = stack.pop() {
if !visited.contains(&run_id) {
visited.push(run_id.clone());
result.push(run_id.clone());

let children = self.read_direct_children(run_id)?;
stack.extend(children);
}
}

result.retain(|rid| rid != &parent_run_id);

Ok(result)
}
}

// Note: always consume &self to make sure implementers never owns a kind of internal state by themselves
Expand Down
Loading

0 comments on commit 1a2b58d

Please sign in to comment.