Skip to content

Updated workflow function to return Payload instead of empty tuple #542

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ You can buld and test the project using cargo:
Run integ tests with `cargo integ-test`. You will need to already be running the server:
`docker-compose -f .buildkite/docker/docker-compose.yaml up`

Run load tests with `cargo test --features=save_wf_inputs --test heavy_tests`.

## Formatting
To format all code run:
`cargo fmt --all`
Expand Down
10 changes: 6 additions & 4 deletions core/src/core_tests/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use std::{
};
use temporal_client::WorkflowOptions;
use temporal_sdk::{
ActContext, ActivityCancelledError, LocalActivityOptions, WfContext, WorkflowResult,
ActContext, ActivityCancelledError, LocalActivityOptions, WfContext, WorkflowFunction,
WorkflowResult,
};
use temporal_sdk_core_api::{
errors::{PollActivityError, PollWfError},
Expand Down Expand Up @@ -964,17 +965,18 @@ async fn resolved_las_not_recorded_if_wft_fails_many_times() {
mh.num_expected_completions = Some(0.into());
let mut worker = mock_sdk_cfg(mh, |w| w.max_cached_workflows = 1);

#[allow(unreachable_code)]
worker.register_wf(
DEFAULT_WORKFLOW_TYPE.to_owned(),
|ctx: WfContext| async move {
WorkflowFunction::new::<_, _, ()>(|ctx: WfContext| async move {
ctx.local_activity(LocalActivityOptions {
activity_type: "echo".to_string(),
input: "hi".as_json_payload().expect("serializes fine"),
..Default::default()
})
.await;
panic!("Oh nooooo")
},
panic!()
}),
);
worker.register_activity(
"echo",
Expand Down
4 changes: 2 additions & 2 deletions core/src/worker/workflow/managed_run/managed_wf_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub struct ManagedWFFunc {
activation_tx: UnboundedSender<WorkflowActivation>,
completions_rx: UnboundedReceiver<WorkflowActivationCompletion>,
completions_sync_tx: crossbeam::channel::Sender<WorkflowActivationCompletion>,
future_handle: Option<JoinHandle<WorkflowResult<()>>>,
future_handle: Option<JoinHandle<WorkflowResult<Payload>>>,
was_shutdown: bool,
}

Expand Down Expand Up @@ -175,7 +175,7 @@ impl ManagedWFFunc {
Ok(last_act)
}

pub async fn shutdown(&mut self) -> WorkflowResult<()> {
pub async fn shutdown(&mut self) -> WorkflowResult<Payload> {
self.was_shutdown = true;
// Send an eviction to ensure wf exits if it has not finished (ex: feeding partial hist)
let _ = self.activation_tx.send(create_evict_activation(
Expand Down
41 changes: 31 additions & 10 deletions sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use crate::{
use anyhow::{anyhow, bail, Context};
use app_data::AppData;
use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
use serde::Serialize;
use std::{
any::{Any, TypeId},
cell::RefCell,
Expand Down Expand Up @@ -147,7 +148,7 @@ struct WorkflowData {
activation_chan: UnboundedSender<WorkflowActivation>,
}

struct WorkflowFutureHandle<F: Future<Output = Result<WorkflowResult<()>, JoinError>>> {
struct WorkflowFutureHandle<F: Future<Output = Result<WorkflowResult<Payload>, JoinError>>> {
join_handle: F,
run_id: String,
}
Expand Down Expand Up @@ -193,10 +194,10 @@ impl Worker {

/// Register a Workflow function to invoke when the Worker is asked to run a workflow of
/// `workflow_type`
pub fn register_wf<F: Into<WorkflowFunction>>(
pub fn register_wf(
&mut self,
workflow_type: impl Into<String>,
wf_function: F,
wf_function: impl Into<WorkflowFunction>,
) {
self.workflow_half
.workflow_fns
Expand Down Expand Up @@ -383,7 +384,9 @@ impl WorkflowHalf {
activation: WorkflowActivation,
completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
) -> Result<
Option<WorkflowFutureHandle<impl Future<Output = Result<WorkflowResult<()>, JoinError>>>>,
Option<
WorkflowFutureHandle<impl Future<Output = Result<WorkflowResult<Payload>, JoinError>>>,
>,
anyhow::Error,
> {
let mut res = None;
Expand Down Expand Up @@ -694,17 +697,21 @@ struct CommandSubscribeChildWorkflowCompletion {
unblocker: oneshot::Sender<UnblockEvent>,
}

type WfFunc = dyn Fn(WfContext) -> BoxFuture<'static, WorkflowResult<()>> + Send + Sync + 'static;
type WfFunc = dyn Fn(WfContext) -> BoxFuture<'static, Result<WfExitValue<Payload>, anyhow::Error>>
+ Send
+ Sync
+ 'static;

/// The user's async function / workflow code
pub struct WorkflowFunction {
wf_func: Box<WfFunc>,
}

impl<F, Fut> From<F> for WorkflowFunction
impl<F, Fut, O> From<F> for WorkflowFunction
where
F: Fn(WfContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = WorkflowResult<()>> + Send + 'static,
Fut: Future<Output = Result<WfExitValue<O>, anyhow::Error>> + Send + 'static,
O: Serialize + Debug,
{
fn from(wf_func: F) -> Self {
Self::new(wf_func)
Expand All @@ -713,13 +720,27 @@ where

impl WorkflowFunction {
/// Build a workflow function from a closure or function pointer which accepts a [WfContext]
pub fn new<F, Fut>(wf_func: F) -> Self
pub fn new<F, Fut, O>(f: F) -> Self
where
F: Fn(WfContext) -> Fut + Send + Sync + 'static,
Fut: Future<Output = WorkflowResult<()>> + Send + 'static,
Fut: Future<Output = Result<WfExitValue<O>, anyhow::Error>> + Send + 'static,
O: Serialize + Debug,
{
Self {
wf_func: Box::new(move |ctx: WfContext| wf_func(ctx).boxed()),
wf_func: Box::new(move |ctx: WfContext| {
(f)(ctx)
.map(|r| {
r.and_then(|r| {
Ok(match r {
WfExitValue::ContinueAsNew(b) => WfExitValue::ContinueAsNew(b),
WfExitValue::Cancelled => WfExitValue::Cancelled,
WfExitValue::Evicted => WfExitValue::Evicted,
WfExitValue::Normal(o) => WfExitValue::Normal(o.as_json_payload()?),
})
})
})
.boxed()
}),
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions sdk/src/workflow_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl WorkflowFunction {
args: Vec<Payload>,
outgoing_completions: UnboundedSender<WorkflowActivationCompletion>,
) -> (
impl Future<Output = WorkflowResult<()>>,
impl Future<Output = WorkflowResult<Payload>>,
UnboundedSender<WorkflowActivation>,
) {
let (cancel_tx, cancel_rx) = watch::channel(false);
Expand Down Expand Up @@ -93,7 +93,7 @@ enum SigChanOrBuffer {

pub struct WorkflowFuture {
/// Future produced by calling the workflow function
inner: BoxFuture<'static, WorkflowResult<()>>,
inner: BoxFuture<'static, WorkflowResult<Payload>>,
/// Commands produced inside user's wf code
incoming_commands: Receiver<RustWfCmd>,
/// Once blocked or the workflow has finished or errored out, the result is sent here
Expand Down Expand Up @@ -234,7 +234,7 @@ impl WorkflowFuture {
}

impl Future for WorkflowFuture {
type Output = WorkflowResult<()>;
type Output = WorkflowResult<Payload>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
'activations: loop {
Expand Down Expand Up @@ -443,10 +443,12 @@ impl Future for WorkflowFuture {
match res {
Ok(exit_val) => match exit_val {
// TODO: Generic values
WfExitValue::Normal(_) => {
WfExitValue::Normal(result) => {
activation_cmds.push(
workflow_command::Variant::CompleteWorkflowExecution(
CompleteWorkflowExecution { result: None },
CompleteWorkflowExecution {
result: Some(result),
},
),
);
}
Expand Down
2 changes: 1 addition & 1 deletion tests/integ_tests/workflow_tests/child_workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ async fn abandoned_child_bug_repro() {
);
worker.register_wf(CHILD_WF_TYPE.to_string(), |mut ctx: WfContext| async move {
ctx.cancelled().await;
Ok(WfExitValue::Cancelled)
Ok(WfExitValue::<()>::Cancelled)
});

worker
Expand Down