Skip to content

Factor out test worker #286

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
name = "temporal-client"
version = "0.1.0"
edition = "2021"
authors = ["Spencer Judge <[email protected]>"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't really matter, but we've been setting this to Temporal Technologies Inc <[email protected]> in Python to not attach specific names.

license-file = "LICENSE.txt"
description = "Clients for interacting with Temporal Clusters"
homepage = "https://temporal.io/"
repository = "https://github.com/temporalio/sdk-core"
keywords = ["temporal", "workflow"]
categories = ["development-tools"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
Expand Down Expand Up @@ -29,4 +36,3 @@ uuid = { version = "0.8.2", features = ["v4"] }
[dependencies.temporal-sdk-core-protos]
path = "../sdk-core-protos"
version = "0.1"

23 changes: 23 additions & 0 deletions client/LICENSE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Temporal Core SDK

The MIT License

Copyright (c) 2021 Temporal Technologies, Inc. All Rights Reserved

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
3 changes: 2 additions & 1 deletion client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,8 @@ impl ServerGateway {
}
}

/// This trait provides higher-level friendlier interaction with the server
/// This trait provides higher-level friendlier interaction with the server.
/// See the [WorkflowService] trait for a lower-level client.
#[cfg_attr(any(feature = "mocks", test), mockall::automock)]
#[async_trait::async_trait]
pub trait ServerGatewayApis {
Expand Down
7 changes: 7 additions & 0 deletions core-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@
name = "temporal-sdk-core-api"
version = "0.1.0"
edition = "2021"
authors = ["Spencer Judge <[email protected]>"]
license-file = "LICENSE.txt"
description = "Interface definitions for the Temporal Core SDK"
homepage = "https://temporal.io/"
repository = "https://github.com/temporalio/sdk-core"
keywords = ["temporal", "workflow"]
categories = ["development-tools"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
23 changes: 23 additions & 0 deletions core-api/LICENSE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Temporal Core SDK

The MIT License

Copyright (c) 2021 Temporal Technologies, Inc. All Rights Reserved

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
3 changes: 3 additions & 0 deletions core-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ pub trait Worker: Send + Sync {
/// Return this worker's config
fn get_config(&self) -> &WorkerConfig;

/// TODO: Will be replaced/fixed/whatever by shutdown refactoring
fn initiate_shutdown(&self);

/// Initiates async shutdown procedure, eventually ceases all polling of the server and shuts
/// down this worker. [Worker::poll_workflow_activation] should be called until it
/// returns [PollWfError::ShutDown] to ensure that any workflows which are still undergoing
Expand Down
23 changes: 23 additions & 0 deletions core/LICENSE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Temporal Core SDK

The MIT License

Copyright (c) 2021 Temporal Technologies, Inc. All Rights Reserved

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
12 changes: 5 additions & 7 deletions core/benches/workflow_replay.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use criterion::{criterion_group, criterion_main, Criterion};
use futures::StreamExt;
use std::time::Duration;
use temporal_sdk::{TestRustWorker, WfContext, WorkflowFunction};
use temporal_sdk::{WfContext, Worker, WorkflowFunction};
use temporal_sdk_core::{telemetry_init, TelemetryOptionsBuilder};
use temporal_sdk_core_protos::DEFAULT_WORKFLOW_TYPE;
use temporal_sdk_core_test_utils::{canned_histories, init_core_replay_preloaded};
Expand Down Expand Up @@ -29,10 +29,9 @@ pub fn criterion_benchmark(c: &mut Criterion) {
tokio_runtime.block_on(async {
let func = timers_wf(num_timers);
let (worker, _) = init_core_replay_preloaded("replay_bench", &hist);
let mut worker = TestRustWorker::new(worker, "replay_bench".to_string(), None);
let mut worker = Worker::new(worker, "replay_bench".to_string());
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
worker.incr_expected_run_count(1);
worker.run_until_done().await.unwrap();
worker.run().await.unwrap();
})
})
});
Expand All @@ -46,10 +45,9 @@ pub fn criterion_benchmark(c: &mut Criterion) {
tokio_runtime.block_on(async {
let func = big_signals_wf(num_tasks);
let (worker, _) = init_core_replay_preloaded("large_hist_bench", &hist);
let mut worker = TestRustWorker::new(worker, "large_hist_bench".to_string(), None);
let mut worker = Worker::new(worker, "large_hist_bench".to_string());
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
worker.incr_expected_run_count(1);
worker.run_until_done().await.unwrap();
worker.run().await.unwrap();
})
})
});
Expand Down
7 changes: 3 additions & 4 deletions core/src/core_tests/child_workflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,11 @@ use crate::{
};
use std::sync::Arc;
use temporal_client::{mocks::mock_gateway, WorkflowOptions};
use temporal_sdk::{
ChildWorkflowOptions, Signal, TestRustWorker, WfContext, WorkflowFunction, WorkflowResult,
};
use temporal_sdk::{ChildWorkflowOptions, Signal, WfContext, WorkflowFunction, WorkflowResult};
use temporal_sdk_core_protos::coresdk::child_workflow::{
child_workflow_result, ChildWorkflowCancellationType,
};
use temporal_sdk_core_test_utils::TestWorker;
use tokio::join;

const SIGNAME: &str = "SIGNAME";
Expand All @@ -29,7 +28,7 @@ async fn signal_child_workflow(#[case] serial: bool) {
let mh = MockPollCfg::from_resp_batches(wf_id, t, [ResponseType::AllHistory], mock);
let mock = build_mock_pollers(mh);
let core = mock_worker(mock);
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());

let wf = move |ctx: WfContext| async move {
let child = ctx.child_workflow(ChildWorkflowOptions {
Expand Down
7 changes: 4 additions & 3 deletions core/src/core_tests/determinism.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use std::{
time::Duration,
};
use temporal_client::{mocks::mock_gateway, WorkflowOptions};
use temporal_sdk::{TestRustWorker, WfContext, WorkflowResult};
use temporal_sdk::{WfContext, WorkflowResult};
use temporal_sdk_core_protos::temporal::api::enums::v1::WorkflowTaskFailedCause;
use temporal_sdk_core_test_utils::TestWorker;

static DID_FAIL: AtomicBool = AtomicBool::new(false);
pub async fn timer_wf_fails_once(ctx: WfContext) -> WorkflowResult<()> {
Expand Down Expand Up @@ -43,7 +44,7 @@ async fn test_wf_task_rejected_properly() {
Box::new(|_, cause, _| matches!(cause, WorkflowTaskFailedCause::Unspecified));
let mock = build_mock_pollers(mh);
let core = mock_worker(mock);
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());

worker.register_wf(wf_type.to_owned(), timer_wf_fails_once);
worker
Expand Down Expand Up @@ -87,7 +88,7 @@ async fn test_wf_task_rejected_properly_due_to_nondeterminism(#[case] use_cache:
});
}
let core = mock_worker(mock);
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());

let started_count: &'static _ = Box::leak(Box::new(AtomicUsize::new(0)));
worker.register_wf(wf_type.to_owned(), move |ctx: WfContext| async move {
Expand Down
13 changes: 7 additions & 6 deletions core/src/core_tests/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ use std::{
time::Duration,
};
use temporal_client::{mocks::mock_gateway, WorkflowOptions};
use temporal_sdk::{LocalActivityOptions, TestRustWorker, WfContext, WorkflowResult};
use temporal_sdk::{LocalActivityOptions, WfContext, WorkflowResult};
use temporal_sdk_core_protos::{
coresdk::{common::RetryPolicy, AsJsonPayloadExt},
temporal::api::{enums::v1::EventType, failure::v1::Failure},
};
use temporal_sdk_core_test_utils::TestWorker;
use tokio::sync::Barrier;

async fn echo(e: String) -> anyhow::Result<String> {
Expand Down Expand Up @@ -55,7 +56,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached:
mock.worker_cfg(|cfg| cfg.max_cached_workflows = 1);
}
let core = mock_worker(mock);
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());

worker.register_wf(
DEFAULT_WORKFLOW_TYPE.to_owned(),
Expand Down Expand Up @@ -119,7 +120,7 @@ async fn local_act_many_concurrent() {
let mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2, 3], mock);
let mock = build_mock_pollers(mh);
let core = mock_worker(mock);
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());

worker.register_wf(DEFAULT_WORKFLOW_TYPE.to_owned(), local_act_fanout_wf);
worker.register_activity("echo", |str: String| async move { Ok(str) });
Expand Down Expand Up @@ -168,7 +169,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
let mut mock = build_mock_pollers(mh);
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
let core = Arc::new(mock_worker(mock));
let mut worker = TestRustWorker::new(core.clone(), TEST_Q.to_string(), None);
let mut worker = TestWorker::new(core.clone(), TEST_Q.to_string());
let shutdown_barr: &'static Barrier = Box::leak(Box::new(Barrier::new(2)));

worker.register_wf(
Expand Down Expand Up @@ -226,7 +227,7 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) {
let mh = MockPollCfg::from_resp_batches(wf_id, t, [1], mock);
let mock = build_mock_pollers(mh);
let core = mock_worker(mock);
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());

worker.register_wf(
DEFAULT_WORKFLOW_TYPE.to_owned(),
Expand Down Expand Up @@ -310,7 +311,7 @@ async fn local_act_retry_long_backoff_uses_timer() {
let mut mock = build_mock_pollers(mh);
mock.worker_cfg(|w| w.max_cached_workflows = 1);
let core = mock_worker(mock);
let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None);
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());

worker.register_wf(
DEFAULT_WORKFLOW_TYPE.to_owned(),
Expand Down
2 changes: 1 addition & 1 deletion core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub fn fetch_global_buffered_logs() -> Vec<CoreLog> {
pub(crate) fn test_telem_console() {
telemetry_init(&TelemetryOptions {
otel_collector_url: None,
tracing_filter: "temporal_sdk_core=DEBUG".to_string(),
tracing_filter: "temporal_sdk_core=DEBUG,temporal_sdk=DEBUG".to_string(),
log_forwarding_level: LevelFilter::Off,
prometheus_export_bind_address: None,
totally_disable: false,
Expand Down
22 changes: 12 additions & 10 deletions core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,17 @@ impl WorkerTrait for Worker {
&self.config
}

/// Begins the shutdown process, tells pollers they should stop. Is idempotent.
// TODO: will be in trait after Roey's shutdown refactor
fn initiate_shutdown(&self) {
self.shutdown_token.cancel();
// First, we want to stop polling of both activity and workflow tasks
if let Some(atm) = self.at_task_mgr.as_ref() {
atm.notify_shutdown();
}
self.wf_task_source.stop_pollers();
}

async fn shutdown(&self) {
self.shutdown().await
}
Expand Down Expand Up @@ -291,20 +302,11 @@ impl Worker {
}
}

/// Begins the shutdown process, tells pollers they should stop. Is idempotent.
pub(crate) fn initiate_shutdown(&self) {
self.shutdown_token.cancel();
// First, we want to stop polling of both activity and workflow tasks
if let Some(atm) = self.at_task_mgr.as_ref() {
atm.notify_shutdown();
}
self.wf_task_source.stop_pollers();
}

/// Will shutdown the worker. Does not resolve until all outstanding workflow tasks have been
/// completed
pub(crate) async fn shutdown(&self) {
self.initiate_shutdown();
info!("Initiated shutdown");
// Next we need to wait for all local activities to finish so no more workflow task
// heartbeats will be generated
self.local_act_mgr.shutdown_and_wait_all_finished().await;
Expand Down
10 changes: 10 additions & 0 deletions sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,26 @@
name = "temporal-sdk"
version = "0.1.0-alpha.1"
edition = "2021"
authors = ["Spencer Judge <[email protected]>"]
license-file = "LICENSE.txt"
description = "Temporal Rust SDK"
homepage = "https://temporal.io/"
repository = "https://github.com/temporalio/sdk-core"
keywords = ["temporal", "workflow"]
categories = ["development-tools"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
anyhow = "1.0"
base64 = "0.13"
crossbeam = "0.8"
derive_more = "0.99"
futures = "0.3"
once_cell = "1.10"
parking_lot = { version = "0.12", features = ["send_guard"] }
prost-types = "0.9"
sha2 = "0.10"
serde = "1.0"
tokio = { version = "1.1", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs"] }
tokio-util = { version = "0.7" }
Expand Down
23 changes: 23 additions & 0 deletions sdk/LICENSE.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Temporal Core SDK

The MIT License

Copyright (c) 2021 Temporal Technologies, Inc. All Rights Reserved

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
11 changes: 11 additions & 0 deletions sdk/src/interceptors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
//! User-definable interceptors are defined in this module

use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;

/// Implementors can intercept certain actions that happen within the Worker.
///
/// Advanced usage only.
pub trait WorkerInterceptor {
/// Called every time a workflow activation completes
fn on_workflow_activation_completion(&self, completion: &WorkflowActivationCompletion);
}
Loading