Skip to content

Commit 9a3e074

Browse files
committed
Rename TestRustWorker to Worker
1 parent a58670e commit 9a3e074

File tree

8 files changed

+47
-37
lines changed

8 files changed

+47
-37
lines changed

client/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,3 @@ uuid = { version = "0.8.2", features = ["v4"] }
3636
[dependencies.temporal-sdk-core-protos]
3737
path = "../sdk-core-protos"
3838
version = "0.1"
39-

core/benches/workflow_replay.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use criterion::{criterion_group, criterion_main, Criterion};
22
use futures::StreamExt;
33
use std::time::Duration;
4-
use temporal_sdk::{TestRustWorker, WfContext, WorkflowFunction};
4+
use temporal_sdk::{WfContext, Worker, WorkflowFunction};
55
use temporal_sdk_core::{telemetry_init, TelemetryOptionsBuilder};
66
use temporal_sdk_core_protos::DEFAULT_WORKFLOW_TYPE;
77
use temporal_sdk_core_test_utils::{canned_histories, init_core_replay_preloaded};
@@ -29,7 +29,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
2929
tokio_runtime.block_on(async {
3030
let func = timers_wf(num_timers);
3131
let (worker, _) = init_core_replay_preloaded("replay_bench", &hist);
32-
let mut worker = TestRustWorker::new(worker, "replay_bench".to_string(), None);
32+
let mut worker = Worker::new(worker, "replay_bench".to_string(), None);
3333
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
3434
worker.incr_expected_run_count(1);
3535
worker.run_until_done().await.unwrap();
@@ -46,7 +46,7 @@ pub fn criterion_benchmark(c: &mut Criterion) {
4646
tokio_runtime.block_on(async {
4747
let func = big_signals_wf(num_tasks);
4848
let (worker, _) = init_core_replay_preloaded("large_hist_bench", &hist);
49-
let mut worker = TestRustWorker::new(worker, "large_hist_bench".to_string(), None);
49+
let mut worker = Worker::new(worker, "large_hist_bench".to_string(), None);
5050
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
5151
worker.incr_expected_run_count(1);
5252
worker.run_until_done().await.unwrap();

core/src/core_tests/child_workflows.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{
88
use std::sync::Arc;
99
use temporal_client::{mocks::mock_gateway, WorkflowOptions};
1010
use temporal_sdk::{
11-
ChildWorkflowOptions, Signal, TestRustWorker, WfContext, WorkflowFunction, WorkflowResult,
11+
ChildWorkflowOptions, Signal, WfContext, Worker, WorkflowFunction, WorkflowResult,
1212
};
1313
use temporal_sdk_core_protos::coresdk::child_workflow::{
1414
child_workflow_result, ChildWorkflowCancellationType,
@@ -29,7 +29,7 @@ async fn signal_child_workflow(#[case] serial: bool) {
2929
let mh = MockPollCfg::from_resp_batches(wf_id, t, [ResponseType::AllHistory], mock);
3030
let mock = build_mock_pollers(mh);
3131
let core = mock_worker(mock);
32-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
32+
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
3333

3434
let wf = move |ctx: WfContext| async move {
3535
let child = ctx.child_workflow(ChildWorkflowOptions {

core/src/core_tests/determinism.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::{
1212
time::Duration,
1313
};
1414
use temporal_client::{mocks::mock_gateway, WorkflowOptions};
15-
use temporal_sdk::{TestRustWorker, WfContext, WorkflowResult};
15+
use temporal_sdk::{WfContext, Worker, WorkflowResult};
1616
use temporal_sdk_core_protos::temporal::api::enums::v1::WorkflowTaskFailedCause;
1717

1818
static DID_FAIL: AtomicBool = AtomicBool::new(false);
@@ -43,7 +43,7 @@ async fn test_wf_task_rejected_properly() {
4343
Box::new(|_, cause, _| matches!(cause, WorkflowTaskFailedCause::Unspecified));
4444
let mock = build_mock_pollers(mh);
4545
let core = mock_worker(mock);
46-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
46+
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
4747

4848
worker.register_wf(wf_type.to_owned(), timer_wf_fails_once);
4949
worker
@@ -87,7 +87,7 @@ async fn test_wf_task_rejected_properly_due_to_nondeterminism(#[case] use_cache:
8787
});
8888
}
8989
let core = mock_worker(mock);
90-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
90+
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
9191

9292
let started_count: &'static _ = Box::leak(Box::new(AtomicUsize::new(0)));
9393
worker.register_wf(wf_type.to_owned(), move |ctx: WfContext| async move {

core/src/core_tests/local_activities.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::{
1212
time::Duration,
1313
};
1414
use temporal_client::{mocks::mock_gateway, WorkflowOptions};
15-
use temporal_sdk::{LocalActivityOptions, TestRustWorker, WfContext, WorkflowResult};
15+
use temporal_sdk::{LocalActivityOptions, WfContext, Worker, WorkflowResult};
1616
use temporal_sdk_core_protos::{
1717
coresdk::{common::RetryPolicy, AsJsonPayloadExt},
1818
temporal::api::{enums::v1::EventType, failure::v1::Failure},
@@ -55,7 +55,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached:
5555
mock.worker_cfg(|cfg| cfg.max_cached_workflows = 1);
5656
}
5757
let core = mock_worker(mock);
58-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
58+
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
5959

6060
worker.register_wf(
6161
DEFAULT_WORKFLOW_TYPE.to_owned(),
@@ -119,7 +119,7 @@ async fn local_act_many_concurrent() {
119119
let mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2, 3], mock);
120120
let mock = build_mock_pollers(mh);
121121
let core = mock_worker(mock);
122-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
122+
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
123123

124124
worker.register_wf(DEFAULT_WORKFLOW_TYPE.to_owned(), local_act_fanout_wf);
125125
worker.register_activity("echo", |str: String| async move { Ok(str) });
@@ -168,7 +168,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
168168
let mut mock = build_mock_pollers(mh);
169169
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
170170
let core = Arc::new(mock_worker(mock));
171-
let mut worker = TestRustWorker::new(core.clone(), TEST_Q.to_string(), None);
171+
let mut worker = Worker::new(core.clone(), TEST_Q.to_string(), None);
172172
let shutdown_barr: &'static Barrier = Box::leak(Box::new(Barrier::new(2)));
173173

174174
worker.register_wf(
@@ -226,7 +226,7 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) {
226226
let mh = MockPollCfg::from_resp_batches(wf_id, t, [1], mock);
227227
let mock = build_mock_pollers(mh);
228228
let core = mock_worker(mock);
229-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
229+
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
230230

231231
worker.register_wf(
232232
DEFAULT_WORKFLOW_TYPE.to_owned(),
@@ -310,7 +310,7 @@ async fn local_act_retry_long_backoff_uses_timer() {
310310
let mut mock = build_mock_pollers(mh);
311311
mock.worker_cfg(|w| w.max_cached_workflows = 1);
312312
let core = mock_worker(mock);
313-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
313+
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
314314

315315
worker.register_wf(
316316
DEFAULT_WORKFLOW_TYPE.to_owned(),

sdk/src/lib.rs

+18-10
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use std::{
3535
use temporal_client::{ServerGatewayApis, WorkflowOptions};
3636
use temporal_sdk_core_api::{
3737
errors::{PollActivityError, PollWfError},
38-
Worker,
38+
Worker as CoreWorker,
3939
};
4040
use temporal_sdk_core_protos::{
4141
coresdk::{
@@ -64,9 +64,10 @@ use tokio::{
6464
};
6565
use tokio_util::sync::CancellationToken;
6666

67-
/// A worker that can poll for and respond to workflow tasks by using [WorkflowFunction]s
68-
pub struct TestRustWorker {
69-
worker: Arc<dyn Worker>,
67+
/// A worker that can poll for and respond to workflow tasks by using [WorkflowFunction]s,
68+
/// and activity tasks by using [ActivityFunction]s
69+
pub struct Worker {
70+
worker: Arc<dyn CoreWorker>,
7071
task_queue: String,
7172
task_timeout: Option<Duration>,
7273
workflow_half: WorkflowHalf,
@@ -91,10 +92,10 @@ struct ActivityHalf {
9192
task_tokens_to_cancels: HashMap<TaskToken, CancellationToken>,
9293
}
9394

94-
impl TestRustWorker {
95+
impl Worker {
9596
/// Create a new rust worker
9697
pub fn new(
97-
worker: Arc<dyn Worker>,
98+
worker: Arc<dyn CoreWorker>,
9899
task_queue: String,
99100
task_timeout: Option<Duration>,
100101
) -> Self {
@@ -280,11 +281,18 @@ impl TestRustWorker {
280281
/// Turns this rust worker into a new worker with all the same workflows and activities
281282
/// registered, but with a new underlying core worker. Can be used to swap the worker for
282283
/// a replay worker, change task queues, etc.
283-
pub fn with_new_core_worker(&mut self, new_core_worker: Arc<dyn Worker>) {
284+
pub fn with_new_core_worker(&mut self, new_core_worker: Arc<dyn CoreWorker>) {
284285
self.worker = new_core_worker;
285286
}
286287

287-
fn split_apart(&mut self) -> (Arc<dyn Worker>, &str, &mut WorkflowHalf, &mut ActivityHalf) {
288+
fn split_apart(
289+
&mut self,
290+
) -> (
291+
Arc<dyn CoreWorker>,
292+
&str,
293+
&mut WorkflowHalf,
294+
&mut ActivityHalf,
295+
) {
288296
(
289297
self.worker.clone(),
290298
&self.task_queue,
@@ -297,7 +305,7 @@ impl TestRustWorker {
297305
impl WorkflowHalf {
298306
async fn workflow_activation_handler(
299307
&mut self,
300-
worker: &dyn Worker,
308+
worker: &dyn CoreWorker,
301309
task_queue: &str,
302310
shutdown_rx: &Receiver<bool>,
303311
completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
@@ -374,7 +382,7 @@ impl ActivityHalf {
374382
/// Spawns off a task to handle the provided activity task
375383
fn activity_task_handler(
376384
&mut self,
377-
worker: Arc<dyn Worker>,
385+
worker: Arc<dyn CoreWorker>,
378386
activity: ActivityTask,
379387
) -> Result<(), anyhow::Error> {
380388
match activity.variant {

test-utils/src/lib.rs

+13-10
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ use std::{
1212
time::Duration,
1313
};
1414
use temporal_client::WorkflowOptions;
15-
use temporal_sdk::TestRustWorker;
15+
use temporal_sdk::Worker;
1616
use temporal_sdk_core::{
1717
init_replay_worker, init_worker, replay::mock_gateway_from_history, telemetry_init,
1818
ServerGatewayOptions, ServerGatewayOptionsBuilder, TelemetryOptions, TelemetryOptionsBuilder,
1919
WorkerConfig, WorkerConfigBuilder,
2020
};
21-
use temporal_sdk_core_api::Worker;
21+
use temporal_sdk_core_api::Worker as CoreWorker;
2222
use temporal_sdk_core_protos::{
2323
coresdk::{
2424
workflow_commands::{
@@ -40,7 +40,7 @@ const PROM_ENABLE_ENV_VAR: &str = "TEMPORAL_INTEG_PROM_PORT";
4040

4141
/// Create a worker instance which will use the provided test name to base the task queue and wf id
4242
/// upon. Returns the instance and the task queue name (which is also the workflow id).
43-
pub async fn init_core_and_create_wf(test_name: &str) -> (Arc<dyn Worker>, String) {
43+
pub async fn init_core_and_create_wf(test_name: &str) -> (Arc<dyn CoreWorker>, String) {
4444
let mut starter = CoreWfStarter::new(test_name);
4545
let core = starter.get_worker().await;
4646
starter.start_wf().await;
@@ -49,7 +49,10 @@ pub async fn init_core_and_create_wf(test_name: &str) -> (Arc<dyn Worker>, Strin
4949

5050
/// Create a worker replay instance preloaded with a provided history. Returns the worker impl
5151
/// and the task queue name as in [init_core_and_create_wf].
52-
pub fn init_core_replay_preloaded(test_name: &str, history: &History) -> (Arc<dyn Worker>, String) {
52+
pub fn init_core_replay_preloaded(
53+
test_name: &str,
54+
history: &History,
55+
) -> (Arc<dyn CoreWorker>, String) {
5356
let worker_cfg = WorkerConfigBuilder::default()
5457
.namespace(NAMESPACE)
5558
.task_queue(test_name)
@@ -77,7 +80,7 @@ pub struct CoreWfStarter {
7780
telemetry_options: TelemetryOptions,
7881
worker_config: WorkerConfig,
7982
wft_timeout: Option<Duration>,
80-
initted_worker: Option<Arc<dyn Worker>>,
83+
initted_worker: Option<Arc<dyn CoreWorker>>,
8184
}
8285

8386
impl CoreWfStarter {
@@ -103,8 +106,8 @@ impl CoreWfStarter {
103106
}
104107
}
105108

106-
pub async fn worker(&mut self) -> TestRustWorker {
107-
TestRustWorker::new(
109+
pub async fn worker(&mut self) -> Worker {
110+
Worker::new(
108111
self.get_worker().await,
109112
self.worker_config.task_queue.clone(),
110113
self.wft_timeout,
@@ -115,7 +118,7 @@ impl CoreWfStarter {
115118
self.get_worker().await.shutdown().await;
116119
}
117120

118-
pub async fn get_worker(&mut self) -> Arc<dyn Worker> {
121+
pub async fn get_worker(&mut self) -> Arc<dyn CoreWorker> {
119122
if self.initted_worker.is_none() {
120123
telemetry_init(&self.telemetry_options).expect("Telemetry inits cleanly");
121124
let gateway = get_integ_server_options()
@@ -163,7 +166,7 @@ impl CoreWfStarter {
163166
wf_id: impl Into<String>,
164167
run_id: impl Into<String>,
165168
// TODO: Need not be passed in
166-
worker: &mut TestRustWorker,
169+
worker: &mut Worker,
167170
) -> Result<(), anyhow::Error> {
168171
// Fetch history and replay it
169172
let history = self
@@ -317,7 +320,7 @@ pub trait WorkerTestHelpers {
317320
#[async_trait::async_trait]
318321
impl<T> WorkerTestHelpers for T
319322
where
320-
T: Worker + ?Sized,
323+
T: CoreWorker + ?Sized,
321324
{
322325
async fn complete_execution(&self, run_id: &str) {
323326
self.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(

tests/integ_tests/workflow_tests/replay.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use assert_matches::assert_matches;
22
use std::time::Duration;
3-
use temporal_sdk::{TestRustWorker, WfContext, WorkflowFunction};
3+
use temporal_sdk::{WfContext, Worker, WorkflowFunction};
44
use temporal_sdk_core::{replay::mock_gateway_from_history, telemetry_init, ServerGatewayApis};
55
use temporal_sdk_core_api::errors::{PollActivityError, PollWfError};
66
use temporal_sdk_core_protos::{
@@ -133,7 +133,7 @@ async fn replay_using_wf_function() {
133133
let func = timers_wf(num_timers);
134134
let (worker, _) =
135135
init_core_replay_preloaded("replay_bench", &t.get_full_history_info().unwrap().into());
136-
let mut worker = TestRustWorker::new(worker, "replay_bench".to_string(), None);
136+
let mut worker = Worker::new(worker, "replay_bench".to_string(), None);
137137
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
138138
worker.incr_expected_run_count(1);
139139
worker.run_until_done().await.unwrap();

0 commit comments

Comments
 (0)