Skip to content

Commit 37d727d

Browse files
committed
Updated Activity/Workflow Function bounds to allow a more generic user function
1 parent 30613cf commit 37d727d

File tree

11 files changed

+278
-174
lines changed

11 files changed

+278
-174
lines changed

core/src/core_tests/local_activities.rs

+38-44
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use std::{
2121
};
2222
use temporal_client::WorkflowOptions;
2323
use temporal_sdk::{
24-
ActContext, ActivityCancelledError, LocalActivityOptions, WfContext, WorkflowFunction,
25-
WorkflowResult,
24+
ActContext, ActExitValue, ActivityCancelledError, ActivityFunction, LocalActivityOptions,
25+
WfContext, WorkflowFunction, WorkflowResult,
2626
};
2727
use temporal_sdk_core_api::{
2828
errors::{PollActivityError, PollWfError},
@@ -101,7 +101,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached:
101101
Ok(().into())
102102
},
103103
);
104-
worker.register_activity(DEFAULT_ACTIVITY_TYPE, echo);
104+
worker.register_activity(DEFAULT_ACTIVITY_TYPE, ActivityFunction::from(echo));
105105
worker
106106
.submit_wf(
107107
wf_id.to_owned(),
@@ -151,7 +151,7 @@ async fn local_act_many_concurrent() {
151151
let mut worker = mock_sdk(mh);
152152

153153
worker.register_wf(DEFAULT_WORKFLOW_TYPE.to_owned(), local_act_fanout_wf);
154-
worker.register_activity("echo", echo);
154+
worker.register_activity("echo", ActivityFunction::from(echo));
155155
worker
156156
.submit_wf(
157157
wf_id.to_owned(),
@@ -207,14 +207,17 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
207207
Ok(().into())
208208
},
209209
);
210-
worker.register_activity("echo", move |_ctx: ActContext, str: String| async move {
211-
if shutdown_middle {
212-
shutdown_barr.wait().await;
213-
}
214-
// Take slightly more than two workflow tasks
215-
tokio::time::sleep(wft_timeout.mul_f32(2.2)).await;
216-
Ok(str)
217-
});
210+
worker.register_activity(
211+
"echo",
212+
ActivityFunction::from(move |_ctx: ActContext, str: String| async move {
213+
if shutdown_middle {
214+
shutdown_barr.wait().await;
215+
}
216+
// Take slightly more than two workflow tasks
217+
tokio::time::sleep(wft_timeout.mul_f32(2.2)).await;
218+
Ok(str)
219+
}),
220+
);
218221
worker
219222
.submit_wf(
220223
wf_id.to_owned(),
@@ -276,10 +279,10 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) {
276279
},
277280
);
278281
let attempts: &'static _ = Box::leak(Box::new(AtomicUsize::new(0)));
279-
worker.register_activity("echo", move |_ctx: ActContext, _: String| async move {
282+
worker.register_activity("echo", move |_ctx: ActContext| async move {
280283
// Succeed on 3rd attempt (which is ==2 since fetch_add returns prev val)
281284
if 2 == attempts.fetch_add(1, Ordering::Relaxed) && eventually_pass {
282-
Ok(())
285+
Ok(().into())
283286
} else {
284287
Err(anyhow!("Oh no I failed!"))
285288
}
@@ -355,12 +358,9 @@ async fn local_act_retry_long_backoff_uses_timer() {
355358
Ok(().into())
356359
},
357360
);
358-
worker.register_activity(
359-
DEFAULT_ACTIVITY_TYPE,
360-
move |_ctx: ActContext, _: String| async move {
361-
Result::<(), _>::Err(anyhow!("Oh no I failed!"))
362-
},
363-
);
361+
worker.register_activity(DEFAULT_ACTIVITY_TYPE, move |_ctx: ActContext| async move {
362+
Result::<ActExitValue<()>, _>::Err(anyhow!("Oh no I failed!"))
363+
});
364364
worker
365365
.submit_wf(
366366
wf_id.to_owned(),
@@ -398,7 +398,7 @@ async fn local_act_null_result() {
398398
Ok(().into())
399399
},
400400
);
401-
worker.register_activity("nullres", |_ctx: ActContext, _: String| async { Ok(()) });
401+
worker.register_activity("nullres", |_ctx: ActContext| async { Ok(().into()) });
402402
worker
403403
.submit_wf(
404404
wf_id.to_owned(),
@@ -442,7 +442,7 @@ async fn local_act_command_immediately_follows_la_marker() {
442442
Ok(().into())
443443
},
444444
);
445-
worker.register_activity("nullres", |_ctx: ActContext, _: String| async { Ok(()) });
445+
worker.register_activity("nullres", |_ctx: ActContext| async { Ok(().into()) });
446446
worker
447447
.submit_wf(
448448
wf_id.to_owned(),
@@ -778,10 +778,7 @@ async fn test_schedule_to_start_timeout() {
778778
Ok(().into())
779779
},
780780
);
781-
worker.register_activity(
782-
"echo",
783-
move |_ctx: ActContext, _: String| async move { Ok(()) },
784-
);
781+
worker.register_activity("echo", move |_ctx: ActContext| async move { Ok(().into()) });
785782
worker
786783
.submit_wf(
787784
wf_id.to_owned(),
@@ -868,10 +865,7 @@ async fn test_schedule_to_start_timeout_not_based_on_original_time(
868865
Ok(().into())
869866
},
870867
);
871-
worker.register_activity(
872-
"echo",
873-
move |_ctx: ActContext, _: String| async move { Ok(()) },
874-
);
868+
worker.register_activity("echo", move |_ctx: ActContext| async move { Ok(().into()) });
875869
worker
876870
.submit_wf(
877871
wf_id.to_owned(),
@@ -917,16 +911,13 @@ async fn wft_failure_cancels_running_las() {
917911
Ok(().into())
918912
},
919913
);
920-
worker.register_activity(
921-
DEFAULT_ACTIVITY_TYPE,
922-
move |ctx: ActContext, _: String| async move {
923-
let res = tokio::time::timeout(Duration::from_millis(500), ctx.cancelled()).await;
924-
if res.is_err() {
925-
panic!("Activity must be cancelled!!!!");
926-
}
927-
Result::<(), _>::Err(ActivityCancelledError::default().into())
928-
},
929-
);
914+
worker.register_activity(DEFAULT_ACTIVITY_TYPE, move |ctx: ActContext| async move {
915+
let res = tokio::time::timeout(Duration::from_millis(500), ctx.cancelled()).await;
916+
if res.is_err() {
917+
panic!("Activity must be cancelled!!!!");
918+
}
919+
Result::<ActExitValue<()>, _>::Err(ActivityCancelledError::default().into())
920+
});
930921
worker
931922
.submit_wf(
932923
wf_id.to_owned(),
@@ -980,7 +971,7 @@ async fn resolved_las_not_recorded_if_wft_fails_many_times() {
980971
);
981972
worker.register_activity(
982973
"echo",
983-
move |_: ActContext, _: String| async move { Ok(()) },
974+
ActivityFunction::from(move |_: ActContext, _: String| async move { Ok(()) }),
984975
);
985976
worker
986977
.submit_wf(
@@ -1039,9 +1030,12 @@ async fn local_act_records_nonfirst_attempts_ok() {
10391030
Ok(().into())
10401031
},
10411032
);
1042-
worker.register_activity("echo", move |_ctx: ActContext, _: String| async move {
1043-
Result::<(), _>::Err(anyhow!("I fail"))
1044-
});
1033+
worker.register_activity(
1034+
"echo",
1035+
ActivityFunction::from(move |_ctx: ActContext, _: String| async move {
1036+
Result::<(), _>::Err(anyhow!("I fail"))
1037+
}),
1038+
);
10451039
worker
10461040
.submit_wf(
10471041
wf_id.to_owned(),

sdk/src/activity_context.rs

+28-33
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,15 @@ impl ActContext {
6060
task_queue: String,
6161
task_token: Vec<u8>,
6262
task: activity_task::Start,
63-
) -> (Self, Payload) {
63+
) -> Self {
6464
let activity_task::Start {
6565
workflow_namespace,
6666
workflow_type,
6767
workflow_execution,
6868
activity_id,
6969
activity_type,
7070
header_fields,
71-
mut input,
71+
input,
7272
heartbeat_details,
7373
scheduled_time,
7474
current_attempt_scheduled_time,
@@ -86,37 +86,32 @@ impl ActContext {
8686
start_to_close_timeout.as_ref(),
8787
schedule_to_close_timeout.as_ref(),
8888
);
89-
let first_arg = input.pop().unwrap_or_default();
9089

91-
(
92-
ActContext {
93-
worker,
94-
app_data,
95-
cancellation_token,
96-
input,
97-
heartbeat_details,
98-
header_fields,
99-
info: ActivityInfo {
100-
task_token,
101-
task_queue,
102-
workflow_type,
103-
workflow_namespace,
104-
workflow_execution,
105-
activity_id,
106-
activity_type,
107-
heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
108-
scheduled_time: scheduled_time.try_into_or_none(),
109-
started_time: started_time.try_into_or_none(),
110-
deadline,
111-
attempt,
112-
current_attempt_scheduled_time: current_attempt_scheduled_time
113-
.try_into_or_none(),
114-
retry_policy,
115-
is_local,
116-
},
90+
ActContext {
91+
worker,
92+
app_data,
93+
cancellation_token,
94+
input,
95+
heartbeat_details,
96+
header_fields,
97+
info: ActivityInfo {
98+
task_token,
99+
task_queue,
100+
workflow_type,
101+
workflow_namespace,
102+
workflow_execution,
103+
activity_id,
104+
activity_type,
105+
heartbeat_timeout: heartbeat_timeout.try_into_or_none(),
106+
scheduled_time: scheduled_time.try_into_or_none(),
107+
started_time: started_time.try_into_or_none(),
108+
deadline,
109+
attempt,
110+
current_attempt_scheduled_time: current_attempt_scheduled_time.try_into_or_none(),
111+
retry_policy,
112+
is_local,
117113
},
118-
first_arg,
119-
)
114+
}
120115
}
121116

122117
/// Returns a future the completes if and when the activity this was called inside has been
@@ -133,8 +128,8 @@ impl ActContext {
133128
/// Retrieve extra parameters to the Activity. The first input is always popped and passed to
134129
/// the Activity function for the currently executing activity. However, if more parameters are
135130
/// passed, perhaps from another language's SDK, explicit access is available from extra_inputs
136-
pub fn extra_inputs(&mut self) -> &mut [Payload] {
137-
&mut self.input
131+
pub fn get_args(&self) -> &[Payload] {
132+
&self.input
138133
}
139134

140135
/// Extract heartbeat details from last failed attempt. This is used in combination with retry policy.

0 commit comments

Comments
 (0)