Skip to content

Commit d5661a5

Browse files
committed
Updated workflow function to return Payload instead of empty tuple
1 parent 81ece42 commit d5661a5

File tree

6 files changed

+49
-22
lines changed

6 files changed

+49
-22
lines changed

README.md

+2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ You can buld and test the project using cargo:
4242
Run integ tests with `cargo integ-test`. You will need to already be running the server:
4343
`docker-compose -f .buildkite/docker/docker-compose.yaml up`
4444

45+
Run load tests with `cargo test --features=save_wf_inputs --test heavy_tests`.
46+
4547
## Formatting
4648
To format all code run:
4749
`cargo fmt --all`

core/src/core_tests/local_activities.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ use std::{
2121
};
2222
use temporal_client::WorkflowOptions;
2323
use temporal_sdk::{
24-
ActContext, ActivityCancelledError, LocalActivityOptions, WfContext, WorkflowResult,
24+
ActContext, ActivityCancelledError, LocalActivityOptions, WfContext, WorkflowFunction,
25+
WorkflowResult,
2526
};
2627
use temporal_sdk_core_api::{
2728
errors::{PollActivityError, PollWfError},
@@ -964,17 +965,18 @@ async fn resolved_las_not_recorded_if_wft_fails_many_times() {
964965
mh.num_expected_completions = Some(0.into());
965966
let mut worker = mock_sdk_cfg(mh, |w| w.max_cached_workflows = 1);
966967

968+
#[allow(unreachable_code)]
967969
worker.register_wf(
968970
DEFAULT_WORKFLOW_TYPE.to_owned(),
969-
|ctx: WfContext| async move {
971+
WorkflowFunction::new::<_, _, ()>(|ctx: WfContext| async move {
970972
ctx.local_activity(LocalActivityOptions {
971973
activity_type: "echo".to_string(),
972974
input: "hi".as_json_payload().expect("serializes fine"),
973975
..Default::default()
974976
})
975977
.await;
976-
panic!("Oh nooooo")
977-
},
978+
panic!()
979+
}),
978980
);
979981
worker.register_activity(
980982
"echo",

core/src/worker/workflow/managed_run/managed_wf_test.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ pub struct ManagedWFFunc {
6161
activation_tx: UnboundedSender<WorkflowActivation>,
6262
completions_rx: UnboundedReceiver<WorkflowActivationCompletion>,
6363
completions_sync_tx: crossbeam::channel::Sender<WorkflowActivationCompletion>,
64-
future_handle: Option<JoinHandle<WorkflowResult<()>>>,
64+
future_handle: Option<JoinHandle<WorkflowResult<Payload>>>,
6565
was_shutdown: bool,
6666
}
6767

@@ -175,7 +175,7 @@ impl ManagedWFFunc {
175175
Ok(last_act)
176176
}
177177

178-
pub async fn shutdown(&mut self) -> WorkflowResult<()> {
178+
pub async fn shutdown(&mut self) -> WorkflowResult<Payload> {
179179
self.was_shutdown = true;
180180
// Send an eviction to ensure wf exits if it has not finished (ex: feeding partial hist)
181181
let _ = self.activation_tx.send(create_evict_activation(

sdk/src/lib.rs

+31-10
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ use crate::{
6565
use anyhow::{anyhow, bail, Context};
6666
use app_data::AppData;
6767
use futures::{future::BoxFuture, FutureExt, StreamExt, TryFutureExt, TryStreamExt};
68+
use serde::Serialize;
6869
use std::{
6970
any::{Any, TypeId},
7071
cell::RefCell,
@@ -147,7 +148,7 @@ struct WorkflowData {
147148
activation_chan: UnboundedSender<WorkflowActivation>,
148149
}
149150

150-
struct WorkflowFutureHandle<F: Future<Output = Result<WorkflowResult<()>, JoinError>>> {
151+
struct WorkflowFutureHandle<F: Future<Output = Result<WorkflowResult<Payload>, JoinError>>> {
151152
join_handle: F,
152153
run_id: String,
153154
}
@@ -193,10 +194,10 @@ impl Worker {
193194

194195
/// Register a Workflow function to invoke when the Worker is asked to run a workflow of
195196
/// `workflow_type`
196-
pub fn register_wf<F: Into<WorkflowFunction>>(
197+
pub fn register_wf(
197198
&mut self,
198199
workflow_type: impl Into<String>,
199-
wf_function: F,
200+
wf_function: impl Into<WorkflowFunction>,
200201
) {
201202
self.workflow_half
202203
.workflow_fns
@@ -383,7 +384,9 @@ impl WorkflowHalf {
383384
activation: WorkflowActivation,
384385
completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
385386
) -> Result<
386-
Option<WorkflowFutureHandle<impl Future<Output = Result<WorkflowResult<()>, JoinError>>>>,
387+
Option<
388+
WorkflowFutureHandle<impl Future<Output = Result<WorkflowResult<Payload>, JoinError>>>,
389+
>,
387390
anyhow::Error,
388391
> {
389392
let mut res = None;
@@ -694,17 +697,21 @@ struct CommandSubscribeChildWorkflowCompletion {
694697
unblocker: oneshot::Sender<UnblockEvent>,
695698
}
696699

697-
type WfFunc = dyn Fn(WfContext) -> BoxFuture<'static, WorkflowResult<()>> + Send + Sync + 'static;
700+
type WfFunc = dyn Fn(WfContext) -> BoxFuture<'static, Result<WfExitValue<Payload>, anyhow::Error>>
701+
+ Send
702+
+ Sync
703+
+ 'static;
698704

699705
/// The user's async function / workflow code
700706
pub struct WorkflowFunction {
701707
wf_func: Box<WfFunc>,
702708
}
703709

704-
impl<F, Fut> From<F> for WorkflowFunction
710+
impl<F, Fut, O> From<F> for WorkflowFunction
705711
where
706712
F: Fn(WfContext) -> Fut + Send + Sync + 'static,
707-
Fut: Future<Output = WorkflowResult<()>> + Send + 'static,
713+
Fut: Future<Output = Result<WfExitValue<O>, anyhow::Error>> + Send + 'static,
714+
O: Serialize + Debug,
708715
{
709716
fn from(wf_func: F) -> Self {
710717
Self::new(wf_func)
@@ -713,13 +720,27 @@ where
713720

714721
impl WorkflowFunction {
715722
/// Build a workflow function from a closure or function pointer which accepts a [WfContext]
716-
pub fn new<F, Fut>(wf_func: F) -> Self
723+
pub fn new<F, Fut, O>(f: F) -> Self
717724
where
718725
F: Fn(WfContext) -> Fut + Send + Sync + 'static,
719-
Fut: Future<Output = WorkflowResult<()>> + Send + 'static,
726+
Fut: Future<Output = Result<WfExitValue<O>, anyhow::Error>> + Send + 'static,
727+
O: Serialize + Debug,
720728
{
721729
Self {
722-
wf_func: Box::new(move |ctx: WfContext| wf_func(ctx).boxed()),
730+
wf_func: Box::new(move |ctx: WfContext| {
731+
(f)(ctx)
732+
.map(|r| {
733+
r.and_then(|r| {
734+
Ok(match r {
735+
WfExitValue::ContinueAsNew(b) => WfExitValue::ContinueAsNew(b),
736+
WfExitValue::Cancelled => WfExitValue::Cancelled,
737+
WfExitValue::Evicted => WfExitValue::Evicted,
738+
WfExitValue::Normal(o) => WfExitValue::Normal(o.as_json_payload()?),
739+
})
740+
})
741+
})
742+
.boxed()
743+
}),
723744
}
724745
}
725746
}

sdk/src/workflow_future.rs

+7-5
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ impl WorkflowFunction {
5050
args: Vec<Payload>,
5151
outgoing_completions: UnboundedSender<WorkflowActivationCompletion>,
5252
) -> (
53-
impl Future<Output = WorkflowResult<()>>,
53+
impl Future<Output = WorkflowResult<Payload>>,
5454
UnboundedSender<WorkflowActivation>,
5555
) {
5656
let (cancel_tx, cancel_rx) = watch::channel(false);
@@ -93,7 +93,7 @@ enum SigChanOrBuffer {
9393

9494
pub struct WorkflowFuture {
9595
/// Future produced by calling the workflow function
96-
inner: BoxFuture<'static, WorkflowResult<()>>,
96+
inner: BoxFuture<'static, WorkflowResult<Payload>>,
9797
/// Commands produced inside user's wf code
9898
incoming_commands: Receiver<RustWfCmd>,
9999
/// Once blocked or the workflow has finished or errored out, the result is sent here
@@ -234,7 +234,7 @@ impl WorkflowFuture {
234234
}
235235

236236
impl Future for WorkflowFuture {
237-
type Output = WorkflowResult<()>;
237+
type Output = WorkflowResult<Payload>;
238238

239239
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
240240
'activations: loop {
@@ -443,10 +443,12 @@ impl Future for WorkflowFuture {
443443
match res {
444444
Ok(exit_val) => match exit_val {
445445
// TODO: Generic values
446-
WfExitValue::Normal(_) => {
446+
WfExitValue::Normal(result) => {
447447
activation_cmds.push(
448448
workflow_command::Variant::CompleteWorkflowExecution(
449-
CompleteWorkflowExecution { result: None },
449+
CompleteWorkflowExecution {
450+
result: Some(result),
451+
},
450452
),
451453
);
452454
}

tests/integ_tests/workflow_tests/child_workflows.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ async fn abandoned_child_bug_repro() {
9191
);
9292
worker.register_wf(CHILD_WF_TYPE.to_string(), |mut ctx: WfContext| async move {
9393
ctx.cancelled().await;
94-
Ok(WfExitValue::Cancelled)
94+
Ok(WfExitValue::<()>::Cancelled)
9595
});
9696

9797
worker

0 commit comments

Comments
 (0)