Skip to content

Commit facaa2b

Browse files
authored
Factor out test worker (#286)
* Add more license files and cargo details * Rename `TestRustWorker` to `Worker` * Add doctest example
1 parent 12bd832 commit facaa2b

File tree

22 files changed

+436
-163
lines changed

22 files changed

+436
-163
lines changed

client/Cargo.toml

+7-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@
22
name = "temporal-client"
33
version = "0.1.0"
44
edition = "2021"
5+
authors = ["Spencer Judge <[email protected]>"]
6+
license-file = "LICENSE.txt"
7+
description = "Clients for interacting with Temporal Clusters"
8+
homepage = "https://temporal.io/"
9+
repository = "https://github.com/temporalio/sdk-core"
10+
keywords = ["temporal", "workflow"]
11+
categories = ["development-tools"]
512

613
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
714
[features]
@@ -29,4 +36,3 @@ uuid = { version = "0.8.2", features = ["v4"] }
2936
[dependencies.temporal-sdk-core-protos]
3037
path = "../sdk-core-protos"
3138
version = "0.1"
32-

client/LICENSE.txt

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
Temporal Core SDK
2+
3+
The MIT License
4+
5+
Copyright (c) 2021 Temporal Technologies, Inc. All Rights Reserved
6+
7+
Permission is hereby granted, free of charge, to any person obtaining a copy
8+
of this software and associated documentation files (the "Software"), to deal
9+
in the Software without restriction, including without limitation the rights
10+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
copies of the Software, and to permit persons to whom the Software is
12+
furnished to do so, subject to the following conditions:
13+
14+
The above copyright notice and this permission notice shall be included in all
15+
copies or substantial portions of the Software.
16+
17+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23+
SOFTWARE.

client/src/lib.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,8 @@ impl ServerGateway {
509509
}
510510
}
511511

512-
/// This trait provides higher-level friendlier interaction with the server
512+
/// This trait provides higher-level friendlier interaction with the server.
513+
/// See the [WorkflowService] trait for a lower-level client.
513514
#[cfg_attr(any(feature = "mocks", test), mockall::automock)]
514515
#[async_trait::async_trait]
515516
pub trait ServerGatewayApis {

core-api/Cargo.toml

+7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@
22
name = "temporal-sdk-core-api"
33
version = "0.1.0"
44
edition = "2021"
5+
authors = ["Spencer Judge <[email protected]>"]
6+
license-file = "LICENSE.txt"
7+
description = "Interface definitions for the Temporal Core SDK"
8+
homepage = "https://temporal.io/"
9+
repository = "https://github.com/temporalio/sdk-core"
10+
keywords = ["temporal", "workflow"]
11+
categories = ["development-tools"]
512

613
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
714

core-api/LICENSE.txt

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
Temporal Core SDK
2+
3+
The MIT License
4+
5+
Copyright (c) 2021 Temporal Technologies, Inc. All Rights Reserved
6+
7+
Permission is hereby granted, free of charge, to any person obtaining a copy
8+
of this software and associated documentation files (the "Software"), to deal
9+
in the Software without restriction, including without limitation the rights
10+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
copies of the Software, and to permit persons to whom the Software is
12+
furnished to do so, subject to the following conditions:
13+
14+
The above copyright notice and this permission notice shall be included in all
15+
copies or substantial portions of the Software.
16+
17+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23+
SOFTWARE.

core-api/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ pub trait Worker: Send + Sync {
9797
/// Return this worker's config
9898
fn get_config(&self) -> &WorkerConfig;
9999

100+
/// TODO: Will be replaced/fixed/whatever by shutdown refactoring
101+
fn initiate_shutdown(&self);
102+
100103
/// Initiates async shutdown procedure, eventually ceases all polling of the server and shuts
101104
/// down this worker. [Worker::poll_workflow_activation] should be called until it
102105
/// returns [PollWfError::ShutDown] to ensure that any workflows which are still undergoing

core/LICENSE.txt

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
Temporal Core SDK
2+
3+
The MIT License
4+
5+
Copyright (c) 2021 Temporal Technologies, Inc. All Rights Reserved
6+
7+
Permission is hereby granted, free of charge, to any person obtaining a copy
8+
of this software and associated documentation files (the "Software"), to deal
9+
in the Software without restriction, including without limitation the rights
10+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
copies of the Software, and to permit persons to whom the Software is
12+
furnished to do so, subject to the following conditions:
13+
14+
The above copyright notice and this permission notice shall be included in all
15+
copies or substantial portions of the Software.
16+
17+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23+
SOFTWARE.

core/benches/workflow_replay.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use criterion::{criterion_group, criterion_main, Criterion};
22
use futures::StreamExt;
33
use std::time::Duration;
4-
use temporal_sdk::{TestRustWorker, WfContext, WorkflowFunction};
4+
use temporal_sdk::{WfContext, Worker, WorkflowFunction};
55
use temporal_sdk_core::{telemetry_init, TelemetryOptionsBuilder};
66
use temporal_sdk_core_protos::DEFAULT_WORKFLOW_TYPE;
77
use temporal_sdk_core_test_utils::{canned_histories, init_core_replay_preloaded};
@@ -29,10 +29,9 @@ pub fn criterion_benchmark(c: &mut Criterion) {
2929
tokio_runtime.block_on(async {
3030
let func = timers_wf(num_timers);
3131
let (worker, _) = init_core_replay_preloaded("replay_bench", &hist);
32-
let mut worker = TestRustWorker::new(worker, "replay_bench".to_string(), None);
32+
let mut worker = Worker::new(worker, "replay_bench".to_string());
3333
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
34-
worker.incr_expected_run_count(1);
35-
worker.run_until_done().await.unwrap();
34+
worker.run().await.unwrap();
3635
})
3736
})
3837
});
@@ -46,10 +45,9 @@ pub fn criterion_benchmark(c: &mut Criterion) {
4645
tokio_runtime.block_on(async {
4746
let func = big_signals_wf(num_tasks);
4847
let (worker, _) = init_core_replay_preloaded("large_hist_bench", &hist);
49-
let mut worker = TestRustWorker::new(worker, "large_hist_bench".to_string(), None);
48+
let mut worker = Worker::new(worker, "large_hist_bench".to_string());
5049
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
51-
worker.incr_expected_run_count(1);
52-
worker.run_until_done().await.unwrap();
50+
worker.run().await.unwrap();
5351
})
5452
})
5553
});

core/src/core_tests/child_workflows.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@ use crate::{
77
};
88
use std::sync::Arc;
99
use temporal_client::{mocks::mock_gateway, WorkflowOptions};
10-
use temporal_sdk::{
11-
ChildWorkflowOptions, Signal, TestRustWorker, WfContext, WorkflowFunction, WorkflowResult,
12-
};
10+
use temporal_sdk::{ChildWorkflowOptions, Signal, WfContext, WorkflowFunction, WorkflowResult};
1311
use temporal_sdk_core_protos::coresdk::child_workflow::{
1412
child_workflow_result, ChildWorkflowCancellationType,
1513
};
14+
use temporal_sdk_core_test_utils::TestWorker;
1615
use tokio::join;
1716

1817
const SIGNAME: &str = "SIGNAME";
@@ -29,7 +28,7 @@ async fn signal_child_workflow(#[case] serial: bool) {
2928
let mh = MockPollCfg::from_resp_batches(wf_id, t, [ResponseType::AllHistory], mock);
3029
let mock = build_mock_pollers(mh);
3130
let core = mock_worker(mock);
32-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
31+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
3332

3433
let wf = move |ctx: WfContext| async move {
3534
let child = ctx.child_workflow(ChildWorkflowOptions {

core/src/core_tests/determinism.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ use std::{
1212
time::Duration,
1313
};
1414
use temporal_client::{mocks::mock_gateway, WorkflowOptions};
15-
use temporal_sdk::{TestRustWorker, WfContext, WorkflowResult};
15+
use temporal_sdk::{WfContext, WorkflowResult};
1616
use temporal_sdk_core_protos::temporal::api::enums::v1::WorkflowTaskFailedCause;
17+
use temporal_sdk_core_test_utils::TestWorker;
1718

1819
static DID_FAIL: AtomicBool = AtomicBool::new(false);
1920
pub async fn timer_wf_fails_once(ctx: WfContext) -> WorkflowResult<()> {
@@ -43,7 +44,7 @@ async fn test_wf_task_rejected_properly() {
4344
Box::new(|_, cause, _| matches!(cause, WorkflowTaskFailedCause::Unspecified));
4445
let mock = build_mock_pollers(mh);
4546
let core = mock_worker(mock);
46-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
47+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
4748

4849
worker.register_wf(wf_type.to_owned(), timer_wf_fails_once);
4950
worker
@@ -87,7 +88,7 @@ async fn test_wf_task_rejected_properly_due_to_nondeterminism(#[case] use_cache:
8788
});
8889
}
8990
let core = mock_worker(mock);
90-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
91+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
9192

9293
let started_count: &'static _ = Box::leak(Box::new(AtomicUsize::new(0)));
9394
worker.register_wf(wf_type.to_owned(), move |ctx: WfContext| async move {

core/src/core_tests/local_activities.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ use std::{
1212
time::Duration,
1313
};
1414
use temporal_client::{mocks::mock_gateway, WorkflowOptions};
15-
use temporal_sdk::{LocalActivityOptions, TestRustWorker, WfContext, WorkflowResult};
15+
use temporal_sdk::{LocalActivityOptions, WfContext, WorkflowResult};
1616
use temporal_sdk_core_protos::{
1717
coresdk::{common::RetryPolicy, AsJsonPayloadExt},
1818
temporal::api::{enums::v1::EventType, failure::v1::Failure},
1919
};
20+
use temporal_sdk_core_test_utils::TestWorker;
2021
use tokio::sync::Barrier;
2122

2223
async fn echo(e: String) -> anyhow::Result<String> {
@@ -55,7 +56,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached:
5556
mock.worker_cfg(|cfg| cfg.max_cached_workflows = 1);
5657
}
5758
let core = mock_worker(mock);
58-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
59+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
5960

6061
worker.register_wf(
6162
DEFAULT_WORKFLOW_TYPE.to_owned(),
@@ -119,7 +120,7 @@ async fn local_act_many_concurrent() {
119120
let mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2, 3], mock);
120121
let mock = build_mock_pollers(mh);
121122
let core = mock_worker(mock);
122-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
123+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
123124

124125
worker.register_wf(DEFAULT_WORKFLOW_TYPE.to_owned(), local_act_fanout_wf);
125126
worker.register_activity("echo", |str: String| async move { Ok(str) });
@@ -168,7 +169,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
168169
let mut mock = build_mock_pollers(mh);
169170
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
170171
let core = Arc::new(mock_worker(mock));
171-
let mut worker = TestRustWorker::new(core.clone(), TEST_Q.to_string(), None);
172+
let mut worker = TestWorker::new(core.clone(), TEST_Q.to_string());
172173
let shutdown_barr: &'static Barrier = Box::leak(Box::new(Barrier::new(2)));
173174

174175
worker.register_wf(
@@ -226,7 +227,7 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) {
226227
let mh = MockPollCfg::from_resp_batches(wf_id, t, [1], mock);
227228
let mock = build_mock_pollers(mh);
228229
let core = mock_worker(mock);
229-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
230+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
230231

231232
worker.register_wf(
232233
DEFAULT_WORKFLOW_TYPE.to_owned(),
@@ -310,7 +311,7 @@ async fn local_act_retry_long_backoff_uses_timer() {
310311
let mut mock = build_mock_pollers(mh);
311312
mock.worker_cfg(|w| w.max_cached_workflows = 1);
312313
let core = mock_worker(mock);
313-
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
314+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
314315

315316
worker.register_wf(
316317
DEFAULT_WORKFLOW_TYPE.to_owned(),

core/src/telemetry/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ pub fn fetch_global_buffered_logs() -> Vec<CoreLog> {
253253
pub(crate) fn test_telem_console() {
254254
telemetry_init(&TelemetryOptions {
255255
otel_collector_url: None,
256-
tracing_filter: "temporal_sdk_core=DEBUG".to_string(),
256+
tracing_filter: "temporal_sdk_core=DEBUG,temporal_sdk=DEBUG".to_string(),
257257
log_forwarding_level: LevelFilter::Off,
258258
prometheus_export_bind_address: None,
259259
totally_disable: false,

core/src/worker/mod.rs

+12-10
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,17 @@ impl WorkerTrait for Worker {
162162
&self.config
163163
}
164164

165+
/// Begins the shutdown process, tells pollers they should stop. Is idempotent.
166+
// TODO: will be in trait after Roey's shutdown refactor
167+
fn initiate_shutdown(&self) {
168+
self.shutdown_token.cancel();
169+
// First, we want to stop polling of both activity and workflow tasks
170+
if let Some(atm) = self.at_task_mgr.as_ref() {
171+
atm.notify_shutdown();
172+
}
173+
self.wf_task_source.stop_pollers();
174+
}
175+
165176
async fn shutdown(&self) {
166177
self.shutdown().await
167178
}
@@ -291,20 +302,11 @@ impl Worker {
291302
}
292303
}
293304

294-
/// Begins the shutdown process, tells pollers they should stop. Is idempotent.
295-
pub(crate) fn initiate_shutdown(&self) {
296-
self.shutdown_token.cancel();
297-
// First, we want to stop polling of both activity and workflow tasks
298-
if let Some(atm) = self.at_task_mgr.as_ref() {
299-
atm.notify_shutdown();
300-
}
301-
self.wf_task_source.stop_pollers();
302-
}
303-
304305
/// Will shutdown the worker. Does not resolve until all outstanding workflow tasks have been
305306
/// completed
306307
pub(crate) async fn shutdown(&self) {
307308
self.initiate_shutdown();
309+
info!("Initiated shutdown");
308310
// Next we need to wait for all local activities to finish so no more workflow task
309311
// heartbeats will be generated
310312
self.local_act_mgr.shutdown_and_wait_all_finished().await;

sdk/Cargo.toml

+10
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,26 @@
22
name = "temporal-sdk"
33
version = "0.1.0-alpha.1"
44
edition = "2021"
5+
authors = ["Spencer Judge <[email protected]>"]
6+
license-file = "LICENSE.txt"
7+
description = "Temporal Rust SDK"
8+
homepage = "https://temporal.io/"
9+
repository = "https://github.com/temporalio/sdk-core"
10+
keywords = ["temporal", "workflow"]
11+
categories = ["development-tools"]
512

613
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
714

815
[dependencies]
916
anyhow = "1.0"
17+
base64 = "0.13"
1018
crossbeam = "0.8"
1119
derive_more = "0.99"
1220
futures = "0.3"
21+
once_cell = "1.10"
1322
parking_lot = { version = "0.12", features = ["send_guard"] }
1423
prost-types = "0.9"
24+
sha2 = "0.10"
1525
serde = "1.0"
1626
tokio = { version = "1.1", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs"] }
1727
tokio-util = { version = "0.7" }

sdk/LICENSE.txt

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
Temporal Core SDK
2+
3+
The MIT License
4+
5+
Copyright (c) 2021 Temporal Technologies, Inc. All Rights Reserved
6+
7+
Permission is hereby granted, free of charge, to any person obtaining a copy
8+
of this software and associated documentation files (the "Software"), to deal
9+
in the Software without restriction, including without limitation the rights
10+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
copies of the Software, and to permit persons to whom the Software is
12+
furnished to do so, subject to the following conditions:
13+
14+
The above copyright notice and this permission notice shall be included in all
15+
copies or substantial portions of the Software.
16+
17+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23+
SOFTWARE.

sdk/src/interceptors.rs

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
//! User-definable interceptors are defined in this module
2+
3+
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
4+
5+
/// Implementors can intercept certain actions that happen within the Worker.
6+
///
7+
/// Advanced usage only.
8+
pub trait WorkerInterceptor {
9+
/// Called every time a workflow activation completes
10+
fn on_workflow_activation_completion(&self, completion: &WorkflowActivationCompletion);
11+
}

0 commit comments

Comments
 (0)