From ca75b1dacbb6d1f15ea8a90b41c49f406140e954 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 30 Jun 2022 10:30:18 -0700 Subject: [PATCH 1/3] Initial trait-based workflow API skeleton --- workflow-api/src/lib.rs | 160 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 workflow-api/src/lib.rs diff --git a/workflow-api/src/lib.rs b/workflow-api/src/lib.rs new file mode 100644 index 000000000..8247f05dd --- /dev/null +++ b/workflow-api/src/lib.rs @@ -0,0 +1,160 @@ +use futures::future::BoxFuture; +use std::{error::Error, time::Duration}; +use temporal_sdk_core_protos::temporal::api::common::v1::Payload; + +/// Workflow authors must implement this trait to create Temporal Rust workflows +pub trait Workflow: Sized { + /// Type of the input argument to the workflow + type Input: TemporalDeserializable; + /// Type of the output of the workflow + type Output: TemporalSerializable; + /// The workflow's name + const NAME: &'static str; + + /// Called when an instance of a Workflow is first initialized. + /// + /// `input` contains the input argument to the workflow as defined by the client who requested + /// the Workflow Execution. + fn new(input: Self::Input, ctx: SafeWfContext) -> Self; + + /// Defines the actual workflow logic. The function must return a future, and this future is + /// cached and polled as updates to the workflow history are received. + /// + /// `ctx` should be used to perform various Temporal commands like starting timers and + /// activities. + fn run(&mut self, ctx: WfContext) -> BoxFuture; + + /// All signals this workflow can handle. Typically you won't implement this directly, it will + /// automatically contain all signals defined with the `#[signal]` attribute. + fn signals() -> &'static [&'static SignalDefinition] { + // TODO + &[] + } + /// All queries this workflow can handle. Typically you won't implement this directly, it will + /// automatically contain all queries defined with the `#[query]` attribute. + fn queries() -> &'static [&'static QueryDefinition] { + // TODO + &[] + } +} + +/// A workflow context which contains only information, but does not allow any commands to +/// be created. +pub struct SafeWfContext { + // TODO +} + +/// TODO: Placeholder, move from SDK +pub struct WfContext {} +impl WfContext { + pub async fn timer(&self, _: Duration) { + todo!() + } +} + +pub struct SignalDefinition { + // TODO: Could be a matching predicate + name: String, + // The handler input type must be erased here, since otherwise we couldn't store/return the + // heterogeneous collection of definition types in the workflow itself. The signal macro + // will wrap the user's function with code that performs deserialization, as well as error + // boxing. + handler: Box Result<(), Box>>, +} +pub struct QueryDefinition { + // TODO: Could be a matching predicate + name: String, + // The query macro will wrap the user's function with code that performs deserialization of + // input and serialization of output, as well as error boxing. + handler: Box Result>>, +} + +/// TODO: Placeholder, move from (and improve in) SDK +pub trait TemporalSerializable {} +impl TemporalSerializable for T {} +/// TODO: Placeholder, move from (and improve in) SDK +pub trait TemporalDeserializable {} +impl TemporalDeserializable for T {} + +#[cfg(test)] +mod tests { + use super::*; + use futures::FutureExt; + use std::{collections::HashMap, marker::PhantomData}; + + // Workflow implementation example + struct MyWorkflow { + foo: u64, + bar: HashMap, + } + + impl Workflow for MyWorkflow { + type Input = String; + type Output = u64; + const NAME: &'static str = "MyWorkflowType"; + + fn new(input: Self::Input, _ctx: SafeWfContext) -> Self { + let mut bar = HashMap::new(); + bar.insert(input, 10); + Self { foo: 0, bar } + } + + fn run(&mut self, ctx: WfContext) -> BoxFuture { + async move { + ctx.timer(Duration::from_secs(1)).await; + self.foo = 1; + self.foo + } + .boxed() + // TODO: The need to box here is slightly unfortunate, but it's either that or require + // users to depend on `async_trait` (which just hides the same thing). IMO this is the + // best option until more language features stabilize and this can go away. + } + } + + // #[workflow] miiiight be necessary here, but, ideally is not. + impl MyWorkflow { + // Attrib commented out since nonexistent for now, but that's what it'd look like. + // #[signal] + pub fn my_signal(&mut self, arg: String) { + self.bar.insert(arg, 1); + } + // #[query] + pub fn my_query(&self, arg: String) -> Option { + self.bar.get(&arg).cloned() + } + } + + // This would need to be moved into this crate and depended on by client + struct WorkflowHandle { + _d: PhantomData, + } + struct SignalError; // just a placeholder + struct QueryError; // just a placeholder + + // The signal/query macros would generate this trait and impl: + trait MyWorkflowClientExtension { + fn my_signal(&self, arg: String) -> BoxFuture>; + fn my_query(&self, arg: String) -> BoxFuture, QueryError>>; + } + impl MyWorkflowClientExtension for WorkflowHandle { + fn my_signal(&self, arg: String) -> BoxFuture> { + // Is actually something like: + // self.signal("my_signal", arg.serialize()) + todo!() + } + + fn my_query(&self, arg: String) -> BoxFuture, QueryError>> { + todo!() + } + } + + async fn client_example() { + // Now you can use the client like: + // (actually comes from client.start() or client.get_handle() etc) + let wfh = WorkflowHandle { + _d: PhantomData::, + }; + let _ = wfh.my_signal("hi!".to_string()).await; + } +} From f5533a8ba289149421eb9af01adb9e6569e85300 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 11 May 2023 16:22:53 -0700 Subject: [PATCH 2/3] WfExitValue --- workflow-api/Cargo.toml | 22 +++++++++++++ workflow-api/src/lib.rs | 72 ++++++++++++++++++++++++++++++++--------- 2 files changed, 78 insertions(+), 16 deletions(-) create mode 100644 workflow-api/Cargo.toml diff --git a/workflow-api/Cargo.toml b/workflow-api/Cargo.toml new file mode 100644 index 000000000..7163ffbd5 --- /dev/null +++ b/workflow-api/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "temporal-workflow-api" +version = "0.1.0" +edition = "2021" +authors = ["Spencer Judge "] +license-file = "LICENSE.txt" +description = "Temporal Rust SDK Worflow APIs" +homepage = "https://temporal.io/" +repository = "https://github.com/temporalio/sdk-core" +keywords = ["temporal", "workflow"] +categories = ["development-tools"] + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +anyhow = "1.0" +futures = "0.3" + +[dependencies.temporal-sdk-core-protos] +path = "../sdk-core-protos" +version = "0.1" + diff --git a/workflow-api/src/lib.rs b/workflow-api/src/lib.rs index 8247f05dd..c5f31316b 100644 --- a/workflow-api/src/lib.rs +++ b/workflow-api/src/lib.rs @@ -1,6 +1,17 @@ +//! This needs to be its own crate so that it doesn't pull in anything that would make compiling +//! to WASM not work. I've already figured out how to do all that once before with my WASM workflows +//! hackathon + use futures::future::BoxFuture; -use std::{error::Error, time::Duration}; -use temporal_sdk_core_protos::temporal::api::common::v1::Payload; +use std::time::Duration; +use temporal_sdk_core_protos::{ + coresdk::workflow_commands::ContinueAsNewWorkflowExecution, temporal::api::common::v1::Payload, +}; + +// anyhow errors are used for the errors returned by user-defined functions. This makes `?` work +// well everywhere by default which is a very nice property, as well as preserving backtraces. We +// may need to define our own error type instead to allow attaching things like the non-retryable +// flag... but I suspect we can just make downcasting work for that. /// Workflow authors must implement this trait to create Temporal Rust workflows pub trait Workflow: Sized { @@ -22,7 +33,10 @@ pub trait Workflow: Sized { /// /// `ctx` should be used to perform various Temporal commands like starting timers and /// activities. - fn run(&mut self, ctx: WfContext) -> BoxFuture; + fn run( + &mut self, + ctx: WfContext, + ) -> BoxFuture, anyhow::Error>>; /// All signals this workflow can handle. Typically you won't implement this directly, it will /// automatically contain all signals defined with the `#[signal]` attribute. @@ -38,13 +52,30 @@ pub trait Workflow: Sized { } } +/// TODO: Exists in SDK in slightly different form, and would move into this crate +#[derive(Debug)] +pub enum WfExitValue { + /// Continue the workflow as a new execution + ContinueAsNew(Box), // Wouldn't be raw proto in reality + /// Confirm the workflow was cancelled + Cancelled, + /// Finish with a result + Normal(T), +} +impl From for WfExitValue { + fn from(v: T) -> Self { + Self::Normal(v) + } +} +// ... also convenience functions for constructing C-A-N, etc. + /// A workflow context which contains only information, but does not allow any commands to /// be created. pub struct SafeWfContext { // TODO } -/// TODO: Placeholder, move from SDK +/// TODO: Placeholder, exists in SDK and would move into this crate & (likely) become a trait pub struct WfContext {} impl WfContext { pub async fn timer(&self, _: Duration) { @@ -53,26 +84,25 @@ impl WfContext { } pub struct SignalDefinition { - // TODO: Could be a matching predicate + // TODO: Could be a matching predicate, to allow for dynamic registration name: String, // The handler input type must be erased here, since otherwise we couldn't store/return the // heterogeneous collection of definition types in the workflow itself. The signal macro - // will wrap the user's function with code that performs deserialization, as well as error - // boxing. - handler: Box Result<(), Box>>, + // will wrap the user's function with code that performs deserialization. + handler: Box Result<(), anyhow::Error>>, } pub struct QueryDefinition { - // TODO: Could be a matching predicate + // TODO: Could be a matching predicate, to allow for dynamic registration name: String, // The query macro will wrap the user's function with code that performs deserialization of // input and serialization of output, as well as error boxing. - handler: Box Result>>, + handler: Box Result>, } -/// TODO: Placeholder, move from (and improve in) SDK +/// TODO: Placeholders, likely belong inside protos crate. These will be auto-implemented for +/// anything using serde already (which I expect is how virtually everyone will do this). pub trait TemporalSerializable {} impl TemporalSerializable for T {} -/// TODO: Placeholder, move from (and improve in) SDK pub trait TemporalDeserializable {} impl TemporalDeserializable for T {} @@ -99,11 +129,18 @@ mod tests { Self { foo: 0, bar } } - fn run(&mut self, ctx: WfContext) -> BoxFuture { + fn run( + &mut self, + ctx: WfContext, + ) -> BoxFuture, anyhow::Error>> { async move { ctx.timer(Duration::from_secs(1)).await; self.foo = 1; - self.foo + // The into() is unfortunately unavoidable without making C-A-N and confirm cancel + // be errors instead. Personally, I don't love that and I think it's not idiomatic + // Rust, whereas needing to `into()` something is. Other way would be macros, but + // it's slightly too much magic I think. + Ok(self.foo.into()) } .boxed() // TODO: The need to box here is slightly unfortunate, but it's either that or require @@ -114,7 +151,7 @@ mod tests { // #[workflow] miiiight be necessary here, but, ideally is not. impl MyWorkflow { - // Attrib commented out since nonexistent for now, but that's what it'd look like. + // Attrib commented out since it's nonexistent for now, but that's what it'd look like. // #[signal] pub fn my_signal(&mut self, arg: String) { self.bar.insert(arg, 1); @@ -139,7 +176,7 @@ mod tests { } impl MyWorkflowClientExtension for WorkflowHandle { fn my_signal(&self, arg: String) -> BoxFuture> { - // Is actually something like: + // Becomes something like: // self.signal("my_signal", arg.serialize()) todo!() } @@ -157,4 +194,7 @@ mod tests { }; let _ = wfh.my_signal("hi!".to_string()).await; } + + #[test] + fn compile() {} } From a36ab43cc1847730123bd866de5111f49d75cf34 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 12 May 2023 10:11:40 -0700 Subject: [PATCH 3/3] Activity definition --- sdk/src/lib.rs | 1 + sdk/src/new_activity_defs.rs | 0 workflow-api/src/activity_definitions.rs | 43 ++++++++++++++++++++++++ workflow-api/src/lib.rs | 5 +++ 4 files changed, 49 insertions(+) create mode 100644 sdk/src/new_activity_defs.rs create mode 100644 workflow-api/src/activity_definitions.rs diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index ccf8b06b8..a0f49ae41 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -47,6 +47,7 @@ extern crate tracing; mod activity_context; mod app_data; pub mod interceptors; +pub mod new_activity_defs; mod payload_converter; mod workflow_context; mod workflow_future; diff --git a/sdk/src/new_activity_defs.rs b/sdk/src/new_activity_defs.rs new file mode 100644 index 000000000..e69de29bb diff --git a/workflow-api/src/activity_definitions.rs b/workflow-api/src/activity_definitions.rs new file mode 100644 index 000000000..3d7e6936b --- /dev/null +++ b/workflow-api/src/activity_definitions.rs @@ -0,0 +1,43 @@ +use crate::WfContext; +use futures::future::BoxFuture; + +// #[activity_definition] +type MyActFn = fn(String) -> String; + +// Macro enforces types are serializable. + +// The biggest problem with activity definitions is they need to be defined in a crate which doesn't +// depend on the entire SDK, because then the workflow code which uses them wouldn't be able to +// be compiled down to WASM. Of course, the issue is activities _aren't_ compiled to WASM, and need +// access to full native functionality. Thus users need to structure their app a bit oddly. They +// can either define all their workflow code & activity _definitions_ in one crate, and then +// depend on that crate from another crate containing their activity implementations / worker, or +// they could make a crate with *just* activity definitions, which is depended on by the workflow +// implementation crate and the worker crate independently. It all makes perfect sense, but is +// maybe a bit annoying in terms of setup - though not really any worse than TS. + +// Macro generates this extension & implementation: +// +// The generated code taking `impl Into` is quite nice for ergonomics inside the workflow, +// but might be impossible in some cases, so probably macro would need a flag to turn it off. +pub trait MyActFnWfCtxExt { + // In reality this returns the `CancellableFuture` type from SDK, would also need to move into + // this crate. + fn my_act_fn( + &self, + input: impl Into, + ) -> BoxFuture<'static, Result>; +} +impl MyActFnWfCtxExt for WfContext { + fn my_act_fn(&self, _: impl Into) -> BoxFuture<'static, Result> { + // Type name is injected in this implementation, taken from macro + todo!() + } +} + +// To implement the activity in their implementation crate, the user would do something like: +// worker.register_activity(MyActFn, |input: String| async move { .... }); + +// Placeholder. Activity failures as can be seen by the WF code. +#[derive(Debug)] +pub struct ActivityFail {} diff --git a/workflow-api/src/lib.rs b/workflow-api/src/lib.rs index c5f31316b..58dc3d7b5 100644 --- a/workflow-api/src/lib.rs +++ b/workflow-api/src/lib.rs @@ -2,6 +2,9 @@ //! to WASM not work. I've already figured out how to do all that once before with my WASM workflows //! hackathon +mod activity_definitions; + +use activity_definitions::MyActFnWfCtxExt; use futures::future::BoxFuture; use std::time::Duration; use temporal_sdk_core_protos::{ @@ -136,6 +139,8 @@ mod tests { async move { ctx.timer(Duration::from_secs(1)).await; self.foo = 1; + // See activity definitions file + ctx.my_act_fn("Hi!").await.unwrap(); // The into() is unfortunately unavoidable without making C-A-N and confirm cancel // be errors instead. Personally, I don't love that and I think it's not idiomatic // Rust, whereas needing to `into()` something is. Other way would be macros, but