diff --git a/Cargo.lock b/Cargo.lock index fad6bce5fb..bc59a1d19f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -11092,7 +11092,6 @@ version = "0.4.11-rc.0" dependencies = [ "anyhow", "chrono", - "lazy_static 1.5.0", "protobuf", "redis", "serde 1.0.204", diff --git a/docs/metatype.dev/docs/reference/runtimes/substantial/index.mdx b/docs/metatype.dev/docs/reference/runtimes/substantial/index.mdx new file mode 100644 index 0000000000..c6ccb72390 --- /dev/null +++ b/docs/metatype.dev/docs/reference/runtimes/substantial/index.mdx @@ -0,0 +1,227 @@ +import SDKTabs from "@site/src/components/SDKTabs"; +import TabItem from "@theme/TabItem"; + +# Substantial + +## Substantial runtime + +The Substantial runtime enables the execution of durable workflows in one or across multiple typegates. + +Why use it? + +- **Long-running "processes"**: Durable tasks that need to run over extended periods (days, weeks or months), handling **retries** and **restarts** seamlessly. +- **Fault-tolerant execution**: Ensure reliable execution of tasks, even upon failures, by maintaining a durable state of the latest run. +- **Task orchestration**: Coordinate complex sequences of workflows (analogous to microservices interactions). + +For example, the workflow below will continue running until a `confirmation` event is sent to the **associated run**. + +```typescript +export async function sendEmail(ctx: Context) { + // 1. A workflow can receive parameters whose type is defined on the typegraph + const { to } = ctx.kwargs; + + // 2. 
When a function call produces effects, we can make it durable + const info = await ctx.save(() => sendSubscriptionEmail(to)); + const timeSent = await ctx.save(() => new Date().toJSON()); + + const confirmation = ctx.receive("confirmation"); + if (!confirmation) { + throw new Error(`${to} has denied the subscription sent at ${timeSent}`); + } + + return `${to} confirmed (${info})`; +} + +``` + +Additionally, if we were to shut down the Typegate node executing it and then restart it, the state **will be preserved**. This means that if the subscription email was already sent, upon relaunch, it will not be sent again, same thing for the value of `timeSent`. + +## Key Concepts + +### Workflows + +A special type of function with **durable state** and an execution mechanism directly tied to time. A workflow can also trigger other workflows (child workflows). + +### Backend + +This abstraction implements a set of atomic operations that allows Typegate to persist and recover the workflow state. Currently, we have the **Redis** backend available, along with others like **fs** and **memory**, which are primarily intended for development or testing purposes. + +### Run + +When a workflow is started, a run is created and Substantial will provide you a `run_id` to uniquely identify it. + +You can send an event or abort an ongoing run from its `run_id`. + + +## Child workflows + +Child workflows are like any other workflows, they are just run by another workflow (parent). + +If a workflow parent is explicitly stopped or aborted, all of its descendants will also be aborted. + +For example, suppose you want to write a workflow that sends a subscription request to a list of emails and then receive a notification for each confirmation or denial, but only during your work hours. + +You can easily translate that logic as if you were writing generic sequential code using Substantial workflows. 
+ +```typescript +import { nextTimeWhenAdminIsAvailable, sendSubscriptionEmail, notifyAdmin } from "./utils.ts"; + +export async function sendEmail(ctx: Context) { + // 1. A workflow can receive parameters whose type is defined on the typegraph + const { to } = ctx.kwargs; + + // 2. When a function call produces effects, we can make it durable + const info = await ctx.save(() => sendSubscriptionEmail(to)); + const timeSent = await ctx.save(() => new Date()); + + const confirmation = ctx.receive("confirmation"); + if (!confirmation) { + throw new Error(`${to} has denied the subscription sent at ${timeSent}`); + } + + // 3. In this scenario, we use a durable sleep to wait until the admin + // is available + const duration = await ctx.save(() => nextTimeWhenAdminIsAvailable(new Date())); + ctx.sleep(duration); + + const _ = await ctx.save(() => notifyAdmin(info), { + retry: { + minBackoffMs: 1000, + maxBackoffMs: 5000, + maxRetries: 4, + } + }); + + return `${to} confirmed`; +} + +export async function sendMultipleEmails(ctx: Context) { + const { emails } = ctx.kwargs; + + // 1. Persist the state of the child workflows + const handlersDef = await ctx.save(async () => { + const handlersDef = []; + for (const email of emails) { + const handleDef = await ctx.startChildWorkflow(sendEmail, { + to: email, + }); + handlersDef.push(handleDef); + } + + return handlersDef; + }); + + // 2. Create handles for your child workflows + const handles = handlersDef.map((def) => ctx.createWorkflowHandle(def)); + + // 3. 
In this example, we wait on all child workflows to complete + await ctx.ensure(async () => { + for (const handle of handles) { + if (!(await handle.hasStopped())) { + return false; + } + } + return true; + }); + + const ret = await ctx.save(async () => { + const ret = []; + for (const handle of handles) { + const childResult = await handle.result(); + ret.push(childResult); + } + + return ret; + }); + + return ret; +} +``` + +In your typegraph, you will have: + + + + +```python +from typegraph import typegraph, t, Graph +from typegraph.policy import Policy +from typegraph.runtimes.substantial import SubstantialRuntime, WorkflowFile +from typegraph.runtimes.substantial import Backend + + +@typegraph() +def substantial_example(g: Graph): + pub = Policy.public() + + backend = Backend.redis("REDIS_SECRET") + file = ( + WorkflowFile.deno(file="my_workflow.ts", deps=["shared/types.ts"]) + .import_(["sendEmail", "sendMultipleEmails"]) + .build() + ) + + sub = SubstantialRuntime(backend, [file]) + + g.expose( + pub, + stop=sub.stop(), + send_multiple_emails=sub.start(t.struct({ "emails": t.list(t.email()) })).reduce( + { "name": "sendMultipleEmails"} + ), + send_single_email=sub.start(t.struct({"to": t.email()})).reduce( + {"name": "sendEmail"} + ), + results_raw=sub.query_results_raw(), + workers=sub.query_resources(), + **sub.internals(), # Required for child workflows + ) + +``` + + + + +```typescript +import { Policy, t, typegraph } from "@typegraph/sdk/index.ts"; +import { + SubstantialRuntime, + Backend, + WorkflowFile, +} from "@typegraph/sdk/runtimes/substantial.ts"; + +typegraph( + { + name: "substantial-example", + }, + (g) => { + const pub = Policy.public(); + const backend = Backend.redis("REDIS_SECRET"); + const file = WorkflowFile.deno("my_workflow.ts", []) + .import(["sendEmail", "sendMultipleEmails"]) + .build(); + + const sub = new SubstantialRuntime(backend, [file]); + + g.expose( + { + stop: sub.stop(), + send_multiple_emails: sub + .start(t.struct({ 
emails: t.list(t.email()) })) + .reduce({ name: "sendMultipleEmails" }), + send_single_email: sub + .start(t.struct({ to: t.email() })) + .reduce({ name: "sendEmail" }), + results_raw: sub.queryResultsRaw(), + workers: sub.queryResources(), + ...sub.internals(), // Required for child workflows + }, + pub + ); + } +); +``` + + + + diff --git a/docs/metatype.dev/docs/reference/typegate/index.mdx b/docs/metatype.dev/docs/reference/typegate/index.mdx index 00bbfd76d2..4ab6b1f1be 100644 --- a/docs/metatype.dev/docs/reference/typegate/index.mdx +++ b/docs/metatype.dev/docs/reference/typegate/index.mdx @@ -79,5 +79,6 @@ The following environment variables can be used to configure the typegate. `SYNC | SYNC_S3_SECRET_KEY | Access key secret for the S3 store credentials. | **Required (sync mode)** | password | | SYNC_S3_PATH_STYLE | `true` or `false`, force path style if `true`. | `false` | `true` | | SYNC_S3_BUCKET | The bucket to be used for the system (dedicated). | **Required (sync mode)** | `mybucket` | -| SUBSTANTIAL_POLL_INTERVAL_SEC | Rate at which new schedules are read. | **Required (sync mode)** | `mybucket` | -| SUBSTANTIAL_LEASE_LIFESPAN_SEC | Lease duration associated to a workflow run | **Required (sync mode)** | `mybucket` | +| SUBSTANTIAL_POLL_INTERVAL_SEC | Rate at which new schedules are read. 
| 1.0 | 0.6 | +| SUBSTANTIAL_LEASE_LIFESPAN_SEC | Lease duration associated to a workflow run | 2.0 | 6 | +| SUBSTANTIAL_MAX_ACQUIRE_PER_TICK | Max amount of new acquired replay requests per tick | 3 | 5 | diff --git a/examples/metatype.yaml b/examples/metatype.yaml index 4e1508f8db..7f0126cfcd 100644 --- a/examples/metatype.yaml +++ b/examples/metatype.yaml @@ -73,6 +73,10 @@ typegates: temporal: HOST: "http://localhost:7233" NAMESPACE: "default" + substantial: + SUB_REDIS: "redis://:password@localhost:6380/0" + substantial-child-workflow: + SUB_REDIS: "redis://:password@localhost:6380/0" prd: url: https://demo.metatype.dev diff --git a/src/common/src/typegraph/runtimes/substantial.rs b/src/common/src/typegraph/runtimes/substantial.rs index c99e5c4baf..e88bb8fd16 100644 --- a/src/common/src/typegraph/runtimes/substantial.rs +++ b/src/common/src/typegraph/runtimes/substantial.rs @@ -18,9 +18,18 @@ pub enum SubstantialBackend { Redis(RedisConfig), } +#[derive(Serialize, Deserialize, Clone, Debug)] +pub struct WorkflowFileDescription { + pub imports: Vec, + pub kind: WorkflowKind, + pub file: PathBuf, + pub deps: Vec, +} + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct SubstantialRuntimeData { pub backend: SubstantialBackend, + pub workflows: Vec, } #[derive(Serialize, Deserialize, Clone, Debug)] @@ -30,14 +39,6 @@ pub enum WorkflowKind { Deno, } -#[derive(Serialize, Deserialize, Clone, Debug)] -pub struct WorkflowMatData { - pub name: String, - pub file: PathBuf, - pub kind: WorkflowKind, - pub deps: Vec, -} - #[derive(Serialize, Deserialize, Clone, Debug)] #[serde(rename_all = "camelCase")] pub struct ModuleMatData { diff --git a/src/substantial/Cargo.toml b/src/substantial/Cargo.toml index d6b86ab6c1..4038c062f8 100644 --- a/src/substantial/Cargo.toml +++ b/src/substantial/Cargo.toml @@ -8,7 +8,6 @@ anyhow.workspace = true chrono.workspace = true serde.workspace = true serde_json.workspace = true -lazy_static.workspace = true uuid.workspace = true 
protobuf = "3.5.1" diff --git a/src/substantial/src/backends/key_value.rs b/src/substantial/src/backends/key_value.rs index 106f2e98d9..be0d9a65c4 100644 --- a/src/substantial/src/backends/key_value.rs +++ b/src/substantial/src/backends/key_value.rs @@ -266,13 +266,13 @@ impl BackendMetadataWriter for KeyValueBackend { } fn write_workflow_link(&self, workflow_name: String, run_id: String) -> Result<()> { - let key = format!("links/{}/{}", workflow_name, run_id); + let key = format!("links/runs/{}/{}", workflow_name, run_id); self.store.put(key.as_str(), Item::new(run_id.into_bytes())) } fn read_workflow_links(&self, workflow_name: String) -> Result> { let mut ret = Vec::new(); - let base_key = format!("links/{}/", workflow_name); + let base_key = format!("links/runs/{}/", workflow_name); for key in self.store.keys()? { let value = self.store.get(&key)?.unwrap(); @@ -283,6 +283,51 @@ impl BackendMetadataWriter for KeyValueBackend { Ok(ret) } + + fn write_parent_child_link(&self, parent_run_id: String, child_run_id: String) -> Result<()> { + let key = format!("links/children/{}/{}", parent_run_id, child_run_id); + self.store + .put(key.as_str(), Item::new(child_run_id.into_bytes())) + } + + fn read_direct_children(&self, parent_run_id: String) -> Result> { + let mut ret = Vec::new(); + let base_key = format!("links/children/{}/", parent_run_id); + + for key in self.store.keys()? { + let value = self + .store + .get(&key)? 
+ .with_context(|| format!("Get known key {:?}", key)) + .unwrap(); + + if key.starts_with(&base_key) { + ret.push(String::from_utf8(value.data)?); + } + } + + Ok(ret) + } + + fn enumerate_all_children(&self, parent_run_id: String) -> Result> { + let mut visited = Vec::new(); + let mut stack = vec![parent_run_id.clone()]; + let mut result = Vec::new(); + + while let Some(run_id) = stack.pop() { + if !visited.contains(&run_id) { + visited.push(run_id.clone()); + result.push(run_id.clone()); + + let children = self.read_direct_children(run_id)?; + stack.extend(children); + } + } + + result.retain(|rid| rid != &parent_run_id); + + Ok(result) + } } // Note: always consume &self to make sure implementers never owns a kind of internal state by themselves diff --git a/src/substantial/src/backends/memory.rs b/src/substantial/src/backends/memory.rs index 2c6f7b886a..40c2fa0912 100644 --- a/src/substantial/src/backends/memory.rs +++ b/src/substantial/src/backends/memory.rs @@ -1,16 +1,14 @@ use super::key_value::{Item, KeyValueBackend, KeyValueLike}; use anyhow::Result; use chrono::{DateTime, Utc}; -use lazy_static::lazy_static; use std::sync::RwLock; use std::{collections::HashMap, sync::Arc}; -lazy_static! 
{ - static ref STORAGE: Arc>> = Arc::new(RwLock::new(HashMap::new())); +#[derive(Default)] +pub struct BasicMemoryStore { + pub storage: Arc>>, } -pub struct BasicMemoryStore; - impl KeyValueLike for BasicMemoryStore { fn run_key(&self, run_id: &str, subpath: &str) -> Result { Ok(format!("runs/{}/{}", run_id, subpath)) @@ -26,25 +24,26 @@ impl KeyValueLike for BasicMemoryStore { } fn get(&self, key: &str) -> Result> { - Ok(STORAGE.read().unwrap().get(key).cloned()) + Ok(self.storage.read().unwrap().get(key).cloned()) } fn put(&self, key: &str, value: Item) -> Result<()> { - STORAGE.write().unwrap().insert(key.to_string(), value); + self.storage.write().unwrap().insert(key.to_string(), value); Ok(()) } fn remove(&self, key: &str) -> Result<()> { - STORAGE.write().unwrap().remove(key); + self.storage.write().unwrap().remove(key); Ok(()) } fn exists(&self, key: &str) -> Result { - Ok(STORAGE.write().unwrap().contains_key(key)) + Ok(self.storage.write().unwrap().contains_key(key)) } fn keys(&self) -> Result> { - Ok(STORAGE + Ok(self + .storage .read() .unwrap() .keys() @@ -58,7 +57,7 @@ pub struct MemoryBackend(KeyValueBackend); impl Default for MemoryBackend { fn default() -> Self { MemoryBackend(KeyValueBackend { - store: Box::new(BasicMemoryStore), + store: Box::new(BasicMemoryStore::default()), }) } } diff --git a/src/substantial/src/backends/mod.rs b/src/substantial/src/backends/mod.rs index b7064c2e2e..f6541f64ff 100644 --- a/src/substantial/src/backends/mod.rs +++ b/src/substantial/src/backends/mod.rs @@ -49,6 +49,12 @@ pub trait BackendMetadataWriter { fn read_workflow_links(&self, workflow_name: String) -> Result>; + fn write_parent_child_link(&self, parent_run_id: String, child_run_id: String) -> Result<()>; + + fn read_direct_children(&self, parent_run_id: String) -> Result>; + + fn enumerate_all_children(&self, parent_run_id: String) -> Result>; + fn read_all_metadata(&self, run_id: String) -> Result>; fn append_metadata( diff --git 
a/src/substantial/src/backends/redis.rs b/src/substantial/src/backends/redis.rs index 024d370341..72a4b5139e 100644 --- a/src/substantial/src/backends/redis.rs +++ b/src/substantial/src/backends/redis.rs @@ -263,7 +263,6 @@ impl super::BackendAgent for RedisBackend { .last() .cloned() .with_context(|| format!("Invalid key {:?}", schedule_ref))?; - println!("{:?}", schedule_ref); return Ok(Some(NextRun { run_id, @@ -497,7 +496,7 @@ impl BackendMetadataWriter for RedisBackend { fn write_workflow_link(&self, workflow_name: String, run_id: String) -> Result<()> { self.with_redis(|r| { - let links_key = self.key(&["links", &workflow_name])?; + let links_key = self.key(&["links", "runs", &workflow_name])?; r.zadd(links_key, run_id, 0)?; Ok(()) }) @@ -505,9 +504,71 @@ impl BackendMetadataWriter for RedisBackend { fn read_workflow_links(&self, workflow_name: String) -> Result> { self.with_redis(|r| { - let links_key = self.key(&["links", &workflow_name])?; + let links_key = self.key(&["links", "runs", &workflow_name])?; let run_ids: Vec = r.zrange(links_key, 0, -1)?; Ok(run_ids) }) } + + fn write_parent_child_link(&self, parent_run_id: String, child_run_id: String) -> Result<()> { + self.with_redis(|r| { + let links_key = self.key(&["links", "children", &parent_run_id])?; + r.zadd(links_key, child_run_id, 0)?; + Ok(()) + }) + } + + fn read_direct_children(&self, parent_run_id: String) -> Result> { + self.with_redis(|r| { + let links_key = self.key(&["links", "children", &parent_run_id])?; + let run_ids: Vec = r.zrange(links_key, 0, -1)?; + Ok(run_ids) + }) + } + + fn enumerate_all_children(&self, parent_run_id: String) -> Result> { + self.with_redis(|r| { + let links_prefix = self.key(&["links", "children"])?; + let separator = self.separator.clone(); + + let script = Script::new( + r#" + local parent_run_id = ARGV[1] + local prefix = ARGV[2] + local separator = ARGV[3] + + local stack = {parent_run_id} + local visited = {} + local result = {} + + while #stack > 0 do + 
local run_id = table.remove(stack) + + if not visited[run_id] then + visited[run_id] = true + table.insert(result, run_id) + + local key = prefix .. separator .. run_id + local children = redis.call("ZRANGE", key, 0, -1) + for i = #children, 1, -1 do + table.insert(stack, children[i]) + end + end + end + + return result + "#, + ); + + let mut result: Vec = script + .arg(&parent_run_id) + .arg(links_prefix) + .arg(separator) + .invoke(r)?; + + result.retain(|rid| rid != &parent_run_id); + + Ok(result) + }) + } } diff --git a/src/substantial/tests/mod.rs b/src/substantial/tests/mod.rs index 3a391621d7..26e5469297 100644 --- a/src/substantial/tests/mod.rs +++ b/src/substantial/tests/mod.rs @@ -36,7 +36,7 @@ mod tests { fn test_basic_run_persist() { let mem_backend = MemoryBackend::default().get(); - let root = PathBuf::from("tmp/test_one/substantial"); + let root = PathBuf::from("tmp/test_basic/substantial"); let fs_backend = FsBackend::new(root.clone()).get(); std::fs::remove_dir_all(root).ok(); @@ -80,7 +80,7 @@ mod tests { ( "fs", Box::new({ - let root = PathBuf::from("tmp/test_two/substantial"); + let root = PathBuf::from("tmp/rust_test_state/substantial"); let backend = FsBackend::new(root.clone()).get(); std::fs::remove_dir_all(root).ok(); backend @@ -89,7 +89,7 @@ mod tests { ( "redis", Box::new({ - let prefix = "rust_test"; + let prefix = "rust_test_state"; let backend = RedisBackend::new( "redis://:password@localhost:6380/0".to_owned(), Some(prefix.to_owned()), @@ -149,13 +149,6 @@ mod tests { into_comparable(&recovered_run), ); - // link metadata - backend - .write_workflow_link("example".to_string(), run_id.clone()) - .unwrap(); - let linked = backend.read_workflow_links("example".to_string()).unwrap(); - assert_eq!(linked.len(), 1); - // log metadata backend .append_metadata(run_id.clone(), Utc::now(), "1234".to_string()) @@ -220,7 +213,138 @@ mod tests { } } + #[test] + fn test_hard_link() { + let backends: Vec<(&str, Box)> = vec![ + ("memory", 
Box::new(MemoryBackend::default().get())), + ( + "fs", + Box::new({ + let root = PathBuf::from("tmp/test_three/substantial"); + let backend = FsBackend::new(root.clone()).get(); + std::fs::remove_dir_all(root).ok(); + backend + }), + ), + ( + "redis", + Box::new({ + let prefix = "rust_test_link"; + let backend = RedisBackend::new( + "redis://:password@localhost:6380/0".to_owned(), + Some(prefix.to_owned()), + ) + .unwrap(); + backend + .with_redis(|r| { + let script = Script::new(r#"redis.call("FLUSHALL")"#); + script.invoke::<()>(r) + }) + .unwrap(); + + backend + }), + ), + ]; + + for (label, backend) in backends { + println!("Testing backend {:?}", label); + + // link metadata + backend + .write_workflow_link("example".to_string(), "run_id_link_one".to_owned()) + .unwrap(); + backend + .write_workflow_link("example".to_string(), "run_id_link_two".to_owned()) + .unwrap(); + + let linked = backend.read_workflow_links("example".to_string()).unwrap(); + assert_eq!(linked.len(), 2); + + // parent child metadata + let tree = json!({ + "parent1": { + "child_11": { + "child_111": "child_1111", + "child_112": "child_1121", + "child_113": null, + } + }, + "parent2": null + }); + + link_children_rec(&tree, backend.as_ref()).unwrap(); + assert_eq!( + backend + .read_direct_children("parent1".to_owned()) + .unwrap() + .len(), + 1 + ); + assert_eq!( + backend + .read_direct_children("child_11".to_owned()) + .unwrap() + .len(), + 3 + ); + assert_eq!( + backend + .read_direct_children("parent2".to_owned()) + .unwrap() + .len(), + 0 + ); + + // collect all children + let mut children = backend + .enumerate_all_children("parent1".to_owned()) + .unwrap(); + children.sort(); + + let mut expected = [ + "child_11", + "child_112", + "child_1121", + "child_113", + "child_111", + "child_1111", + ] + .iter() + .map(|s| s.to_owned()) + .collect::>(); + expected.sort(); + + assert_eq!(children, expected); + } + } + fn into_comparable(value: &T) -> String { format!("{value:?}") } + + fn 
link_children_rec( + json_obj: &serde_json::Value, + backend: &dyn Backend, + ) -> anyhow::Result<()> { + if let serde_json::Value::Object(map_obj) = json_obj { + for (parent_id, value) in map_obj { + match value { + serde_json::Value::String(child_id) => { + backend.write_parent_child_link(parent_id.clone(), child_id.clone())?; + } + serde_json::Value::Object(m) => { + for direct_child in m.keys() { + backend + .write_parent_child_link(parent_id.clone(), direct_child.clone())?; + } + link_children_rec(value, backend)?; + } + _ => {} + } + } + } + + Ok(()) + } } diff --git a/src/typegate/engine/00_runtime.js b/src/typegate/engine/00_runtime.js index 64b4e197f3..412d71c056 100644 --- a/src/typegate/engine/00_runtime.js +++ b/src/typegate/engine/00_runtime.js @@ -73,6 +73,8 @@ const Meta = { metadataAppend: getOp("op_sub_metadata_append"), metadataWriteWorkflowLink: getOp("op_sub_metadata_write_workflow_link"), metadataReadWorkflowLinks: getOp("op_sub_metadata_read_workflow_links"), + metadataWriteParentChildLink: getOp("op_sub_metadata_write_parent_child_link"), + metadataEnumerateAllChildren: getOp("op_sub_metadata_enumerate_all_children"), }, grpc: { register: getOp("op_grpc_register"), diff --git a/src/typegate/engine/runtime.d.ts b/src/typegate/engine/runtime.d.ts index a0c1779482..109491903d 100644 --- a/src/typegate/engine/runtime.d.ts +++ b/src/typegate/engine/runtime.d.ts @@ -79,6 +79,12 @@ export type MetaNS = { metadataReadWorkflowLinks: ( inp: ReadWorkflowLinkInput ) => Promise>; + metadataWriteParentChildLink: ( + inp: WriteParentChildLinkInput + ) => Promise; + metadataEnumerateAllChildren: ( + inp: EnumerateAllChildrenInput + ) => Promise>; }; }; @@ -403,3 +409,14 @@ export interface MetadataEvent { at: string; metadata?: MetadataPayload; } + +export interface WriteParentChildLinkInput { + backend: Backend; + parent_run_id: string; + child_run_id: string; +} + +export interface EnumerateAllChildrenInput { + backend: Backend; + parent_run_id: string; +} 
diff --git a/src/typegate/engine/src/ext.rs b/src/typegate/engine/src/ext.rs index 00156a8f1f..2f2ca8de81 100644 --- a/src/typegate/engine/src/ext.rs +++ b/src/typegate/engine/src/ext.rs @@ -57,6 +57,8 @@ deno_core::extension!( substantial::op_sub_metadata_read_all, substantial::op_sub_metadata_read_workflow_links, substantial::op_sub_metadata_write_workflow_link, + substantial::op_sub_metadata_write_parent_child_link, + substantial::op_sub_metadata_enumerate_all_children, // FIXME(yohe): this test broke and has proven difficult to fix // #[cfg(test)] diff --git a/src/typegate/engine/src/runtimes/substantial.rs b/src/typegate/engine/src/runtimes/substantial.rs index eef7c31d34..9420c61e34 100644 --- a/src/typegate/engine/src/runtimes/substantial.rs +++ b/src/typegate/engine/src/runtimes/substantial.rs @@ -402,3 +402,53 @@ pub async fn op_sub_metadata_read_workflow_links( backend.read_workflow_links(input.workflow_name) } + +#[derive(Deserialize)] +pub struct WriteParentChildLinkInput { + pub backend: SubstantialBackend, + pub parent_run_id: String, + pub child_run_id: String, +} + +#[deno_core::op2(async)] +pub async fn op_sub_metadata_write_parent_child_link( + state: Rc>, + #[serde] input: WriteParentChildLinkInput, +) -> Result<()> { + let ctx = { + let state = state.borrow(); + state.borrow::().clone() + }; + + let backend = ctx + .backends + .entry(input.backend.as_key()) + .or_try_insert_with(|| init_backend(&input.backend))?; + + backend.write_parent_child_link(input.parent_run_id.clone(), input.child_run_id) +} + +#[derive(Deserialize)] +pub struct EnumerateAllChildrenInput { + pub backend: SubstantialBackend, + pub parent_run_id: String, +} + +#[deno_core::op2(async)] +#[serde] +pub async fn op_sub_metadata_enumerate_all_children( + state: Rc>, + #[serde] input: EnumerateAllChildrenInput, +) -> Result> { + let ctx = { + let state = state.borrow(); + state.borrow::().clone() + }; + + let backend = ctx + .backends + .entry(input.backend.as_key()) + 
.or_try_insert_with(|| init_backend(&input.backend))?; + + backend.enumerate_all_children(input.parent_run_id.clone()) +} diff --git a/src/typegate/src/config.ts b/src/typegate/src/config.ts index 01094d874e..fdd487c15b 100644 --- a/src/typegate/src/config.ts +++ b/src/typegate/src/config.ts @@ -63,6 +63,7 @@ export const defaultTypegateConfigBase = { redis_url_queue_expire_sec: 60 * 5, // 5 minutes substantial_poll_interval_sec: 1, substantial_lease_lifespan_sec: 2, + substantial_max_acquire_per_tick: 3, }; const SYNC_PREFIX = "sync_"; diff --git a/src/typegate/src/config/shared.ts b/src/typegate/src/config/shared.ts index 0e4e1b511c..95574de662 100644 --- a/src/typegate/src/config/shared.ts +++ b/src/typegate/src/config/shared.ts @@ -10,8 +10,10 @@ if (!Deno.env.has("VERSION")) { Deno.env.set("VERSION", get_version()); } -export const envSharedWithWorkers = Object.keys(sharedConfigSchema.shape) - .map((k) => k.toUpperCase()); +export const envSharedWithWorkers = [ + "TEST_OVERRIDE_GQL_ORIGIN", + ...Object.keys(sharedConfigSchema.shape).map((k) => k.toUpperCase()), +]; export const sharedConfig = await configOrExit( sharedConfigSchema, @@ -22,7 +24,7 @@ export const sharedConfig = await configOrExit( Object.fromEntries( envSharedWithWorkers .map((k) => [k.toLocaleLowerCase(), Deno.env.get(k)]) - .filter(([_, v]) => v !== undefined), + .filter(([_, v]) => v !== undefined) ), - ], + ] ); diff --git a/src/typegate/src/config/types.ts b/src/typegate/src/config/types.ts index 87a110f932..9e3c2ef7cd 100644 --- a/src/typegate/src/config/types.ts +++ b/src/typegate/src/config/types.ts @@ -97,6 +97,8 @@ export const typegateConfigBaseSchema = z.object({ substantial_poll_interval_sec: z.coerce.number().positive().min(0.5).max(60), /** Lease duration associated to a run_id */ substantial_lease_lifespan_sec: z.coerce.number().positive().min(1), + /** Maximum amount of new acquired replay requests per tick */ + substantial_max_acquire_per_tick: 
z.coerce.number().positive().min(1), }); export type TypegateConfigBase = z.infer; diff --git a/src/typegate/src/runtimes/deno/shared_types.ts b/src/typegate/src/runtimes/deno/shared_types.ts index d8e9b924b0..7bbb446fe0 100644 --- a/src/typegate/src/runtimes/deno/shared_types.ts +++ b/src/typegate/src/runtimes/deno/shared_types.ts @@ -70,6 +70,6 @@ export interface TaskExec { ( args: Record, context: TaskContext, - helpers: Record, + helpers: Record ): unknown; } diff --git a/src/typegate/src/runtimes/deno/worker.ts b/src/typegate/src/runtimes/deno/worker.ts index 8c31768e93..ddfc87dde1 100644 --- a/src/typegate/src/runtimes/deno/worker.ts +++ b/src/typegate/src/runtimes/deno/worker.ts @@ -5,6 +5,7 @@ /// import { getLogger } from "../../log.ts"; +import { make_internal } from "../../worker_utils.ts"; import type { Answer, Message } from "../patterns/messenger/types.ts"; import type { @@ -13,7 +14,6 @@ import type { RegisterFuncTask, RegisterImportFuncTask, Task, - TaskContext, TaskExec, } from "./shared_types.ts"; @@ -29,40 +29,6 @@ const additionalHeaders = isTest ? 
{ connection: "close" } : { connection: "keep-alive" }; -const make_internal = ({ meta: { url, token } }: TaskContext) => { - const gql = (query: readonly string[], ...args: unknown[]) => { - if (args.length > 0) { - throw new Error("gql does not support arguments, use variables instead"); - } - // console.log(query); - return { - run: async ( - variables: Record, - ): Promise> => { - const res = await fetch(url, { - method: "POST", - headers: { - accept: "application/json", - "content-type": "application/json", - authorization: `Bearer ${token}`, - ...additionalHeaders, - }, - body: JSON.stringify({ - query: query[0], - variables, - }), - }); - if (!res.ok) { - throw new Error(`gql fetch on ${url} failed: ${await res.text()}`); - } - // console.log - return res.json(); - }, - }; - }; - return { gql }; -}; - async function import_func(op: number, task: ImportFuncTask) { const { name, args, internals, verbose } = task; @@ -73,7 +39,11 @@ async function import_func(op: number, task: ImportFuncTask) { verbose && logger.info(`exec func "${name}" from module ${op}`); const mod = registry.get(op)! as TaskModule; if (name in mod && typeof mod[name] === "function") { - return await mod[name](args, internals, make_internal(internals)); + return await mod[name]( + args, + internals, + make_internal(internals, additionalHeaders) + ); } throw new Error(`"${name}" is not a valid method`); } @@ -87,7 +57,7 @@ async function func(op: number, task: FuncTask) { verbose && logger.info(`exec func "${op}"`); const fn = registry.get(op)! 
as TaskExec; - return await fn(args, internals, make_internal(internals)); + return await fn(args, internals, make_internal(internals, additionalHeaders)); } async function register_import_func(_: null, task: RegisterImportFuncTask) { @@ -104,7 +74,7 @@ function register_func(_: null, task: RegisterFuncTask) { registry.set( op, - new Function(`"use strict"; ${fnCode}; return _my_lambda;`)(), + new Function(`"use strict"; ${fnCode}; return _my_lambda;`)() ); } diff --git a/src/typegate/src/runtimes/substantial.ts b/src/typegate/src/runtimes/substantial.ts index d0df5b80cc..913ceb66ff 100644 --- a/src/typegate/src/runtimes/substantial.ts +++ b/src/typegate/src/runtimes/substantial.ts @@ -9,10 +9,18 @@ import { registerRuntime } from "./mod.ts"; import { getLogger, Logger } from "../log.ts"; import * as ast from "graphql/ast"; import { path } from "compress/deps.ts"; -import { Artifact, Materializer } from "../typegraph/types.ts"; +import { Artifact } from "../typegraph/types.ts"; import { Typegate } from "../typegate/mod.ts"; import { Backend } from "../../engine/runtime.js"; -import { Agent, WorkflowDescription } from "./substantial/agent.ts"; +import { + Agent, + AgentConfig, + WorkflowDescription, +} from "./substantial/agent.ts"; +import { closestWord } from "../utils.ts"; +import { InternalAuth } from "../services/auth/protocols/internal.ts"; +import { TaskContext } from "./deno/shared_types.ts"; +import { globalConfig } from "../config.ts"; const logger = getLogger(import.meta); @@ -31,6 +39,18 @@ type QueryOngoingWorkflowResult = Omit< "result" | "ended_at" >; +interface WorkflowFileDescription { + imports: string[]; + kind: "deno" | "python"; + file: string; + deps: string[]; +} + +interface SubstantialRuntimeArgs { + backend: Backend; + workflows: Array; +} + @registerRuntime("substantial") export class SubstantialRuntime extends Runtime { private logger: Logger; @@ -39,23 +59,27 @@ export class SubstantialRuntime extends Runtime { private agent: Agent; 
private queue: string; + private typegate: Typegate; private constructor( typegraphName: string, backend: Backend, queue: string, - agent: Agent + agent: Agent, + typegate: Typegate ) { super(typegraphName); this.logger = getLogger(`substantial:'${typegraphName}'`); this.backend = backend; this.queue = queue; this.agent = agent; + this.typegate = typegate; } static async init(params: RuntimeInitParams): Promise { logger.info("initializing SubstantialRuntime"); - logger.debug(`init params: ${JSON.stringify(params)}`); + // logger.debug(`init params: ${JSON.stringify(params)}`); + const { typegraph: tg, args, @@ -64,8 +88,9 @@ export class SubstantialRuntime extends Runtime { typegate, } = params; - const secrets: Record = {}; + const runtimeArgs = args as SubstantialRuntimeArgs; + const secrets: Record = {}; for (const m of materializers) { for (const secretName of (m.data.secrets as []) ?? []) { secrets[secretName] = secretManager.secretOrFail(secretName); @@ -73,7 +98,7 @@ export class SubstantialRuntime extends Runtime { } const tgName = TypeGraph.formatName(tg); - const backend = (args as any)!.backend as Backend; + const backend = runtimeArgs.backend; if (backend.type == "redis") { backend.connection_string = secretManager.secretOrFail( backend.connection_string @@ -82,26 +107,47 @@ export class SubstantialRuntime extends Runtime { const queue = "default"; - // Prepare the backend event poller - const agent = new Agent(backend, queue, { - poll_interval_sec: typegate.config.base.substantial_poll_interval_sec, - lease_lifespan_sec: typegate.config.base.substantial_lease_lifespan_sec, - }); + const agentConfig = { + pollIntervalSec: typegate.config.base.substantial_poll_interval_sec, + leaseLifespanSec: typegate.config.base.substantial_lease_lifespan_sec, + maxAcquirePerTick: typegate.config.base.substantial_max_acquire_per_tick, + } satisfies AgentConfig; + + // Note: required for ctx.gql() + const token = await InternalAuth.emit(typegate.cryptoKeys); + const 
internalTCtx = { + context: {}, + secrets: {}, + effect: null, + meta: { + url: `http://127.0.0.1:${globalConfig.tg_port}/${tgName}`, + token, + }, + headers: {}, + } satisfies TaskContext; + + const agent = new Agent(backend, queue, agentConfig, internalTCtx); const wfDescriptions = await getWorkflowDescriptions( tgName, tg.meta.artifacts, - materializers, + runtimeArgs.workflows, typegate ); agent.start(wfDescriptions); // Prepare the runtime - const instance = new SubstantialRuntime(tgName, backend, queue, agent); + const instance = new SubstantialRuntime( + tgName, + backend, + queue, + agent, + typegate + ); await instance.#prepareWorkflowFiles( tg.meta.artifacts, - materializers, + runtimeArgs.workflows, typegate ); @@ -162,34 +208,47 @@ export class SubstantialRuntime extends Runtime { #delegate(mat: TypeMaterializer): Resolver { const matName = mat.name; - const data = mat?.data ?? {}; - const workflowName = data.name as string; switch (matName) { case "start": - return this.#startResolver(workflowName); + return this.#startResolver(false); + case "start_raw": + return this.#startResolver(true); case "stop": return this.#stopResolver(); case "send": - return this.#sendResolver(); + return this.#sendResolver(false); + case "send_raw": + return this.#sendResolver(true); case "resources": - return () => { + return ({ name: workflowName }) => { + this.#checkWorkflowExistOrThrow(workflowName); + const res = this.agent.workerManager.getAllocatedResources(workflowName); return JSON.parse(JSON.stringify(res)); }; case "results": - return this.#resultsResover(workflowName); + return this.#resultsResover(false); + case "results_raw": + return this.#resultsResover(true); + case "internal_link_parent_child": + return this.#linkerResolver(); default: - return () => null; + throw new Error(`Unimplemented operation ${mat.name}`); } } - #startResolver(workflowName: string): Resolver { - return async ({ kwargs }) => { + #startResolver(enableGenerics: boolean): Resolver { + 
return async ({ name: workflowName, kwargs }) => { + this.#checkWorkflowExistOrThrow(workflowName); + const runId = Agent.nextId(workflowName); const schedule = new Date().toJSON(); + logger.info( + `Start request "${workflowName}" received: new run "${runId}" should be scheduled.` + ); await this.agent.schedule({ backend: this.backend, queue: this.queue, @@ -199,20 +258,20 @@ export class SubstantialRuntime extends Runtime { at: schedule, event: { type: "Start", - kwargs, + kwargs: enableGenerics ? JSON.parse(kwargs) : kwargs, }, }, }); await this.agent.link(workflowName, runId); - - // TODO: return { workflow_name, run_id, schedule } instead return runId; }; } - #resultsResover(workflowName: string): Resolver { - return async () => { + #resultsResover(enableGenerics: boolean): Resolver { + return async ({ name: workflowName }) => { + this.#checkWorkflowExistOrThrow(workflowName); + const relatedRuns = await this.agent.retrieveLinks(workflowName); const ongoing = [] as Array; const completed = [] as Array; @@ -240,7 +299,9 @@ export class SubstantialRuntime extends Runtime { ended_at: endedAt!, result: { status: kind == "Ok" ? "COMPLETED" : "COMPLETED_WITH_ERROR", - value: result[kind], + value: enableGenerics + ? JSON.stringify(result[kind]) + : result[kind], }, }); } else { @@ -268,28 +329,45 @@ export class SubstantialRuntime extends Runtime { #stopResolver(): Resolver { return async ({ run_id }) => { - const schedule = new Date().toJSON(); - await this.agent.schedule({ + const children = await Meta.substantial.metadataEnumerateAllChildren({ backend: this.backend, - queue: this.queue, - run_id, - schedule, - operation: { - at: new Date().toJSON(), - event: { - type: "Stop", - result: { - Err: "ABORTED", - }, - }, - }, + parent_run_id: run_id, }); - return run_id; + const stopQueue = [run_id, ...children] as Array; + const willBeStopped = []; + while (true) { + // TODO: what if some fail? maybe collect all failing ones instead and put that on the error? 
+ const currRunId = stopQueue.shift(); + if (currRunId) { + const schedule = new Date().toJSON(); + await this.agent.schedule({ + backend: this.backend, + queue: this.queue, + run_id: currRunId, + schedule, + operation: { + at: new Date().toJSON(), + event: { + type: "Stop", + result: { + Err: "ABORTED", + }, + }, + }, + }); + + willBeStopped.push(currRunId); + } else { + break; + } + } + + return willBeStopped; }; } - #sendResolver(): Resolver { + #sendResolver(enableGenerics: boolean): Resolver { return async ({ run_id, event }) => { const schedule = new Date().toJSON(); @@ -303,7 +381,7 @@ export class SubstantialRuntime extends Runtime { event: { type: "Send", event_name: event.name, - value: event.payload, + value: enableGenerics ? JSON.parse(event.payload) : event.payload, }, }, }); @@ -312,15 +390,26 @@ export class SubstantialRuntime extends Runtime { }; } + #linkerResolver(): Resolver { + return async ({ parent_run_id, child_run_id }) => { + await Meta.substantial.metadataWriteParentChildLink({ + backend: this.backend, + parent_run_id, + child_run_id, + }); + return true; + }; + } + async #prepareWorkflowFiles( artifacts: Record, - materializers: Materializer[], + fileDescriptions: Array, typegate: Typegate ) { const descriptions = await getWorkflowDescriptions( this.typegraphName, artifacts, - materializers, + fileDescriptions, typegate ); @@ -328,41 +417,56 @@ export class SubstantialRuntime extends Runtime { this.workflowFiles.set(wf.name, wf.path); } } + + #checkWorkflowExistOrThrow(name: string) { + if (!this.workflowFiles.has(name)) { + const known = Array.from(this.workflowFiles.keys()); + const closest = closestWord(name, known); + if (closest) { + throw new Error( + `workflow "${name}" does not exist, did you mean "${closest}"?` + ); + } + + throw new Error( + `workflow "${name}" does not exist, available workflows are ${known + .map((name) => `"${name}"`) + .join(", ")}` + ); + } + } } async function getWorkflowDescriptions( typegraphName: 
string, artifacts: Record, - materializers: Materializer[], + descriptions: Array, typegate: Typegate ) { const basePath = path.join(typegate.config.base.tmp_dir, "artifacts"); + logger.info(`Resolved runtime artifacts at ${basePath}`); const workflowDescriptions = [] as Array; const seen = new Set(); - for (const mat of materializers) { - if (!["start", "stop", "send", "resources", "results"].includes(mat.name)) { - continue; - } - - const workflowName = mat.data.name as string; - if (!seen.has(workflowName)) { - const entryModulePath = await getWorkflowEntryPointPath( - typegraphName, - artifacts, - mat.data, - typegate - ); - - logger.info(`Resolved runtime artifacts at ${basePath}`); - workflowDescriptions.push({ - name: workflowName, - kind: (mat.data.kind as string).toUpperCase() as "DENO" | "PYTHON", - path: entryModulePath, - }); - - seen.add(workflowName); + for (const description of descriptions) { + for (const workflowName of description.imports) { + if (!seen.has(workflowName)) { + const entryModulePath = await getWorkflowEntryPointPath( + typegraphName, + artifacts, + description, + typegate + ); + + workflowDescriptions.push({ + name: workflowName, + kind: description.kind.toUpperCase() as "DENO" | "PYTHON", + path: entryModulePath, + }); + + seen.add(workflowName); + } } } @@ -372,11 +476,11 @@ async function getWorkflowDescriptions( async function getWorkflowEntryPointPath( typegraphName: string, artifacts: Record, - matData: Record, + description: WorkflowFileDescription, typegate: Typegate ) { - const entryPoint = artifacts[matData.file as string]; - const deps = (matData.deps as string[]).map((dep) => artifacts[dep]); + const entryPoint = artifacts[description.file as string]; + const deps = (description.deps as string[]).map((dep) => artifacts[dep]); const moduleMeta = { typegraphName, relativePath: entryPoint.path, diff --git a/src/typegate/src/runtimes/substantial/agent.ts b/src/typegate/src/runtimes/substantial/agent.ts index 
4857a0b622..966ce460f3 100644 --- a/src/typegate/src/runtimes/substantial/agent.ts +++ b/src/typegate/src/runtimes/substantial/agent.ts @@ -3,15 +3,16 @@ import { Backend, NextRun, Run, + ReadOrCloseScheduleInput, } from "../../../engine/runtime.js"; import { getLogger } from "../../log.ts"; -import { sleep } from "../../utils.ts"; +import { TaskContext } from "../deno/shared_types.ts"; import { Interrupt, - Operation, Result, WorkerData, WorkflowResult, + appendIfOngoing, } from "./types.ts"; import { RunId, WorkerManager } from "./workflow_worker_manager.ts"; @@ -23,14 +24,10 @@ export interface WorkflowDescription { kind: "DENO" | "PYTHON"; } -export interface StagedUserEvent { - runId: string; - operation: Operation; -} - export interface AgentConfig { - poll_interval_sec: number; - lease_lifespan_sec: number; + pollIntervalSec: number; + leaseLifespanSec: number; + maxAcquirePerTick: number; } export class Agent { @@ -41,7 +38,8 @@ export class Agent { constructor( private backend: Backend, private queue: string, - private config: AgentConfig + private config: AgentConfig, + private internalTCtx: TaskContext ) {} async schedule(input: AddScheduleInput) { @@ -49,12 +47,18 @@ export class Agent { } async log(runId: string, schedule: string, content: unknown) { - await Meta.substantial.metadataAppend({ - backend: this.backend, - schedule, - run_id: runId, - content, - }); + try { + await Meta.substantial.metadataAppend({ + backend: this.backend, + schedule, + run_id: runId, + content, + }); + } catch (err) { + logger.warn( + `Failed writing log metadata for schedule "${schedule}" (${runId}), skipping it: ${err}` + ); + } } async link(workflowName: string, runId: string) { @@ -96,7 +100,7 @@ export class Agent { } catch (err) { logger.error(err); } - }, 1000 * this.config.poll_interval_sec); + }, 1000 * this.config.pollIntervalSec); } stop() { @@ -108,19 +112,47 @@ export class Agent { async #nextIteration() { logger.warn("POLL"); - const next = await 
this.#tryAcquireNextRun(); - for (const workflow of this.workflows) { - if (next && Agent.parseWorkflowName(next.run_id) == workflow.name) { - // logger.warn(`Run workflow ${JSON.stringify(next)}`); + // Note: in multiple agents/typegate scenario, a single node may acquire all runs for itself within a tick span + // To account for that, keep this reasonable + const acquireMaxForThisAgent = this.config.maxAcquirePerTick; + const replayRequests = [] as Array; + const runIdSeen = new Set(); + + while (replayRequests.length <= acquireMaxForThisAgent) { + const next = await this.#tryAcquireNextRun(); + if (next && !runIdSeen.has(next.run_id)) { + replayRequests.push(next); + // we cannot start more than 1 worker associated to a runId + runIdSeen.add(next.run_id); + } else { + break; + } + } - await this.log(next.run_id, next.schedule_date, { - message: "Replay workflow", - next, - }); + for (const workflow of this.workflows) { + const requests = replayRequests.filter( + ({ run_id }) => Agent.parseWorkflowName(run_id) == workflow.name + ); - await this.#replay(next, workflow); - return; + while (requests.length > 0) { + // logger.warn(`Run workflow ${JSON.stringify(next)}`); + const next = requests.shift(); + if (next) { + try { + await this.#replay(next, workflow); + } catch (err) { + logger.error( + `Replay failed for ${workflow.name} => ${JSON.stringify(next)}` + ); + logger.error(err); + } finally { + await this.log(next.run_id, next.schedule_date, { + message: "Replaying workflow", + next, + }); + } + } } } } @@ -128,7 +160,7 @@ export class Agent { async #tryAcquireNextRun() { const activeRunIds = await Meta.substantial.agentActiveLeases({ backend: this.backend, - lease_seconds: this.config.lease_lifespan_sec, + lease_seconds: this.config.leaseLifespanSec, }); logger.info(`Active leases: ${activeRunIds.join(", ")}`); @@ -145,7 +177,7 @@ export class Agent { const acquired = await Meta.substantial.agentAcquireLease({ backend: this.backend, - lease_seconds: 
this.config.lease_lifespan_sec, + lease_seconds: this.config.leaseLifespanSec, run_id: next.run_id, }); @@ -176,9 +208,8 @@ export class Agent { backend: this.backend, queue: this.queue, run_id: next.run_id, - workflow_name: workflow.name, schedule: next.schedule_date, - }; + } satisfies ReadOrCloseScheduleInput; const newEventOp = await Meta.substantial.storeReadSchedule(schedDef); if (checkIfRunHasStopped(run)) { @@ -190,7 +221,7 @@ export class Agent { } if (newEventOp) { - run.operations.push(newEventOp); + appendIfOngoing(run, newEventOp); } if (run.operations.length == 0) { @@ -212,19 +243,34 @@ export class Agent { return; } - this.workerManager.triggerStart( - workflow.name, - next.run_id, - workflow.path, - run, - next.schedule_date, - first.event.kwargs - ); + try { + this.workerManager.triggerStart( + workflow.name, + next.run_id, + workflow.path, + run, + next.schedule_date, + first.event.kwargs, + this.internalTCtx + ); - this.workerManager.listen( - next.run_id, - this.#eventResultHandlerFor(workflow.name, next.run_id) - ); + this.workerManager.listen( + next.run_id, + this.#eventResultHandlerFor(workflow.name, next.run_id) + ); + } catch (err) { + throw err; + } finally { + if (run.operations.length == 1) { + // Make sure it is already visible on the ongoing list + // without waiting for interrupts to store the first run + logger.info(`Persist first run "${next.run_id}"`); + await Meta.substantial.storePersistRun({ + backend: this.backend, + run, + }); + } + } } #eventResultHandlerFor(workflowName: string, runId: string) { @@ -246,7 +292,8 @@ export class Agent { switch (answer.type) { case "START": { const ret = answer.data as WorkflowResult; - switch (Interrupt.getTypeOf(ret.exception)) { + const interrupt = Interrupt.getTypeOf(ret.exception); + switch (interrupt) { case "SAVE_RETRY": case "SLEEP": case "WAIT_ENSURE_VALUE": @@ -255,14 +302,17 @@ export class Agent { await this.#workflowHandleInterrupts(workflowName, runId, ret); break; } - default: 
{ + case null: { await this.#workflowHandleGracefullCompletion( startedAt, workflowName, runId, ret ); + break; } + default: + throw new Error(`Unknown interrupt "${interrupt}"`); } break; } @@ -319,7 +369,7 @@ export class Agent { logger.info(`Renew lease ${runId}`); await Meta.substantial.agentRenewLease({ backend: this.backend, - lease_seconds: this.config.lease_lifespan_sec, + lease_seconds: this.config.leaseLifespanSec, run_id: runId, }); } @@ -340,8 +390,10 @@ export class Agent { logger.info(`Append Stop ${runId}`); const rustResult = kind == "FAIL" ? "Err" : "Ok"; - run.operations.push({ - // Note: run is a one-time value, thus can be mutated + + // Note: run is a one-time value, thus can be mutated + + appendIfOngoing(run, { at: new Date().toJSON(), event: { type: "Stop", @@ -372,7 +424,7 @@ export class Agent { await Meta.substantial.agentRemoveLease({ backend: this.backend, run_id: runId, - lease_seconds: this.config.lease_lifespan_sec, + lease_seconds: this.config.leaseLifespanSec, }); } diff --git a/src/typegate/src/runtimes/substantial/deno_context.ts b/src/typegate/src/runtimes/substantial/deno_context.ts index 44cd59dd41..44a48f81e0 100644 --- a/src/typegate/src/runtimes/substantial/deno_context.ts +++ b/src/typegate/src/runtimes/substantial/deno_context.ts @@ -4,11 +4,26 @@ // FIXME: DO NOT IMPORT any file that refers to Meta, this will be instantiated in a Worker // import { sleep } from "../../utils.ts"; // will silently fail?? 
-import { Interrupt, OperationEvent, Run } from "./types.ts"; +import { make_internal } from "../../worker_utils.ts"; +import { TaskContext } from "../deno/shared_types.ts"; +import { Interrupt, OperationEvent, Run, appendIfOngoing } from "./types.ts"; + +// const isTest = Deno.env.get("DENO_TESTING") === "true"; +const testBaseUrl = Deno.env.get("TEST_OVERRIDE_GQL_ORIGIN"); + +const additionalHeaders = { connection: "keep-alive" }; export class Context { private id: number = 0; - constructor(private run: Run, private kwargs: Record) {} + gql: ReturnType; + + constructor( + private run: Run, + private kwargs: Record, + private internal: TaskContext + ) { + this.gql = createGQLClient(internal); + } #nextId() { // IDEA: this scheme does not account the step provided @@ -18,7 +33,7 @@ export class Context { } #appendOp(op: OperationEvent) { - this.run.operations.push({ at: new Date().toJSON(), event: op }); + appendIfOngoing(this.run, { at: new Date().toJSON(), event: op }); } async save(fn: () => T | Promise, option?: SaveOption) { @@ -57,7 +72,7 @@ export class Context { id, value: { type: "Resolved", - payload: result, + payload: result ?? null, }, }); @@ -160,7 +175,7 @@ export class Context { for (const { event } of this.run.operations) { if (event.type == "Send" && event.event_name == eventName) { const payload = event.value; - return await fn(payload); + return await this.save(async () => await fn(payload)); } } @@ -175,6 +190,149 @@ export class Context { return result; } + + // Note: This is designed to be used inside ctx.save(..) 
+ async startChildWorkflow(workflow: Workflow, kwargs: unknown) { + const handle = new ChildWorkflowHandle(this, { + name: workflow.name, + kwargs, + }); + const runId = await handle.start(); + return { + ...handle.handleDef, + runId, + }; + } + + // Note: This is designed to be used outside a ctx.save since function methods cannot be persisted + createWorkflowHandle(handleDef: SerializableWorkflowHandle) { + if (!handleDef.runId) { + throw new Error( + "Cannot create handle from a definition that was not run" + ); + } + return new ChildWorkflowHandle(this, handleDef); + } +} + +export type Workflow = (ctx: Context) => Promise; + +interface SerializableWorkflowHandle { + runId?: string; + + name: string; + kwargs: unknown; +} + +export class ChildWorkflowHandle { + constructor( + private ctx: Context, + public handleDef: SerializableWorkflowHandle + ) {} + + async start(): Promise { + const { data } = await this.ctx.gql/**/ ` + mutation ($name: String!, $kwargs: String!) { + _sub_internal_start(name: $name, kwargs: $kwargs) + } + `.run({ + name: this.handleDef.name, + kwargs: JSON.stringify(this.handleDef.kwargs), + }); + + this.handleDef.runId = (data as any)._sub_internal_start as string; + this.#checkRunId(); + + const { data: _ } = await this.ctx.gql/**/ ` + mutation ($parent_run_id: String!, $child_run_id: String!) { + _sub_internal_link_parent_child(parent_run_id: $parent_run_id, child_run_id: $child_run_id) + } + `.run({ + parent_run_id: this.ctx.getRun().run_id, + child_run_id: this.handleDef.runId!, + }); + + return this.handleDef.runId!; + } + + async result(): Promise { + this.#checkRunId(); + + const { data } = await this.ctx.gql/**/ ` + query ($name: String!) 
{ + _sub_internal_results(name: $name) { + completed { + runs { + run_id + result { + value + status + } + } + } + } + } + `.run({ + name: this.handleDef.name, + }); + + const runs = (data as any)?._sub_internal_results?.completed + ?.runs as Array; + + const current = runs + .filter(({ run_id }) => run_id == this.handleDef.runId) + .shift(); + if (current) { + return JSON.parse(current.result.value) as O; + } + + throw Error(`Result for child workflow "${name}" not yet resolved`); + } + + async stop(): Promise { + this.#checkRunId(); + + const { data } = await this.ctx.gql/**/ ` + mutation ($run_id: String!) { + _sub_internal_stop(run_id: $run_id) + } + `.run({ + run_id: this.handleDef.runId, + }); + + return (data as any)._sub_internal_stop as string; + } + + async hasStopped(): Promise { + this.#checkRunId(); + + const { data } = await this.ctx.gql/**/ ` + query { + _sub_internal_results(name: $name) { + completed { + runs { + run_id + } + } + } + } + `.run({ + name: this.handleDef.name, + }); + + const runs = (data as any)?._sub_internal_results?.completed + ?.runs as Array; + + return runs.some(({ run_id }) => run_id == this.handleDef.runId); + } + + #checkRunId() { + if (!this.handleDef.runId) { + throw new Error( + "Invalid state: run_id is not properly set, this could mean that the workflow was not started yet" + ); + } + } } // TODO: move all of these into substantial lib once Meta can be used inside workers @@ -259,3 +417,16 @@ class RetryStrategy { return Math.floor(((this.maxRetries - retriesLeft) * dt) / this.maxRetries); } } + +function createGQLClient(internal: TaskContext) { + const tgLocal = new URL(internal.meta.url); + if (testBaseUrl) { + const newBase = new URL(testBaseUrl); + tgLocal.protocol = newBase.protocol; + tgLocal.hostname = newBase.hostname; + tgLocal.port = newBase.port; + } + + const meta = { ...internal.meta, url: tgLocal.toString() }; + return make_internal({ ...internal, meta }, additionalHeaders).gql; +} diff --git 
a/src/typegate/src/runtimes/substantial/types.ts b/src/typegate/src/runtimes/substantial/types.ts index 2a1258d3d6..e7defcf5bd 100644 --- a/src/typegate/src/runtimes/substantial/types.ts +++ b/src/typegate/src/runtimes/substantial/types.ts @@ -1,7 +1,7 @@ // Copyright Metatype OÜ, licensed under the Elastic License 2.0. // SPDX-License-Identifier: Elastic-2.0 -import { Run } from "../../../engine/runtime.js"; +import { Operation, Run } from "../../../engine/runtime.js"; export type { Operation, OperationEvent, @@ -79,3 +79,10 @@ export class Interrupt extends Error { return new Interrupt(kind, cause); } } + +export function appendIfOngoing(run: Run, operation: Operation) { + const hasStopped = run.operations.some(({ event }) => event.type == "Stop"); + if (!hasStopped) { + run.operations.push(operation); + } +} diff --git a/src/typegate/src/runtimes/substantial/worker.ts b/src/typegate/src/runtimes/substantial/worker.ts index 084a0b6211..fcbc626421 100644 --- a/src/typegate/src/runtimes/substantial/worker.ts +++ b/src/typegate/src/runtimes/substantial/worker.ts @@ -1,6 +1,7 @@ // Copyright Metatype OÜ, licensed under the Elastic License 2.0. // SPDX-License-Identifier: Elastic-2.0 +import { errorToString } from "../../worker_utils.ts"; import { Context } from "./deno_context.ts"; import { Err, Msg, Ok, WorkerData, WorkflowResult } from "./types.ts"; @@ -10,7 +11,8 @@ self.onmessage = async function (event) { const { type, data } = event.data as WorkerData; switch (type) { case "START": { - const { modulePath, functionName, run, schedule, kwargs } = data; + const { modulePath, functionName, run, schedule, kwargs, internal } = + data; // FIXME: handle case when script is missing and notify WorkerManager so it cleans up // its registry. 
const module = await import(modulePath); @@ -19,12 +21,12 @@ self.onmessage = async function (event) { const workflowFn = module[functionName]; if (typeof workflowFn !== "function") { - self.postMessage(Err(`Function ${functionName} is not found`)); + self.postMessage(Err(`Function "${functionName}" not found`)); self.close(); return; } - runCtx = new Context(run, kwargs); + runCtx = new Context(run, kwargs, internal); workflowFn(runCtx) .then((wfResult: unknown) => { @@ -44,10 +46,7 @@ self.onmessage = async function (event) { Ok( Msg(type, { kind: "FAIL", - result: - wfException instanceof Error - ? wfException.message - : JSON.stringify(wfException), + result: errorToString(wfException), exception: wfException instanceof Error ? wfException : undefined, run: runCtx!.getRun(), diff --git a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts index bc6000e0c2..795d20975a 100644 --- a/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts +++ b/src/typegate/src/runtimes/substantial/workflow_worker_manager.ts @@ -1,7 +1,9 @@ // Copyright Metatype OÜ, licensed under the Elastic License 2.0. 
// SPDX-License-Identifier: Elastic-2.0 +import { envSharedWithWorkers } from "../../config/shared.ts"; import { getLogger } from "../../log.ts"; +import { TaskContext } from "../deno/shared_types.ts"; import { Err, Msg, @@ -112,7 +114,23 @@ export class WorkerManager { #createWorker(name: string, modulePath: string, runId: RunId) { const worker = new Worker(import.meta.resolve("./worker.ts"), { + name: runId, type: "module", + deno: { + permissions: { + // overrideable default permissions + hrtime: false, + net: true, + // on request permissions + read: "inherit", // default read permission + sys: "inherit", + // non-overridable permissions (security between typegraphs) + run: false, + write: false, + ffi: false, + env: envSharedWithWorkers, + }, + }, }); this.recorder.addWorker( @@ -203,7 +221,8 @@ export class WorkerManager { workflowModPath: string, storedRun: Run, schedule: string, - kwargs: Record + kwargs: Record, + internalTCtx: TaskContext ) { this.#createWorker(name, workflowModPath, runId); this.trigger("START", runId, { @@ -212,6 +231,7 @@ export class WorkerManager { run: storedRun, kwargs, schedule, + internal: internalTCtx, }); } } diff --git a/src/typegate/src/worker_utils.ts b/src/typegate/src/worker_utils.ts new file mode 100644 index 0000000000..1eaa69d05e --- /dev/null +++ b/src/typegate/src/worker_utils.ts @@ -0,0 +1,51 @@ +// Copyright Metatype OÜ, licensed under the Elastic License 2.0. 
+// SPDX-License-Identifier: Elastic-2.0 + +// WARNING: Assume any content or state in this file will run inside a Web Worker + +import { TaskContext } from "./runtimes/deno/shared_types.ts"; + +export function make_internal( + { meta: { url, token } }: TaskContext, + additionalHeaders: Record +) { + const gql = (query: readonly string[], ...args: unknown[]) => { + if (args.length > 0) { + throw new Error("gql does not support arguments, use variables instead"); + } + // console.log(query); + return { + run: async ( + variables: Record + ): Promise> => { + const res = await fetch(url, { + method: "POST", + headers: { + accept: "application/json", + "content-type": "application/json", + authorization: `Bearer ${token}`, + ...additionalHeaders, + }, + body: JSON.stringify({ + query: query[0], + variables, + }), + }); + if (!res.ok) { + throw new Error(`gql fetch on ${url} failed: ${await res.text()}`); + } + // console.log + return res.json(); + }, + }; + }; + return { gql }; +} + +export function errorToString(err: unknown) { + if (err instanceof Error) { + return err.message; + } else { + return JSON.stringify(err); + } +} diff --git a/src/typegraph/core/src/conversion/runtimes.rs b/src/typegraph/core/src/conversion/runtimes.rs index f9900a483e..9e685fcb54 100644 --- a/src/typegraph/core/src/conversion/runtimes.rs +++ b/src/typegraph/core/src/conversion/runtimes.rs @@ -20,7 +20,9 @@ use common::typegraph::runtimes::kv::KvRuntimeData; use common::typegraph::runtimes::python::PythonRuntimeData; use common::typegraph::runtimes::random::RandomRuntimeData; use common::typegraph::runtimes::s3::S3RuntimeData; -use common::typegraph::runtimes::substantial::{self, RedisConfig, SubstantialRuntimeData}; +use common::typegraph::runtimes::substantial::{ + self, RedisConfig, SubstantialRuntimeData, WorkflowFileDescription, +}; use common::typegraph::runtimes::temporal::TemporalRuntimeData; use common::typegraph::runtimes::wasm::WasmRuntimeData; use common::typegraph::runtimes::{ 
@@ -32,6 +34,7 @@ use indexmap::IndexMap; use serde_json::json; use sha2::{Digest, Sha256}; use std::collections::HashMap; +use std::path::PathBuf; use std::rc::Rc; use unindent::Unindent; @@ -501,7 +504,7 @@ pub fn convert_runtime(_c: &mut TypegraphContext, runtime: Runtime) -> Result { - let backend = match &data.backend { + let backend = match data.backend.clone() { SubstantialBackend::Memory => substantial::SubstantialBackend::Memory, SubstantialBackend::Fs => substantial::SubstantialBackend::Fs, SubstantialBackend::Redis(redis) => { @@ -511,7 +514,28 @@ pub fn convert_runtime(_c: &mut TypegraphContext, runtime: Runtime) -> Result { + substantial::WorkflowKind::Python + } + crate::wit::runtimes::WorkflowKind::Deno => { + substantial::WorkflowKind::Deno + } + }, + deps: desc.deps.clone().into_iter().map(PathBuf::from).collect(), + imports: desc.workflows, + }) + .collect(), + })) + .into()) } Runtime::Kv(d) => Ok(TGRuntime::Known(Rt::Kv(KvRuntimeData { url: d.url.clone() })).into()), Runtime::Grpc(d) => Ok(TGRuntime::Known(Rt::Grpc(GrpcRuntimeData { diff --git a/src/typegraph/core/src/runtimes/substantial.rs b/src/typegraph/core/src/runtimes/substantial.rs index 30f2d7a9c8..5ab2f6a947 100644 --- a/src/typegraph/core/src/runtimes/substantial.rs +++ b/src/typegraph/core/src/runtimes/substantial.rs @@ -1,30 +1,30 @@ // Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0. 
// SPDX-License-Identifier: MPL-2.0 -use std::path::PathBuf; - +use crate::conversion::runtimes::MaterializerConverter; +use crate::errors::Result; use crate::global_store::Store; -use crate::t::{self, TypeBuilder}; +use crate::t::{self, ConcreteTypeBuilder, TypeBuilder}; +use crate::typegraph::TypegraphContext; use crate::wit::core::FuncParams; use crate::wit::{ - self, core::RuntimeId, runtimes::Effect as WitEffect, runtimes::SubstantialOperationData, + core::RuntimeId, runtimes::Effect as WitEffect, runtimes::SubstantialOperationData, runtimes::SubstantialOperationType, }; -use crate::{ - conversion::runtimes::MaterializerConverter, errors::Result, typegraph::TypegraphContext, -}; -use common::typegraph::runtimes::substantial::WorkflowMatData; use common::typegraph::Materializer; - use serde_json::json; #[derive(Debug)] pub enum SubstantialMaterializer { - Start { workflow: WorkflowMatData }, - Stop { workflow: WorkflowMatData }, - Send { workflow: WorkflowMatData }, - Resources { workflow: WorkflowMatData }, - Results { workflow: WorkflowMatData }, + Start, + StartRaw, + Stop, + Send, + SendRaw, + Resources, + Results, + ResultsRaw, + InternalLinkParentChild, } impl MaterializerConverter for SubstantialMaterializer { @@ -35,36 +35,18 @@ impl MaterializerConverter for SubstantialMaterializer { effect: WitEffect, ) -> Result { let runtime = c.register_runtime(runtime_id)?; - let as_index_map = |wf_data: &WorkflowMatData| { - let WorkflowMatData { - name, - file, - kind, - deps, - } = wf_data; - json!({ - "name": name, - "file": file, - "kind": kind, - "deps": deps, - }) - }; let (name, data) = match self { - SubstantialMaterializer::Start { workflow } => { - ("start".to_string(), as_index_map(workflow)) - } - SubstantialMaterializer::Stop { workflow } => { - ("stop".to_string(), as_index_map(workflow)) - } - SubstantialMaterializer::Send { workflow } => { - ("send".to_string(), as_index_map(workflow)) - } - SubstantialMaterializer::Resources { workflow } => { - 
("resources".to_string(), as_index_map(workflow)) - } - SubstantialMaterializer::Results { workflow } => { - ("results".to_string(), as_index_map(workflow)) + SubstantialMaterializer::Start => ("start".to_string(), json!({})), + SubstantialMaterializer::StartRaw => ("start_raw".to_string(), json!({})), + SubstantialMaterializer::Stop => ("stop".to_string(), json!({})), + SubstantialMaterializer::Send => ("send".to_string(), json!({})), + SubstantialMaterializer::SendRaw => ("send_raw".to_string(), json!({})), + SubstantialMaterializer::Resources => ("resources".to_string(), json!({})), + SubstantialMaterializer::Results => ("results".to_string(), json!({})), + SubstantialMaterializer::ResultsRaw => ("results_raw".to_string(), json!({})), + SubstantialMaterializer::InternalLinkParentChild => { + ("internal_link_parent_child".to_string(), json!({})) } }; @@ -83,40 +65,53 @@ pub fn substantial_operation( ) -> Result { let mut inp = t::struct_(); let (effect, mat_data, out_ty) = match data.operation { - SubstantialOperationType::Start(workflow) => { - let arg = data.func_arg.ok_or("query arg is undefined".to_string())?; - inp.prop("kwargs", arg.into()); - ( - WitEffect::Create(false), - SubstantialMaterializer::Start { - workflow: workflow.into(), - }, - t::string().build()?, - ) + SubstantialOperationType::Start | SubstantialOperationType::StartRaw => { + let (mat, flag) = match data.operation { + SubstantialOperationType::Start => (SubstantialMaterializer::Start, true), + SubstantialOperationType::StartRaw => (SubstantialMaterializer::StartRaw, false), + _ => unreachable!(), + }; + + inp.prop("name", t::string().build()?); + inp.prop( + "kwargs", + use_arg_or_json_string(data.func_arg, flag)?.into(), + ); + + (WitEffect::Create(true), mat, t::string().build()?) 
} - SubstantialOperationType::Stop(workflow) => { + SubstantialOperationType::Stop => { inp.prop("run_id", t::string().build()?); + ( WitEffect::Create(false), - SubstantialMaterializer::Stop { - workflow: workflow.into(), - }, - t::string().build()?, + SubstantialMaterializer::Stop, + t::list(t::string().build()?).build()?, ) } - SubstantialOperationType::Send(workflow) => { - let arg = data.func_arg.ok_or("query arg is undefined".to_string())?; + SubstantialOperationType::Send | SubstantialOperationType::SendRaw => { + let (mat, flag) = match data.operation { + SubstantialOperationType::Send => (SubstantialMaterializer::Send, true), + SubstantialOperationType::SendRaw => (SubstantialMaterializer::SendRaw, false), + _ => unreachable!(), + }; + + let event = t::struct_() + .prop("name", t::string().build()?) + .prop( + "payload", + use_arg_or_json_string(data.func_arg, flag)?.into(), + ) + .build()?; + inp.prop("run_id", t::string().build()?); - inp.prop("event", arg.into()); - ( - WitEffect::Create(false), - SubstantialMaterializer::Send { - workflow: workflow.into(), - }, - t::string().build()?, - ) + inp.prop("event", event); + + (WitEffect::Create(false), mat, t::string().build()?) } - SubstantialOperationType::Resources(workflow) => { + SubstantialOperationType::Resources => { + inp.prop("name", t::string().build()?); + let row = t::struct_() .prop("run_id", t::string().build()?) .prop("started_at", t::string().build()?) @@ -131,18 +126,19 @@ pub fn substantial_operation( .prop("running", t::list(row).build()?) 
.build()?; - ( - WitEffect::Read, - SubstantialMaterializer::Resources { - workflow: workflow.into(), - }, - out, - ) + (WitEffect::Read, SubstantialMaterializer::Resources, out) } - SubstantialOperationType::Results(workflow) => { - let out = data - .func_out - .ok_or("query output is undefined".to_string())?; + SubstantialOperationType::Results | SubstantialOperationType::ResultsRaw => { + let (mat, flag) = match data.operation { + SubstantialOperationType::Results => (SubstantialMaterializer::Results, true), + SubstantialOperationType::ResultsRaw => { + (SubstantialMaterializer::ResultsRaw, false) + } + _ => unreachable!(), + }; + + inp.prop("name", t::string().build()?); + let out = use_arg_or_json_string(data.func_out, flag)?; let count = t::integer().build()?; @@ -171,9 +167,7 @@ pub fn substantial_operation( ( WitEffect::Read, - SubstantialMaterializer::Results { - workflow: workflow.into(), - }, + mat, t::struct_() .prop( "ongoing", @@ -192,6 +186,16 @@ pub fn substantial_operation( .build()?, ) } + SubstantialOperationType::InternalLinkParentChild => { + inp.prop("parent_run_id", t::string().build()?); + inp.prop("child_run_id", t::string().build()?); + + ( + WitEffect::Create(true), + SubstantialMaterializer::InternalLinkParentChild, + t::boolean().build()?, + ) + } }; let mat = super::Materializer::substantial(runtime, mat_data, effect); @@ -203,18 +207,17 @@ pub fn substantial_operation( }) } -impl From for WorkflowMatData { - fn from(value: wit::runtimes::Workflow) -> Self { - use common::typegraph::runtimes::substantial; - - Self { - name: value.name, - file: PathBuf::from(value.file), - kind: match value.kind { - wit::runtimes::WorkflowKind::Python => substantial::WorkflowKind::Python, - wit::runtimes::WorkflowKind::Deno => substantial::WorkflowKind::Deno, - }, - deps: value.deps.iter().map(PathBuf::from).collect(), - } - } +fn use_arg_or_json_string(arg: Option, flag: bool) -> Result { + if flag { + let arg = arg.ok_or("input or output shape is not 
defined on the typegraph".to_string())?; + return Ok(arg); + }; + + t::string() + .config( + "format", + serde_json::to_string("json").map_err(|e| e.to_string())?, + ) + .build() + .map(|r| r.into()) } diff --git a/src/typegraph/core/src/utils/postprocess/substantial_rt.rs b/src/typegraph/core/src/utils/postprocess/substantial_rt.rs index 2dfaf902f6..258b588278 100644 --- a/src/typegraph/core/src/utils/postprocess/substantial_rt.rs +++ b/src/typegraph/core/src/utils/postprocess/substantial_rt.rs @@ -3,8 +3,7 @@ use crate::utils::{artifacts::ArtifactsExt, fs::FsContext, postprocess::PostProcessor}; use common::typegraph::{ - runtimes::substantial::WorkflowMatData, - utils::{map_from_object, object_from_map}, + runtimes::{self, TGRuntime}, Typegraph, }; use std::path::PathBuf; @@ -22,31 +21,30 @@ impl SubstantialProcessor { impl PostProcessor for SubstantialProcessor { fn postprocess(self, tg: &mut Typegraph) -> Result<(), crate::errors::TgError> { let fs_ctx = FsContext::new(self.typegraph_dir.clone()); - let mut materializers = std::mem::take(&mut tg.materializers); - let has_workflow_def = &["start", "stop", "send", "resources", "results"]; - for mat in materializers.iter_mut() { - if has_workflow_def.contains(&mat.name.as_str()) { - let mat_data = std::mem::take(&mut mat.data); - - let mut mat_data: WorkflowMatData = - object_from_map(mat_data).map_err(|e| e.to_string())?; - - fs_ctx.register_artifact(mat_data.file.clone(), tg)?; - - let deps = std::mem::take(&mut mat_data.deps); - for artifact in deps.into_iter() { - let artifacts = fs_ctx.list_files(&[artifact.to_string_lossy().to_string()]); - for artifact in artifacts.iter() { - fs_ctx.register_artifact(artifact.clone(), tg)?; + let runtimes = std::mem::take(&mut tg.runtimes); + + for runtime in runtimes.iter() { + if let TGRuntime::Known(known_runtime) = runtime { + match known_runtime { + runtimes::KnownRuntime::Substantial(data) => { + for wf_description in &data.workflows { + 
fs_ctx.register_artifact(wf_description.file.clone(), tg)?; + + for artifact in &wf_description.deps { + let artifacts: Vec = + fs_ctx.list_files(&[artifact.to_string_lossy().to_string()]); + for artifact in artifacts.iter() { + fs_ctx.register_artifact(artifact.clone(), tg)?; + } + } + } } - mat_data.deps.extend(artifacts); + _ => continue, } - - mat.data = map_from_object(mat_data).map_err(|e| e.to_string())?; } } - tg.materializers = materializers; + tg.runtimes = runtimes; Ok(()) } } diff --git a/src/typegraph/core/wit/typegraph.wit b/src/typegraph/core/wit/typegraph.wit index d728350dc2..2e86803a05 100644 --- a/src/typegraph/core/wit/typegraph.wit +++ b/src/typegraph/core/wit/typegraph.wit @@ -498,28 +498,34 @@ interface runtimes { redis(redis-backend) } - record substantial-runtime-data { - backend: substantial-backend - } enum workflow-kind { python, deno } - record workflow { - name: string, + record workflow-file-description { + workflows: list, file: string, deps: list, kind: workflow-kind } + record substantial-runtime-data { + backend: substantial-backend, + file-descriptions: list + } + variant substantial-operation-type { - start(workflow), - stop(workflow), - send(workflow), - resources(workflow), - results(workflow), + start, + start-raw, + stop, + send, + send-raw, + resources, + results, + results-raw, + internal-link-parent-child } record substantial-operation-data { diff --git a/src/typegraph/deno/src/runtimes/substantial.ts b/src/typegraph/deno/src/runtimes/substantial.ts index dd11e51a38..4ad4145d6f 100644 --- a/src/typegraph/deno/src/runtimes/substantial.ts +++ b/src/typegraph/deno/src/runtimes/substantial.ts @@ -8,9 +8,9 @@ import type { SubstantialBackend, SubstantialOperationData, SubstantialOperationType, - Workflow, + WorkflowFileDescription, + WorkflowKind, } from "../gen/typegraph_core.d.ts"; -import { t } from "../index.ts"; export class Backend { static devMemory(): SubstantialBackend { @@ -34,9 +34,13 @@ export class Backend { 
export class SubstantialRuntime extends Runtime { backend: SubstantialBackend; - constructor(backend: SubstantialBackend) { + constructor( + backend: SubstantialBackend, + fileDescriptions: Array + ) { const id = runtimes.registerSubstantialRuntime({ backend, + fileDescriptions, }); super(id); this.backend = backend; @@ -56,68 +60,109 @@ export class SubstantialRuntime extends Runtime { return Func.fromTypeFunc(funcData); } - deno(file: string, name: string, deps: Array = []): WorkflowHandle { - return new WorkflowHandle(this, { file, name, deps, kind: "deno" }); - } - - python(file: string, name: string, deps: Array = []): WorkflowHandle { - return new WorkflowHandle(this, { file, name, deps, kind: "python" }); - } -} - -export class WorkflowHandle { - constructor(private sub: SubstantialRuntime, private workflow: Workflow) {} - start(kwargs: Typedef): Func { - return this.sub._genericSubstantialFunc( + return this._genericSubstantialFunc( { tag: "start", - val: this.workflow!, }, kwargs ); } - stop(): Func { - return this.sub._genericSubstantialFunc({ - tag: "stop", - val: this.workflow!, + startRaw(): Func { + return this._genericSubstantialFunc({ + tag: "start-raw", }); } - send( - payload: Typedef, - eventName?: string - ): Func { - const event = t.struct({ - name: eventName ? 
t.string().set(eventName) : t.string(), - payload, + stop(): Func { + return this._genericSubstantialFunc({ + tag: "stop", }); + } - return this.sub._genericSubstantialFunc( + send(payload: Typedef): Func { + return this._genericSubstantialFunc( { tag: "send", - val: this.workflow!, }, - event + payload ); } + sendRaw(): Func { + return this._genericSubstantialFunc({ + tag: "send-raw", + }); + } + queryResources(): Func { - return this.sub._genericSubstantialFunc({ + return this._genericSubstantialFunc({ tag: "resources", - val: this.workflow!, }); } queryResults(output: Typedef): Func { - return this.sub._genericSubstantialFunc( + return this._genericSubstantialFunc( { tag: "results", - val: this.workflow!, }, undefined, output ); } + + queryResultsRaw(): Func { + return this._genericSubstantialFunc({ + tag: "results-raw", + }); + } + + #internalLinkParentChild(): Func { + return this._genericSubstantialFunc({ + tag: "internal-link-parent-child", + }); + } + + internals(): Record> { + return { + _sub_internal_start: this.startRaw(), + _sub_internal_stop: this.stop(), + _sub_internal_send: this.sendRaw(), + _sub_internal_results: this.queryResultsRaw(), + _sub_internal_link_parent_child: this.#internalLinkParentChild(), + }; + } +} + +export class WorkflowFile { + private workflows: Array = []; + + private constructor( + public readonly file: string, + public readonly kind: WorkflowKind, + public deps: Array = [] + ) {} + + static deno(file: string, deps: Array = []): WorkflowFile { + return new WorkflowFile(file, "deno", deps); + } + + static python(file: string, deps: Array = []): WorkflowFile { + return new WorkflowFile(file, "python", deps); + } + + import(names: Array): WorkflowFile { + this.workflows.push(...names); + return this; + } + + build(): WorkflowFileDescription { + return { + deps: this.deps, + file: this.file, + kind: this.kind, + workflows: this.workflows, + }; + } } diff --git a/src/typegraph/python/typegraph/runtimes/substantial.py 
b/src/typegraph/python/typegraph/runtimes/substantial.py index 14fdfc48f3..836d35ccdd 100644 --- a/src/typegraph/python/typegraph/runtimes/substantial.py +++ b/src/typegraph/python/typegraph/runtimes/substantial.py @@ -12,12 +12,16 @@ SubstantialOperationData, SubstantialOperationType, SubstantialOperationTypeSend, + SubstantialOperationTypeSendRaw, SubstantialOperationTypeStart, + SubstantialOperationTypeStartRaw, SubstantialOperationTypeStop, SubstantialOperationTypeResources, SubstantialOperationTypeResults, + SubstantialOperationTypeResultsRaw, + SubstantialOperationTypeInternalLinkParentChild, SubstantialRuntimeData, - Workflow, + WorkflowFileDescription, WorkflowKind, ) from typegraph.gen.types import Err @@ -37,8 +41,12 @@ def redis(connection_string_secret: str): class SubstantialRuntime(Runtime): - def __init__(self, backend: SubstantialBackend): - data = SubstantialRuntimeData(backend) + def __init__( + self, + backend: SubstantialBackend, + file_descriptions: List[WorkflowFileDescription], + ): + data = SubstantialRuntimeData(backend, file_descriptions) super().__init__(runtimes.register_substantial_runtime(store, data)) self.backend = backend @@ -60,58 +68,70 @@ def _generic_substantial_func( return t.func.from_type_func(func_data.value) - def deno( - self, - *, - file: str, - name: str, - deps: List[str] = [], - ): - return WorkflowHandle( - self, Workflow(name=name, file=file, deps=deps, kind=WorkflowKind.DENO) - ) - - def python( - self, - *, - file: str, - name: str, - deps: List[str] = [], - ): - return WorkflowHandle( - self, Workflow(name=name, file=file, deps=deps, kind=WorkflowKind.PYTHON) - ) + def start(self, kwargs: "t.struct"): + operation = SubstantialOperationTypeStart() + return self._generic_substantial_func(operation, kwargs, None) + def start_raw(self): + operation = SubstantialOperationTypeStartRaw() + return self._generic_substantial_func(operation, None, None) -class WorkflowHandle: - def __init__(self, sub: SubstantialRuntime, 
workflow: Workflow): - self.sub = sub - self.workflow = workflow + def stop(self): + operation = SubstantialOperationTypeStop() + return self._generic_substantial_func(operation, None, None) - def start(self, kwargs: "t.struct"): - operation = SubstantialOperationTypeStart(self.workflow) - return self.sub._generic_substantial_func(operation, kwargs, None) + def send(self, payload: "t.typedef"): + operation = SubstantialOperationTypeSend() + return self._generic_substantial_func(operation, payload, None) - def stop(self): - operation = SubstantialOperationTypeStop(self.workflow) - return self.sub._generic_substantial_func(operation, None, None) - - def send(self, payload: "t.typedef", event_name=Union[str, None]): - operation = SubstantialOperationTypeSend(self.workflow) - event = t.struct( - { - "name": t.string() - if event_name is None - else t.string().set(event_name), - "payload": payload, - } - ) - return self.sub._generic_substantial_func(operation, event, None) + def send_raw(self): + operation = SubstantialOperationTypeSendRaw() + return self._generic_substantial_func(operation, None, None) def query_resources(self): - operation = SubstantialOperationTypeResources(self.workflow) - return self.sub._generic_substantial_func(operation, None, None) + operation = SubstantialOperationTypeResources() + return self._generic_substantial_func(operation, None, None) def query_results(self, output: "t.typedef"): - operation = SubstantialOperationTypeResults(self.workflow) - return self.sub._generic_substantial_func(operation, None, output) + operation = SubstantialOperationTypeResults() + return self._generic_substantial_func(operation, None, output) + + def query_results_raw(self): + operation = SubstantialOperationTypeResultsRaw() + return self._generic_substantial_func(operation, None, None) + + def _internal_link_parent_child(self): + operation = SubstantialOperationTypeInternalLinkParentChild() + return self._generic_substantial_func(operation, None, None) + + def 
internals(self): + return { + "_sub_internal_start": self.start_raw(), + "_sub_internal_stop": self.stop(), + "_sub_internal_send": self.send_raw(), + "_sub_internal_results": self.query_results_raw(), + "_sub_internal_link_parent_child": self._internal_link_parent_child(), + } + + +class WorkflowFile: + def __init__(self, file: str, kind: WorkflowKind, deps: List[str] = []): + self.file = file + self.kind = kind + self.deps = deps + self.workflows: List[str] = [] + + def deno(*, file: str, deps: List[str] = []): + return WorkflowFile(file, WorkflowKind.DENO, deps) + + def python(*, file: str, deps: List[str] = []): + return WorkflowFile(file, WorkflowKind.PYTHON, deps) + + def import_(self, names: List[str]): + self.workflows += names + return self + + def build(self): + return WorkflowFileDescription( + workflows=self.workflows, deps=self.deps, file=self.file, kind=self.kind + ) diff --git a/tests/runtimes/substantial/child_workflow.ts b/tests/runtimes/substantial/child_workflow.ts new file mode 100644 index 0000000000..904d2b9b90 --- /dev/null +++ b/tests/runtimes/substantial/child_workflow.ts @@ -0,0 +1,63 @@ +import { Context } from "./imports/common_types.ts"; + +function apply(pkg: string, oldVersion: string, newVersion: string) { + console.info( + `Updating ${pkg} v${oldVersion} => ${pkg} v${newVersion}: applied` + ); +} + +export async function bumpPackage(ctx: Context) { + const { name, version } = ctx.kwargs; + const newVersion = await ctx.save(() => version + 1); + await ctx.save(() => apply(name, version, newVersion)); + + ctx.sleep(5000); + + return `Bump ${name} v${version} => v${newVersion}`; +} + +export async function bumpAll(ctx: Context) { + const { packages } = ctx.kwargs; + + // step 1: always save when starting a child as it produces effects + // (preferably per start, which is not the case here) + const handlersDef = await ctx.save(async () => { + const handlersDef = []; + for (const { name, version } of packages) { + const handleDef = await 
ctx.startChildWorkflow(bumpPackage, { + name, + version, + }); + + handlersDef.push(handleDef); + } + + return handlersDef; + }); + + // step 2: create a workflow handle using the data generated from startChildWorkflow + const handles = handlersDef.map((def) => ctx.createWorkflowHandle(def)); + + // step 3: use ensure to block until conditional function call is truthy + // The workflow is not required to wait on its children but in our case we want to retrieve all children outputs + await ctx.ensure(async () => { + for (const handle of handles) { + if (!(await handle.hasStopped())) { + return false; + } + } + return true; + }); + + const ret = await ctx.save(async () => { + const ret = []; + for (const handle of handles) { + const childResult = await handle.result(); + ret.push(childResult); + } + + return ret; + }); + + return ret.sort((a, b) => a.localeCompare(b)); +} diff --git a/tests/runtimes/substantial/common.ts b/tests/runtimes/substantial/common.ts index 707d6834ef..3cc42b1f5d 100644 --- a/tests/runtimes/substantial/common.ts +++ b/tests/runtimes/substantial/common.ts @@ -37,10 +37,13 @@ export function basicTestTemplate( ) { Meta.test( { - name: `Substantial runtime and workflow execution lifecycle (${backendName})`, + name: `Basic workflow execution lifecycle + interrupts (${backendName})`, }, async (t) => { Deno.env.set("SUB_BACKEND", backendName); + // FIXME: typegate.local not available through workers when using ctx.gql on tests? 
+ Deno.env.set("TEST_OVERRIDE_GQL_ORIGIN", `http://localhost:${t.port}`); + cleanup && t.addCleanup(cleanup); const e = await t.engine("runtimes/substantial/substantial.py", { @@ -49,15 +52,15 @@ export function basicTestTemplate( let currentRunId: string | null = null; await t.should( - `start a workflow and return its run id (${backendName})`, + `start saveAndSleepExample workflow and return its run id (${backendName})`, async () => { await gql` mutation { - start(kwargs: { a: 10, b: 20 }) + start_sleep(kwargs: { a: 10, b: 20 }) } ` .expectBody((body) => { - currentRunId = body.data?.start! as string; + currentRunId = body.data?.start_sleep! as string; assertExists( currentRunId, "Run id was not returned when workflow was started" @@ -68,14 +71,14 @@ export function basicTestTemplate( ); // Let interrupts to do their jobs for a bit - await sleep(8 * 1000); + await sleep(10 * 1000); // including remote_add cost (about 1.5s) await t.should( - `have workflow marked as ongoing (${backendName})`, + `have saveAndSleepExample workflow marked as ongoing (${backendName})`, async () => { await gql` query { - results { + results(name: "saveAndSleepExample") { ongoing { count runs { @@ -103,44 +106,47 @@ export function basicTestTemplate( await sleep(delays.awaitSleepCompleteSec * 1000); - await t.should(`complete sleep workflow (${backendName})`, async () => { - await gql` - query { - results { - ongoing { - count - } - completed { - count - runs { - run_id - result { - status - value + await t.should( + `complete saveAndSleepExample workflow (${backendName})`, + async () => { + await gql` + query { + results(name: "saveAndSleepExample") { + ongoing { + count + } + completed { + count + runs { + run_id + result { + status + value + } } } } } - } - ` - .expectData({ - results: { - ongoing: { - count: 0, - }, - completed: { - count: 1, - runs: [ - { - run_id: currentRunId, - result: { status: "COMPLETED", value: 30 }, - }, - ], + ` + .expectData({ + results: { + ongoing: { + 
count: 0, + }, + completed: { + count: 1, + runs: [ + { + run_id: currentRunId, + result: { status: "COMPLETED", value: 30 }, + }, + ], + }, }, - }, - }) - .on(e); - }); + }) + .on(e); + } + ); } ); } @@ -223,7 +229,7 @@ export function concurrentWorkflowTestTemplate( event: { payload: false } ) # will abort - three: abort_email_confirmation(run_id: $three_run_id) + three: stop(run_id: $three_run_id) } ` .withVars({ @@ -234,7 +240,7 @@ export function concurrentWorkflowTestTemplate( .expectData({ one: runIds[0], two: runIds[1], - three: runIds[2], + three: [runIds[2]], }) .on(e); } @@ -250,7 +256,7 @@ export function concurrentWorkflowTestTemplate( await t.should(`complete execution (${backendName})`, async () => { await gql` query { - email_results { + results(name: "eventsAndExceptionExample") { ongoing { count } @@ -269,22 +275,22 @@ export function concurrentWorkflowTestTemplate( ` .expectBody((body) => { assertEquals( - body?.data?.email_results?.ongoing?.count, + body?.data?.results?.ongoing?.count, 0, - "0 workflow currently running" + `0 workflow currently running (${backendName})` ); assertEquals( - body?.data?.email_results?.completed?.count, + body?.data?.results?.completed?.count, 3, - "3 workflows completed" + `3 workflows completed (${backendName})` ); const localSorter = (a: any, b: any) => a.run_id.localeCompare(b.run_id); const received = - body?.data?.email_results?.completed?.runs ?? ([] as Array); + body?.data?.results?.completed?.runs ?? 
([] as Array); const expected = [ { result: { @@ -312,7 +318,7 @@ export function concurrentWorkflowTestTemplate( assertEquals( received.sort(localSorter), expected.sort(localSorter), - "All three workflows have completed, including the aborted one" + `All three workflows have completed, including the aborted one (${backendName})` ); }) .on(e); @@ -371,7 +377,7 @@ export function retrySaveTestTemplate( assertExists(resolvedId, "resolve runId"); assertExists(retryId, "retry runId"); - assertExists(timeoutId, "timeou runId"); + assertExists(timeoutId, "timeout runId"); assertExists(retryAbortMeId, "retry_abort_me runId"); }) .on(e); @@ -385,14 +391,14 @@ export function retrySaveTestTemplate( async () => { await gql` mutation { - abort_retry(run_id: $run_id) + abort_retry: stop(run_id: $run_id) } ` .withVars({ run_id: retryAbortMeId, }) .expectData({ - abort_retry: retryAbortMeId, + abort_retry: [retryAbortMeId], }) .on(e); } @@ -406,7 +412,7 @@ export function retrySaveTestTemplate( async () => { await gql` query { - retry_results { + results(name: "retryExample") { ongoing { count runs { @@ -428,23 +434,22 @@ export function retrySaveTestTemplate( ` .expectBody((body) => { assertEquals( - body?.data?.retry_results?.ongoing?.count, + body?.data?.results?.ongoing?.count, 0, - "0 workflow currently running" + `0 workflow currently running (${backendName})` ); assertEquals( - body?.data?.retry_results?.completed?.count, + body?.data?.results?.completed?.count, 4, - "4 workflows completed" + `4 workflows completed (${backendName})` ); const localSorter = (a: any, b: any) => a.run_id.localeCompare(b.run_id); const received = - body?.data?.retry_results?.completed?.runs ?? - ([] as Array); + body?.data?.results?.completed?.runs ?? 
([] as Array); const expected = [ { result: { @@ -479,7 +484,7 @@ export function retrySaveTestTemplate( assertEquals( received.sort(localSorter), expected.sort(localSorter), - "All workflows have completed" + `All workflows have completed (${backendName})` ); }) .on(e); @@ -489,6 +494,126 @@ export function retrySaveTestTemplate( ); } +export function childWorkflowTestTemplate( + backendName: BackendName, + { + delays, + secrets, + }: { + delays: { + awaitCompleteSec: number; + }; + secrets?: Record; + }, + cleanup?: MetaTestCleanupFn +) { + Meta.test( + { + name: `Child workflows (${backendName})`, + }, + async (t) => { + Deno.env.set("SUB_BACKEND", backendName); + // FIXME: typegate.local not available through workers when using ctx.gql on tests? + Deno.env.set("TEST_OVERRIDE_GQL_ORIGIN", `http://localhost:${t.port}`); + + cleanup && t.addCleanup(cleanup); + + const e = await t.engine( + "runtimes/substantial/substantial_child_workflow.py", + { + secrets, + } + ); + + const packages = [ + { name: "metatype", version: 1 }, + { name: "substantial", version: 2 }, + { name: "typegraph", version: 3 }, + ]; + + let parentRunId: string | null = null; + t.should(`start parent workflow`, async () => { + await gql` + mutation { + start(kwargs: { packages: $packages }) + } + ` + .withVars({ packages }) + .expectBody((body) => { + parentRunId = body.data?.start! 
as string; + assertExists( + parentRunId, + "Run id was not returned when workflow was started" + ); + }) + .on(e); + }); + + await sleep(delays.awaitCompleteSec * 1000); + + t.should(`complete parent and all child workflows`, async () => { + await gql` + query { + children: results_raw(name: "bumpPackage") { + ongoing { + count + } + completed { + count + } + } + parent: results_raw(name: "bumpAll") { + ongoing { + count + } + completed { + runs { + run_id + result { + status + value + } + } + } + } + } + ` + .expectData({ + children: { + ongoing: { + count: 0, + }, + completed: { + count: packages.length, + }, + }, + parent: { + ongoing: { + count: 0, + }, + completed: { + runs: [ + { + run_id: parentRunId, + result: { + status: "COMPLETED", + value: JSON.stringify([ + "Bump metatype v1 => v2", + "Bump substantial v2 => v3", + "Bump typegraph v3 => v4", + ]), + }, + }, + ], + }, + }, + }) + .on(e); + }); + } + ); +} + // TODO: // mock a very basic http server in another process that counts the number of request made by a workflow // This will allow.. 
diff --git a/tests/runtimes/substantial/imports/common_types.ts b/tests/runtimes/substantial/imports/common_types.ts index bc79dbe61e..3e40785083 100644 --- a/tests/runtimes/substantial/imports/common_types.ts +++ b/tests/runtimes/substantial/imports/common_types.ts @@ -1,7 +1,32 @@ // TODO: include this as part of the metagen generated code +// TODO: +export type Workflow = (ctx: Context) => Promise; + +export interface SerializableWorkflowHandle { + runId?: string; + name: string; + kwargs: unknown; +} + +export interface ChildWorkflowHandle { + name: string; + result(): Promise; + stop: () => Promise; + start: () => Promise; + hasStopped: () => Promise; +} + export interface Context { kwargs: any; + gql: ( + query: readonly string[], + ...args: unknown[] + ) => { + run: ( + variables: Record + ) => Promise>; + }; sleep: (ms: number) => void; save(fn: () => T | Promise, option?: SaveOption): Promise; receive(eventName: string): O; @@ -9,7 +34,15 @@ export interface Context { eventName: string, fn: (received: I) => O | Promise ): Promise; - ensure(conditionFn: () => boolean | Promise): Promise; + ensure(conditionFn: () => boolean | Promise): Promise; + + startChildWorkflow( + workflow: Workflow, + kwargs: unknown + ): Promise; + createWorkflowHandle( + handleDef: SerializableWorkflowHandle + ): ChildWorkflowHandle; } export interface SaveOption { diff --git a/tests/runtimes/substantial/kv_like_test.ts b/tests/runtimes/substantial/kv_like_test.ts index eaf6046e10..33e66b7774 100644 --- a/tests/runtimes/substantial/kv_like_test.ts +++ b/tests/runtimes/substantial/kv_like_test.ts @@ -3,24 +3,40 @@ import { basicTestTemplate, + childWorkflowTestTemplate, concurrentWorkflowTestTemplate, retrySaveTestTemplate, } from "./common.ts"; -basicTestTemplate("memory", { - delays: { awaitSleepCompleteSec: 7 }, -}); - basicTestTemplate("fs", { - delays: { awaitSleepCompleteSec: 7 }, + delays: { awaitSleepCompleteSec: 12 }, }); concurrentWorkflowTestTemplate("fs", { - delays: { 
awaitEmailCompleteSec: 10 }, + delays: { awaitEmailCompleteSec: 12 }, }); retrySaveTestTemplate("fs", { delays: { - awaitCompleteAll: 15, + awaitCompleteAll: 17, + }, +}); + +childWorkflowTestTemplate("fs", { + delays: { + awaitCompleteSec: 20, + }, +}); + +basicTestTemplate("memory", { + delays: { awaitSleepCompleteSec: 12 }, +}); +concurrentWorkflowTestTemplate("memory", { + delays: { awaitEmailCompleteSec: 8 }, +}); + +retrySaveTestTemplate("memory", { + delays: { + awaitCompleteAll: 17, }, }); diff --git a/tests/runtimes/substantial/redis_test.ts b/tests/runtimes/substantial/redis_test.ts index 077c6c4e88..07091fc641 100644 --- a/tests/runtimes/substantial/redis_test.ts +++ b/tests/runtimes/substantial/redis_test.ts @@ -4,6 +4,7 @@ import { SUB_REDIS, basicTestTemplate, + childWorkflowTestTemplate, concurrentWorkflowTestTemplate, redisCleanup, retrySaveTestTemplate, @@ -21,7 +22,7 @@ basicTestTemplate( concurrentWorkflowTestTemplate( "redis", { - delays: { awaitEmailCompleteSec: 18 }, + delays: { awaitEmailCompleteSec: 12 }, secrets: { SUB_REDIS }, }, redisCleanup(SUB_REDIS) @@ -31,10 +32,22 @@ retrySaveTestTemplate( "redis", { delays: { - awaitCompleteAll: 18, + awaitCompleteAll: 17, }, secrets: { SUB_REDIS }, }, redisCleanup(SUB_REDIS) ); + +childWorkflowTestTemplate( + "redis", + { + delays: { + awaitCompleteSec: 20, + }, + + secrets: { SUB_REDIS }, + }, + redisCleanup(SUB_REDIS) +); diff --git a/tests/runtimes/substantial/substantial.py b/tests/runtimes/substantial/substantial.py index 919cb31f40..f079980563 100644 --- a/tests/runtimes/substantial/substantial.py +++ b/tests/runtimes/substantial/substantial.py @@ -1,7 +1,8 @@ import os from typegraph import typegraph, t, Graph from typegraph.policy import Policy -from typegraph.runtimes.substantial import SubstantialRuntime +from typegraph.runtimes.deno import DenoRuntime +from typegraph.runtimes.substantial import SubstantialRuntime, WorkflowFile from typegraph.runtimes.substantial import Backend @@ -16,45 
+17,43 @@ def substantial(g: Graph): elif os.environ["SUB_BACKEND"] == "redis": backend = Backend.redis("SUB_REDIS") - sub = SubstantialRuntime(backend) - - save_and_sleep = sub.deno( - file="workflow.ts", - name="saveAndSleepExample", - deps=["imports/common_types.ts"], - ) - - email = sub.deno( - file="workflow.ts", - name="eventsAndExceptionExample", - deps=["imports/common_types.ts"], + file = ( + WorkflowFile.deno(file="workflow.ts", deps=["imports/common_types.ts"]) + .import_(["saveAndSleepExample", "eventsAndExceptionExample", "retryExample"]) + .build() ) - retry = sub.deno( - file="workflow.ts", - name="retryExample", - deps=["imports/common_types.ts"], - ) + sub = SubstantialRuntime(backend, [file]) + deno = DenoRuntime() g.expose( pub, - # sleep - start=save_and_sleep.start(t.struct({"a": t.integer(), "b": t.integer()})), - workers=save_and_sleep.query_resources(), - results=save_and_sleep.query_results( + remote_add=deno.func( + t.struct({"a": t.integer(), "b": t.integer()}), + t.integer(), + code="({a, b}) => a + b", + ), + remote_static=deno.static(t.integer(), 1234), + # common + stop=sub.stop(), + results=sub.query_results( t.either([t.integer(), t.string()]).rename("ResultOrError") ), + workers=sub.query_resources(), + # sleep + start_sleep=sub.start(t.struct({"a": t.integer(), "b": t.integer()})).reduce( + {"name": "saveAndSleepExample"} + ), # email - start_email=email.start(t.struct({"to": t.email()})), - send_confirmation=email.send(t.boolean(), event_name="confirmation"), - email_workers=email.query_resources(), - email_results=email.query_results(t.string()), - abort_email_confirmation=email.stop(), + start_email=sub.start(t.struct({"to": t.string()})).reduce( + {"name": "eventsAndExceptionExample"} + ), + send_confirmation=sub.send(t.boolean()).reduce( + {"event": {"name": "confirmation", "payload": g.inherit()}} + ), # retry - start_retry=retry.start( + start_retry=sub.start( t.struct({"fail": t.boolean(), "timeout": t.boolean()}) - ), - 
retry_workers=retry.query_resources(), - retry_results=retry.query_results(t.string()), - abort_retry=retry.stop(), + ).reduce({"name": "retryExample"}), + **sub.internals(), ) diff --git a/tests/runtimes/substantial/substantial_child_workflow.py b/tests/runtimes/substantial/substantial_child_workflow.py new file mode 100644 index 0000000000..560dae1491 --- /dev/null +++ b/tests/runtimes/substantial/substantial_child_workflow.py @@ -0,0 +1,39 @@ +import os +from typegraph import typegraph, t, Graph +from typegraph.policy import Policy +from typegraph.runtimes.substantial import SubstantialRuntime, WorkflowFile +from typegraph.runtimes.substantial import Backend + + +@typegraph() +def substantial_child_workflow(g: Graph): + pub = Policy.public() + + backend = Backend.dev_memory() + if "SUB_BACKEND" in os.environ: + if os.environ["SUB_BACKEND"] == "fs": + backend = Backend.dev_fs() + elif os.environ["SUB_BACKEND"] == "redis": + backend = Backend.redis("SUB_REDIS") + + file = ( + WorkflowFile.deno(file="child_workflow.ts", deps=["imports/common_types.ts"]) + .import_(["bumpPackage", "bumpAll"]) + .build() + ) + + sub = SubstantialRuntime(backend, [file]) + + package = t.struct({"name": t.string(), "version": t.integer()}).rename("Package") + + g.expose( + pub, + # common + stop=sub.stop(), + results_raw=sub.query_results_raw(), # bypass type hinting in favor of json string + workers=sub.query_resources(), + start=sub.start(t.struct({"packages": t.list(package)})).reduce( + {"name": "bumpAll"} + ), + **sub.internals(), + ) diff --git a/tests/runtimes/substantial/workflow.ts b/tests/runtimes/substantial/workflow.ts index 392b31c6a8..b4df2cb5f4 100644 --- a/tests/runtimes/substantial/workflow.ts +++ b/tests/runtimes/substantial/workflow.ts @@ -26,9 +26,28 @@ export async function saveAndSleepExample(ctx: Context) { const newB = await ctx.save(() => queryThatTakesAWhile(b as number)); // + 2s - // + ~5s (depending on the wait relaunch setting) + const sum = await 
ctx.save(async () => { + const remoteAdd = new Date().getTime(); + const { data } = await ctx.gql/**/ `query { remote_add(a: $a, b: $b) }`.run( + { + a: newA, + b: newB, + } + ); + const remoteAddEnd = new Date().getTime(); + console.log( + "Remote add:", + (remoteAddEnd - remoteAdd) / 1000, + ", Response:", + data + ); + + return (data as any)?.remote_add as number; + }); + + // +- ~5s ctx.sleep(5000); - return newA + newB; + return sum; } export async function retryExample(ctx: Context) {