From 49a37fc0c2217e79296d0be2774abe4f369b828f Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 10 Mar 2022 11:32:52 -0800 Subject: [PATCH 1/5] Add more license files and cargo details --- client/Cargo.toml | 7 +++++++ client/LICENSE.txt | 23 +++++++++++++++++++++++ client/src/lib.rs | 3 ++- core-api/Cargo.toml | 7 +++++++ core-api/LICENSE.txt | 23 +++++++++++++++++++++++ core/LICENSE.txt | 23 +++++++++++++++++++++++ sdk/Cargo.toml | 7 +++++++ sdk/LICENSE.txt | 23 +++++++++++++++++++++++ 8 files changed, 115 insertions(+), 1 deletion(-) create mode 100644 client/LICENSE.txt create mode 100644 core-api/LICENSE.txt create mode 100644 core/LICENSE.txt create mode 100644 sdk/LICENSE.txt diff --git a/client/Cargo.toml b/client/Cargo.toml index a8ce03de3..8ff501688 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -2,6 +2,13 @@ name = "temporal-client" version = "0.1.0" edition = "2021" +authors = ["Spencer Judge "] +license-file = "LICENSE.txt" +description = "Clients for interacting with Temporal Clusters" +homepage = "https://temporal.io/" +repository = "https://github.com/temporalio/sdk-core" +keywords = ["temporal", "workflow"] +categories = ["development-tools"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] diff --git a/client/LICENSE.txt b/client/LICENSE.txt new file mode 100644 index 000000000..a53527ecf --- /dev/null +++ b/client/LICENSE.txt @@ -0,0 +1,23 @@ +Temporal Core SDK + +The MIT License + +Copyright (c) 2021 Temporal Technologies, Inc. All Rights Reserved + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/client/src/lib.rs b/client/src/lib.rs index 88d7a5128..01cad30be 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -509,7 +509,8 @@ impl ServerGateway { } } -/// This trait provides higher-level friendlier interaction with the server +/// This trait provides higher-level friendlier interaction with the server. +/// See the [WorkflowService] trait for a lower-level client. #[cfg_attr(any(feature = "mocks", test), mockall::automock)] #[async_trait::async_trait] pub trait ServerGatewayApis { diff --git a/core-api/Cargo.toml b/core-api/Cargo.toml index 84160ff5e..e1691fdea 100644 --- a/core-api/Cargo.toml +++ b/core-api/Cargo.toml @@ -2,6 +2,13 @@ name = "temporal-sdk-core-api" version = "0.1.0" edition = "2021" +authors = ["Spencer Judge "] +license-file = "LICENSE.txt" +description = "Interface definitions for the Temporal Core SDK" +homepage = "https://temporal.io/" +repository = "https://github.com/temporalio/sdk-core" +keywords = ["temporal", "workflow"] +categories = ["development-tools"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/core-api/LICENSE.txt b/core-api/LICENSE.txt new file mode 100644 index 000000000..a53527ecf --- /dev/null +++ b/core-api/LICENSE.txt @@ -0,0 +1,23 @@ +Temporal Core SDK + +The MIT License + +Copyright (c) 2021 Temporal Technologies, Inc. All Rights Reserved + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/core/LICENSE.txt b/core/LICENSE.txt new file mode 100644 index 000000000..a53527ecf --- /dev/null +++ b/core/LICENSE.txt @@ -0,0 +1,23 @@ +Temporal Core SDK + +The MIT License + +Copyright (c) 2021 Temporal Technologies, Inc. All Rights Reserved + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index ca35164cc..7b93c8c76 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -2,6 +2,13 @@ name = "temporal-sdk" version = "0.1.0-alpha.1" edition = "2021" +authors = ["Spencer Judge "] +license-file = "LICENSE.txt" +description = "Temporal Rust SDK" +homepage = "https://temporal.io/" +repository = "https://github.com/temporalio/sdk-core" +keywords = ["temporal", "workflow"] +categories = ["development-tools"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/sdk/LICENSE.txt b/sdk/LICENSE.txt new file mode 100644 index 000000000..a53527ecf --- /dev/null +++ b/sdk/LICENSE.txt @@ -0,0 +1,23 @@ +Temporal Core SDK + +The MIT License + +Copyright (c) 2021 Temporal Technologies, Inc. All Rights Reserved + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. From 1500f6f0822bd600f259cdb833657d930f70a40c Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 10 Mar 2022 11:45:38 -0800 Subject: [PATCH 2/5] Rename `TestRustWorker` to `Worker` --- client/Cargo.toml | 1 - core/benches/workflow_replay.rs | 6 ++--- core/src/core_tests/child_workflows.rs | 4 ++-- core/src/core_tests/determinism.rs | 6 ++--- core/src/core_tests/local_activities.rs | 12 +++++----- sdk/src/lib.rs | 28 ++++++++++++++-------- test-utils/src/lib.rs | 23 ++++++++++-------- tests/integ_tests/workflow_tests/replay.rs | 4 ++-- 8 files changed, 47 insertions(+), 37 deletions(-) diff --git a/client/Cargo.toml b/client/Cargo.toml index 8ff501688..f3e651a1c 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -36,4 +36,3 @@ uuid = { version = "0.8.2", features = ["v4"] } [dependencies.temporal-sdk-core-protos] path = "../sdk-core-protos" version = "0.1" - diff --git a/core/benches/workflow_replay.rs b/core/benches/workflow_replay.rs index 67db96e3f..5475e612c 100644 --- a/core/benches/workflow_replay.rs +++ b/core/benches/workflow_replay.rs @@ -1,7 +1,7 @@ use criterion::{criterion_group, criterion_main, Criterion}; use futures::StreamExt; use std::time::Duration; -use temporal_sdk::{TestRustWorker, WfContext, WorkflowFunction}; +use temporal_sdk::{WfContext, Worker, WorkflowFunction}; use temporal_sdk_core::{telemetry_init, TelemetryOptionsBuilder}; use temporal_sdk_core_protos::DEFAULT_WORKFLOW_TYPE; use temporal_sdk_core_test_utils::{canned_histories, init_core_replay_preloaded}; @@ -29,7 +29,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { tokio_runtime.block_on(async { let func = timers_wf(num_timers); let (worker, _) = init_core_replay_preloaded("replay_bench", &hist); - let mut worker = TestRustWorker::new(worker, "replay_bench".to_string(), None); + let mut worker = Worker::new(worker, "replay_bench".to_string(), None); worker.register_wf(DEFAULT_WORKFLOW_TYPE, func); worker.incr_expected_run_count(1); worker.run_until_done().await.unwrap(); @@ -46,7 +46,7 @@ pub fn criterion_benchmark(c: &mut Criterion) { tokio_runtime.block_on(async { let func = big_signals_wf(num_tasks); let (worker, _) = init_core_replay_preloaded("large_hist_bench", &hist); - let mut worker = TestRustWorker::new(worker, "large_hist_bench".to_string(), None); + let mut worker = Worker::new(worker, "large_hist_bench".to_string(), None); worker.register_wf(DEFAULT_WORKFLOW_TYPE, func); worker.incr_expected_run_count(1); worker.run_until_done().await.unwrap(); diff --git a/core/src/core_tests/child_workflows.rs b/core/src/core_tests/child_workflows.rs index ffd6296c2..fec5c3bf6 100644 --- a/core/src/core_tests/child_workflows.rs +++ b/core/src/core_tests/child_workflows.rs @@ -8,7 +8,7 @@ use crate::{ use std::sync::Arc; use temporal_client::{mocks::mock_gateway, WorkflowOptions}; use temporal_sdk::{ - ChildWorkflowOptions, Signal, TestRustWorker, WfContext, WorkflowFunction, WorkflowResult, + ChildWorkflowOptions, Signal, WfContext, Worker, WorkflowFunction, WorkflowResult, }; use temporal_sdk_core_protos::coresdk::child_workflow::{ child_workflow_result, ChildWorkflowCancellationType, @@ -29,7 +29,7 @@ async fn signal_child_workflow(#[case] serial: bool) { let mh = MockPollCfg::from_resp_batches(wf_id, t, [ResponseType::AllHistory], mock); let mock = build_mock_pollers(mh); let core = mock_worker(mock); - let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); let wf = move |ctx: WfContext| async move { let child = ctx.child_workflow(ChildWorkflowOptions { diff --git a/core/src/core_tests/determinism.rs b/core/src/core_tests/determinism.rs index 94d4eaa13..c856ddf6e 100644 --- a/core/src/core_tests/determinism.rs +++ b/core/src/core_tests/determinism.rs @@ -12,7 +12,7 @@ use std::{ time::Duration, }; use temporal_client::{mocks::mock_gateway, WorkflowOptions}; -use temporal_sdk::{TestRustWorker, WfContext, WorkflowResult}; +use temporal_sdk::{WfContext, Worker, WorkflowResult}; use temporal_sdk_core_protos::temporal::api::enums::v1::WorkflowTaskFailedCause; static DID_FAIL: AtomicBool = AtomicBool::new(false); @@ -43,7 +43,7 @@ async fn test_wf_task_rejected_properly() { Box::new(|_, cause, _| matches!(cause, WorkflowTaskFailedCause::Unspecified)); let mock = build_mock_pollers(mh); let core = mock_worker(mock); - let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); worker.register_wf(wf_type.to_owned(), timer_wf_fails_once); worker @@ -87,7 +87,7 @@ async fn test_wf_task_rejected_properly_due_to_nondeterminism(#[case] use_cache: }); } let core = mock_worker(mock); - let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); let started_count: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); worker.register_wf(wf_type.to_owned(), move |ctx: WfContext| async move { diff --git a/core/src/core_tests/local_activities.rs b/core/src/core_tests/local_activities.rs index 66e232647..df782b0ac 100644 --- a/core/src/core_tests/local_activities.rs +++ b/core/src/core_tests/local_activities.rs @@ -12,7 +12,7 @@ use std::{ time::Duration, }; use temporal_client::{mocks::mock_gateway, WorkflowOptions}; -use temporal_sdk::{LocalActivityOptions, TestRustWorker, WfContext, WorkflowResult}; +use temporal_sdk::{LocalActivityOptions, WfContext, Worker, WorkflowResult}; use temporal_sdk_core_protos::{ coresdk::{common::RetryPolicy, AsJsonPayloadExt}, temporal::api::{enums::v1::EventType, failure::v1::Failure}, @@ -55,7 +55,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached: mock.worker_cfg(|cfg| cfg.max_cached_workflows = 1); } let core = mock_worker(mock); - let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); worker.register_wf( DEFAULT_WORKFLOW_TYPE.to_owned(), @@ -119,7 +119,7 @@ async fn local_act_many_concurrent() { let mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2, 3], mock); let mock = build_mock_pollers(mh); let core = mock_worker(mock); - let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); worker.register_wf(DEFAULT_WORKFLOW_TYPE.to_owned(), local_act_fanout_wf); worker.register_activity("echo", |str: String| async move { Ok(str) }); @@ -168,7 +168,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) { let mut mock = build_mock_pollers(mh); mock.worker_cfg(|wc| wc.max_cached_workflows = 1); let core = Arc::new(mock_worker(mock)); - let mut worker = TestRustWorker::new(core.clone(), TEST_Q.to_string(), None); + let mut worker = Worker::new(core.clone(), TEST_Q.to_string(), None); let shutdown_barr: &'static Barrier = Box::leak(Box::new(Barrier::new(2))); worker.register_wf( @@ -226,7 +226,7 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) { let mh = MockPollCfg::from_resp_batches(wf_id, t, [1], mock); let mock = build_mock_pollers(mh); let core = mock_worker(mock); - let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); worker.register_wf( DEFAULT_WORKFLOW_TYPE.to_owned(), @@ -310,7 +310,7 @@ async fn local_act_retry_long_backoff_uses_timer() { let mut mock = build_mock_pollers(mh); mock.worker_cfg(|w| w.max_cached_workflows = 1); let core = mock_worker(mock); - let mut worker = TestRustWorker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); worker.register_wf( DEFAULT_WORKFLOW_TYPE.to_owned(), diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 170ef3d0e..7647e86cf 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -35,7 +35,7 @@ use std::{ use temporal_client::{ServerGatewayApis, WorkflowOptions}; use temporal_sdk_core_api::{ errors::{PollActivityError, PollWfError}, - Worker, + Worker as CoreWorker, }; use temporal_sdk_core_protos::{ coresdk::{ @@ -64,9 +64,10 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; -/// A worker that can poll for and respond to workflow tasks by using [WorkflowFunction]s -pub struct TestRustWorker { - worker: Arc, +/// A worker that can poll for and respond to workflow tasks by using [WorkflowFunction]s, +/// and activity tasks by using [ActivityFunction]s +pub struct Worker { + worker: Arc, task_queue: String, task_timeout: Option, workflow_half: WorkflowHalf, @@ -91,10 +92,10 @@ struct ActivityHalf { task_tokens_to_cancels: HashMap, } -impl TestRustWorker { +impl Worker { /// Create a new rust worker pub fn new( - worker: Arc, + worker: Arc, task_queue: String, task_timeout: Option, ) -> Self { @@ -280,11 +281,18 @@ impl TestRustWorker { /// Turns this rust worker into a new worker with all the same workflows and activities /// registered, but with a new underlying core worker. Can be used to swap the worker for /// a replay worker, change task queues, etc. - pub fn with_new_core_worker(&mut self, new_core_worker: Arc) { + pub fn with_new_core_worker(&mut self, new_core_worker: Arc) { self.worker = new_core_worker; } - fn split_apart(&mut self) -> (Arc, &str, &mut WorkflowHalf, &mut ActivityHalf) { + fn split_apart( + &mut self, + ) -> ( + Arc, + &str, + &mut WorkflowHalf, + &mut ActivityHalf, + ) { ( self.worker.clone(), &self.task_queue, @@ -297,7 +305,7 @@ impl TestRustWorker { impl WorkflowHalf { async fn workflow_activation_handler( &mut self, - worker: &dyn Worker, + worker: &dyn CoreWorker, task_queue: &str, shutdown_rx: &Receiver, completions_tx: &UnboundedSender, @@ -374,7 +382,7 @@ impl ActivityHalf { /// Spawns off a task to handle the provided activity task fn activity_task_handler( &mut self, - worker: Arc, + worker: Arc, activity: ActivityTask, ) -> Result<(), anyhow::Error> { match activity.variant { diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 8bbac071e..9d9faee3a 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -12,13 +12,13 @@ use std::{ time::Duration, }; use temporal_client::{RetryGateway, ServerGateway, ServerGatewayApis, WorkflowOptions}; -use temporal_sdk::TestRustWorker; +use temporal_sdk::Worker; use temporal_sdk_core::{ init_replay_worker, init_worker, replay::mock_gateway_from_history, telemetry_init, ServerGatewayOptions, ServerGatewayOptionsBuilder, TelemetryOptions, TelemetryOptionsBuilder, WorkerConfig, WorkerConfigBuilder, }; -use temporal_sdk_core_api::Worker; +use temporal_sdk_core_api::Worker as CoreWorker; use temporal_sdk_core_protos::{ coresdk::{ workflow_commands::{ @@ -41,7 +41,7 @@ const PROM_ENABLE_ENV_VAR: &str = "TEMPORAL_INTEG_PROM_PORT"; /// Create a worker instance which will use the provided test name to base the task queue and wf id /// upon. Returns the instance and the task queue name (which is also the workflow id). -pub async fn init_core_and_create_wf(test_name: &str) -> (Arc, String) { +pub async fn init_core_and_create_wf(test_name: &str) -> (Arc, String) { let mut starter = CoreWfStarter::new(test_name); let core = starter.get_worker().await; starter.start_wf().await; @@ -50,7 +50,10 @@ pub async fn init_core_and_create_wf(test_name: &str) -> (Arc, Strin /// Create a worker replay instance preloaded with a provided history. Returns the worker impl /// and the task queue name as in [init_core_and_create_wf]. -pub fn init_core_replay_preloaded(test_name: &str, history: &History) -> (Arc, String) { +pub fn init_core_replay_preloaded( + test_name: &str, + history: &History, +) -> (Arc, String) { let worker_cfg = WorkerConfigBuilder::default() .namespace(NAMESPACE) .task_queue(test_name) @@ -81,7 +84,7 @@ pub struct CoreWfStarter { initted_worker: OnceCell, } struct InitializedWorker { - worker: Arc, + worker: Arc, client: Arc>, } @@ -108,8 +111,8 @@ impl CoreWfStarter { } } - pub async fn worker(&mut self) -> TestRustWorker { - TestRustWorker::new( + pub async fn worker(&mut self) -> Worker { + Worker::new( self.get_worker().await, self.worker_config.task_queue.clone(), self.wft_timeout, @@ -120,7 +123,7 @@ impl CoreWfStarter { self.get_worker().await.shutdown().await; } - pub async fn get_worker(&mut self) -> Arc { + pub async fn get_worker(&mut self) -> Arc { self.get_or_init().await.worker.clone() } @@ -162,7 +165,7 @@ impl CoreWfStarter { wf_id: impl Into, run_id: impl Into, // TODO: Need not be passed in - worker: &mut TestRustWorker, + worker: &mut Worker, ) -> Result<(), anyhow::Error> { // Fetch history and replay it let history = self @@ -335,7 +338,7 @@ pub trait WorkerTestHelpers { #[async_trait::async_trait] impl WorkerTestHelpers for T where - T: Worker + ?Sized, + T: CoreWorker + ?Sized, { async fn complete_execution(&self, run_id: &str) { self.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( diff --git a/tests/integ_tests/workflow_tests/replay.rs b/tests/integ_tests/workflow_tests/replay.rs index a458847df..f20d93fe4 100644 --- a/tests/integ_tests/workflow_tests/replay.rs +++ b/tests/integ_tests/workflow_tests/replay.rs @@ -1,6 +1,6 @@ use assert_matches::assert_matches; use std::time::Duration; -use temporal_sdk::{TestRustWorker, WfContext, WorkflowFunction}; +use temporal_sdk::{WfContext, Worker, WorkflowFunction}; use temporal_sdk_core::{replay::mock_gateway_from_history, telemetry_init, ServerGatewayApis}; use temporal_sdk_core_api::errors::{PollActivityError, PollWfError}; use temporal_sdk_core_protos::{ @@ -133,7 +133,7 @@ async fn replay_using_wf_function() { let func = timers_wf(num_timers); let (worker, _) = init_core_replay_preloaded("replay_bench", &t.get_full_history_info().unwrap().into()); - let mut worker = TestRustWorker::new(worker, "replay_bench".to_string(), None); + let mut worker = Worker::new(worker, "replay_bench".to_string(), None); worker.register_wf(DEFAULT_WORKFLOW_TYPE, func); worker.incr_expected_run_count(1); worker.run_until_done().await.unwrap(); From 8cf73d8724b710beea24ca76a16b126a6e6ab916 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 10 Mar 2022 13:56:12 -0800 Subject: [PATCH 3/5] Add doctest example --- sdk/Cargo.toml | 3 ++ sdk/src/lib.rs | 74 +++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 70 insertions(+), 7 deletions(-) diff --git a/sdk/Cargo.toml b/sdk/Cargo.toml index 7b93c8c76..099d61813 100644 --- a/sdk/Cargo.toml +++ b/sdk/Cargo.toml @@ -14,11 +14,14 @@ categories = ["development-tools"] [dependencies] anyhow = "1.0" +base64 = "0.13" crossbeam = "0.8" derive_more = "0.99" futures = "0.3" +once_cell = "1.10" parking_lot = { version = "0.12", features = ["send_guard"] } prost-types = "0.9" +sha2 = "0.10" serde = "1.0" tokio = { version = "1.1", features = ["rt", "rt-multi-thread", "parking_lot", "time", "fs"] } tokio-util = { version = "0.7" } diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 7647e86cf..20e8bb7cb 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -1,10 +1,36 @@ #![warn(missing_docs)] // error if there are missing docs -//! This crate is a rough prototype Rust SDK. It can be used to create closures that look sort of -//! like normal workflow code. It should only depend on things in the core crate that are already -//! publicly exposed. +//! This crate defines an alpha-stage Temporal Rust SDK. //! -//! Needs lots of love to be production ready but the basis is there +//! Currently defining activities and running an activity-only worker is the most stable code. +//! Workflow definitions exist and running a workflow worker works, but the API is still very +//! unstable. +//! +//! An example of running an activity worker: +//! ```no_run +//! use std::sync::Arc; +//! use std::str::FromStr; +//! use temporal_sdk::{sdk_client_options, Worker}; +//! use temporal_sdk_core::{init_worker, Url}; +//! use temporal_sdk_core_api::worker::{WorkerConfig, WorkerConfigBuilder}; +//! +//! #[tokio::main] +//! async fn main() -> Result<(), Box> { +//! let server_options = sdk_client_options(Url::from_str("http://localhost:7233")?).build()?; +//! let client = server_options.connect("my_namespace", None).await?; +//! let worker_config = WorkerConfigBuilder::default().build()?; +//! let core_worker = init_worker(worker_config, client); +//! +//! let mut worker = Worker::new(Arc::new(core_worker), "task_queue", None); +//! worker.register_activity( +//! "echo_activity", +//! |echo_me: String| async move { Ok(echo_me) }, +//! ); +//! // TODO: This should be different +//! worker.run_until_done().await?; +//! Ok(()) +//! } +//! ``` #[macro_use] extern crate tracing; @@ -22,6 +48,7 @@ pub use workflow_context::{ use crate::workflow_context::{ChildWfCommon, PendingChildWorkflow}; use anyhow::{anyhow, bail}; use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; +use once_cell::sync::OnceCell; use std::{ collections::HashMap, fmt::{Debug, Display, Formatter}, @@ -32,7 +59,8 @@ use std::{ }, time::Duration, }; -use temporal_client::{ServerGatewayApis, WorkflowOptions}; +use temporal_client::{ServerGatewayApis, ServerGatewayOptionsBuilder, WorkflowOptions}; +use temporal_sdk_core::Url; use temporal_sdk_core_api::{ errors::{PollActivityError, PollWfError}, Worker as CoreWorker, @@ -64,6 +92,21 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; +const VERSION: &str = env!("CARGO_PKG_VERSION"); + +/// Returns a [ServerGatewayOptionsBuilder] with required fields set to appropriate values +/// for the Rust SDK. +pub fn sdk_client_options(url: impl Into) -> ServerGatewayOptionsBuilder { + let mut builder = ServerGatewayOptionsBuilder::default(); + builder + .target_url(url) + .client_name("rust-sdk".to_string()) + .client_version(VERSION.to_string()) + .worker_binary_id(binary_id().to_string()); + + builder +} + /// A worker that can poll for and respond to workflow tasks by using [WorkflowFunction]s, /// and activity tasks by using [ActivityFunction]s pub struct Worker { @@ -96,12 +139,12 @@ impl Worker { /// Create a new rust worker pub fn new( worker: Arc, - task_queue: String, + task_queue: impl Into, task_timeout: Option, ) -> Self { Self { worker, - task_queue, + task_queue: task_queue.into(), task_timeout, workflow_half: WorkflowHalf { workflows: Default::default(), @@ -683,3 +726,20 @@ where Arc::new(wrapper) } } + +/// Reads own binary, hashes it, and returns b64 str version of that hash +fn binary_id() -> &'static str { + use sha2::{Digest, Sha256}; + use std::{env, fs, io}; + + static INSTANCE: OnceCell = OnceCell::new(); + INSTANCE.get_or_init(|| { + let exe_path = env::current_exe().expect("Cannot read own binary to determine binary id"); + let mut exe_file = + fs::File::open(exe_path).expect("Cannot read own binary to determine binary id"); + let mut hasher = Sha256::new(); + io::copy(&mut exe_file, &mut hasher).expect("Copying data into binary hasher works"); + let hash = hasher.finalize(); + base64::encode(hash) + }) +} From 67f46d37c1f16cd49ca2941f03b625e512265d6d Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 10 Mar 2022 15:22:30 -0800 Subject: [PATCH 4/5] Compile fixes to lift out test worker -- tests still not passing --- core/benches/workflow_replay.rs | 10 +- core/src/core_tests/child_workflows.rs | 7 +- core/src/core_tests/determinism.rs | 7 +- core/src/core_tests/local_activities.rs | 13 +-- sdk/src/lib.rs | 86 ++--------------- test-utils/Cargo.toml | 1 + test-utils/src/lib.rs | 94 +++++++++++++++++-- .../workflow_tests/local_activities.rs | 2 +- tests/integ_tests/workflow_tests/replay.rs | 5 +- .../workflow_tests/upsert_search_attrs.rs | 1 + 10 files changed, 117 insertions(+), 109 deletions(-) diff --git a/core/benches/workflow_replay.rs b/core/benches/workflow_replay.rs index 5475e612c..45db902d7 100644 --- a/core/benches/workflow_replay.rs +++ b/core/benches/workflow_replay.rs @@ -29,10 +29,9 @@ pub fn criterion_benchmark(c: &mut Criterion) { tokio_runtime.block_on(async { let func = timers_wf(num_timers); let (worker, _) = init_core_replay_preloaded("replay_bench", &hist); - let mut worker = Worker::new(worker, "replay_bench".to_string(), None); + let mut worker = Worker::new(worker, "replay_bench".to_string()); worker.register_wf(DEFAULT_WORKFLOW_TYPE, func); - worker.incr_expected_run_count(1); - worker.run_until_done().await.unwrap(); + worker.run().await.unwrap(); }) }) }); @@ -46,10 +45,9 @@ pub fn criterion_benchmark(c: &mut Criterion) { tokio_runtime.block_on(async { let func = big_signals_wf(num_tasks); let (worker, _) = init_core_replay_preloaded("large_hist_bench", &hist); - let mut worker = Worker::new(worker, "large_hist_bench".to_string(), None); + let mut worker = Worker::new(worker, "large_hist_bench".to_string()); worker.register_wf(DEFAULT_WORKFLOW_TYPE, func); - worker.incr_expected_run_count(1); - worker.run_until_done().await.unwrap(); + worker.run().await.unwrap(); }) }) }); diff --git a/core/src/core_tests/child_workflows.rs b/core/src/core_tests/child_workflows.rs index fec5c3bf6..f522535c0 100644 --- a/core/src/core_tests/child_workflows.rs +++ b/core/src/core_tests/child_workflows.rs @@ -7,12 +7,11 @@ use crate::{ }; use std::sync::Arc; use temporal_client::{mocks::mock_gateway, WorkflowOptions}; -use temporal_sdk::{ - ChildWorkflowOptions, Signal, WfContext, Worker, WorkflowFunction, WorkflowResult, -}; +use temporal_sdk::{ChildWorkflowOptions, Signal, WfContext, WorkflowFunction, WorkflowResult}; use temporal_sdk_core_protos::coresdk::child_workflow::{ child_workflow_result, ChildWorkflowCancellationType, }; +use temporal_sdk_core_test_utils::TestWorker; use tokio::join; const SIGNAME: &str = "SIGNAME"; @@ -29,7 +28,7 @@ async fn signal_child_workflow(#[case] serial: bool) { let mh = MockPollCfg::from_resp_batches(wf_id, t, [ResponseType::AllHistory], mock); let mock = build_mock_pollers(mh); let core = mock_worker(mock); - let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string()); let wf = move |ctx: WfContext| async move { let child = ctx.child_workflow(ChildWorkflowOptions { diff --git a/core/src/core_tests/determinism.rs b/core/src/core_tests/determinism.rs index c856ddf6e..bcb435218 100644 --- a/core/src/core_tests/determinism.rs +++ b/core/src/core_tests/determinism.rs @@ -12,8 +12,9 @@ use std::{ time::Duration, }; use temporal_client::{mocks::mock_gateway, WorkflowOptions}; -use temporal_sdk::{WfContext, Worker, WorkflowResult}; +use temporal_sdk::{WfContext, WorkflowResult}; use temporal_sdk_core_protos::temporal::api::enums::v1::WorkflowTaskFailedCause; +use temporal_sdk_core_test_utils::TestWorker; static DID_FAIL: AtomicBool = AtomicBool::new(false); pub async fn timer_wf_fails_once(ctx: WfContext) -> WorkflowResult<()> { @@ -43,7 +44,7 @@ async fn test_wf_task_rejected_properly() { Box::new(|_, cause, _| matches!(cause, WorkflowTaskFailedCause::Unspecified)); let mock = build_mock_pollers(mh); let core = mock_worker(mock); - let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string()); worker.register_wf(wf_type.to_owned(), timer_wf_fails_once); worker @@ -87,7 +88,7 @@ async fn test_wf_task_rejected_properly_due_to_nondeterminism(#[case] use_cache: }); } let core = mock_worker(mock); - let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string()); let started_count: &'static _ = Box::leak(Box::new(AtomicUsize::new(0))); worker.register_wf(wf_type.to_owned(), move |ctx: WfContext| async move { diff --git a/core/src/core_tests/local_activities.rs b/core/src/core_tests/local_activities.rs index df782b0ac..0bdce60d5 100644 --- a/core/src/core_tests/local_activities.rs +++ b/core/src/core_tests/local_activities.rs @@ -12,11 +12,12 @@ use std::{ time::Duration, }; use temporal_client::{mocks::mock_gateway, WorkflowOptions}; -use temporal_sdk::{LocalActivityOptions, WfContext, Worker, WorkflowResult}; +use temporal_sdk::{LocalActivityOptions, WfContext, WorkflowResult}; use temporal_sdk_core_protos::{ coresdk::{common::RetryPolicy, AsJsonPayloadExt}, temporal::api::{enums::v1::EventType, failure::v1::Failure}, }; +use temporal_sdk_core_test_utils::TestWorker; use tokio::sync::Barrier; async fn echo(e: String) -> anyhow::Result { @@ -55,7 +56,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached: mock.worker_cfg(|cfg| cfg.max_cached_workflows = 1); } let core = mock_worker(mock); - let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string()); worker.register_wf( DEFAULT_WORKFLOW_TYPE.to_owned(), @@ -119,7 +120,7 @@ async fn local_act_many_concurrent() { let mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2, 3], mock); let mock = build_mock_pollers(mh); let core = mock_worker(mock); - let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string()); worker.register_wf(DEFAULT_WORKFLOW_TYPE.to_owned(), local_act_fanout_wf); worker.register_activity("echo", |str: String| async move { Ok(str) }); @@ -168,7 +169,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) { let mut mock = build_mock_pollers(mh); mock.worker_cfg(|wc| wc.max_cached_workflows = 1); let core = Arc::new(mock_worker(mock)); - let mut worker = Worker::new(core.clone(), TEST_Q.to_string(), None); + let mut worker = TestWorker::new(core.clone(), TEST_Q.to_string()); let shutdown_barr: &'static Barrier = Box::leak(Box::new(Barrier::new(2))); worker.register_wf( @@ -226,7 +227,7 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) { let mh = MockPollCfg::from_resp_batches(wf_id, t, [1], mock); let mock = build_mock_pollers(mh); let core = mock_worker(mock); - let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string()); worker.register_wf( DEFAULT_WORKFLOW_TYPE.to_owned(), @@ -310,7 +311,7 @@ async fn local_act_retry_long_backoff_uses_timer() { let mut mock = build_mock_pollers(mh); mock.worker_cfg(|w| w.max_cached_workflows = 1); let core = mock_worker(mock); - let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None); + let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string()); worker.register_wf( DEFAULT_WORKFLOW_TYPE.to_owned(), diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 20e8bb7cb..286a63d34 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -53,13 +53,9 @@ use std::{ collections::HashMap, fmt::{Debug, Display, Formatter}, future::Future, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::Duration, + sync::Arc, }; -use temporal_client::{ServerGatewayApis, ServerGatewayOptionsBuilder, WorkflowOptions}; +use temporal_client::{ServerGatewayApis, ServerGatewayOptionsBuilder}; use temporal_sdk_core::Url; use temporal_sdk_core_api::{ errors::{PollActivityError, PollWfError}, @@ -112,7 +108,6 @@ pub fn sdk_client_options(url: impl Into) -> ServerGatewayOptionsBuilder { pub struct Worker { worker: Arc, task_queue: String, - task_timeout: Option, workflow_half: WorkflowHalf, activity_half: ActivityHalf, } @@ -122,8 +117,6 @@ struct WorkflowHalf { workflows: HashMap>, /// Maps workflow type to the function for executing workflow runs with that ID workflow_fns: HashMap, - /// Number of live workflows - incomplete_workflows: Arc, /// Handles for each spawned workflow run are inserted here to be cleaned up when all runs /// are finished join_handles: FuturesUnordered, JoinError>>>, @@ -137,19 +130,13 @@ struct ActivityHalf { impl Worker { /// Create a new rust worker - pub fn new( - worker: Arc, - task_queue: impl Into, - task_timeout: Option, - ) -> Self { + pub fn new(worker: Arc, task_queue: impl Into) -> Self { Self { worker, task_queue: task_queue.into(), - task_timeout, workflow_half: WorkflowHalf { workflows: Default::default(), workflow_fns: Default::default(), - incomplete_workflows: Arc::new(AtomicUsize::new(0)), join_handles: FuturesUnordered::new(), }, activity_half: ActivityHalf { @@ -169,36 +156,6 @@ impl Worker { &self.task_queue } - /// Create a workflow, asking the server to start it with the provided workflow ID and using the - /// provided workflow function. - /// - /// Increments the expected Workflow run count. - /// - /// Returns the run id of the started workflow - pub async fn submit_wf( - &self, - workflow_id: impl Into, - workflow_type: impl Into, - input: Vec, - mut options: WorkflowOptions, - ) -> Result { - options.task_timeout = options.task_timeout.or(self.task_timeout); - let res = self - .worker - .server_gateway() - .start_workflow( - input, - self.task_queue.clone(), - workflow_id.into(), - workflow_type.into(), - options, - ) - .await?; - - self.incr_expected_run_count(1); - Ok(res.run_id) - } - /// Register a Workflow function to invoke when the Worker is asked to run a workflow of /// `workflow_type` pub fn register_wf>( @@ -226,25 +183,9 @@ impl Worker { ); } - // TODO: Should be removed before making this worker prod ready. There can be a test worker - // which wraps this one and implements the workflow counting / run_until_done concepts. - // This worker can expose an interceptor for completions that could be used to assist with - // workflow tracking - /// Increment the expected Workflow run count on this Worker. The Worker tracks the run count - /// and will resolve `run_until_done` when it goes down to 0. - /// You do not have to increment if scheduled a Workflow with `submit_wf`. - pub fn incr_expected_run_count(&self, count: usize) { - self.workflow_half - .incomplete_workflows - .fetch_add(count, Ordering::SeqCst); - } - - /// See [Self::run_until_done], except calls the provided callback just before performing core - /// shutdown. - pub async fn run_until_done_shutdown_hook( - &mut self, - before_shutdown: impl FnOnce(), - ) -> Result<(), anyhow::Error> { + /// Runs the worker. Eventually resolves after the worker has been explicitly shut down, + /// or may return early with an error in the event of some unresolvable problem. + pub async fn run(&mut self) -> Result<(), anyhow::Error> { let (shutdown_tx, shutdown_rx) = watch::channel(false); let pollers = async move { let (worker, task_q, wf_half, act_half) = self.split_apart(); @@ -269,13 +210,6 @@ impl Worker { activation, ) .await?; - if wf_half.incomplete_workflows.load(Ordering::SeqCst) == 0 { - info!("All expected workflows complete"); - // Die rebel scum - evict all workflows (which are complete now), - // and turn off activity polling. - let _ = shutdown_tx.send(true); - break Result::<_, anyhow::Error>::Ok(()); - } } }, // Only poll on the activity queue if activity functions have been registered. This @@ -309,18 +243,11 @@ impl Worker { while let Some(h) = myself.workflow_half.join_handles.next().await { h??; } - before_shutdown(); myself.worker.shutdown().await; myself.workflow_half.workflows.clear(); Ok(()) } - /// Drives all workflows & activities until they have all finished, repeatedly polls server to - /// fetch work for them. - pub async fn run_until_done(&mut self) -> Result<(), anyhow::Error> { - self.run_until_done_shutdown_hook(|| {}).await - } - /// Turns this rust worker into a new worker with all the same workflows and activities /// registered, but with a new underlying core worker. Can be used to swap the worker for /// a replay worker, change task queues, etc. @@ -398,7 +325,6 @@ impl WorkflowHalf { let completion = completions_rx.recv().await.expect("No workflows left?"); if completion.has_execution_ending() { debug!("Workflow {} says it's finishing", &completion.run_id); - self.incomplete_workflows.fetch_sub(1, Ordering::SeqCst); } worker.complete_workflow_activation(completion).await?; Ok(()) diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index 3cb66d0fe..db11f09fa 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -24,6 +24,7 @@ temporal-sdk-core = { path = "../core" } temporal-sdk-core-api = { path = "../core-api" } thiserror = "1.0" tokio = "1.1" +tracing = "0.1" url = "2.2" uuid = "0.8" diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 9d9faee3a..2e8ca57b8 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -7,18 +7,20 @@ use futures::{stream::FuturesUnordered, StreamExt}; use log::LevelFilter; use prost::Message; use rand::{distributions::Standard, Rng}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::{ convert::TryFrom, env, future::Future, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration, }; use temporal_client::{RetryGateway, ServerGateway, ServerGatewayApis, WorkflowOptions}; -use temporal_sdk::Worker; +use temporal_sdk::{IntoActivityFunc, Worker, WorkflowFunction}; use temporal_sdk_core::{ init_replay_worker, init_worker, replay::mock_gateway_from_history, telemetry_init, ServerGatewayOptions, ServerGatewayOptionsBuilder, TelemetryOptions, TelemetryOptionsBuilder, WorkerConfig, WorkerConfigBuilder, }; use temporal_sdk_core_api::Worker as CoreWorker; +use temporal_sdk_core_protos::coresdk::common::Payload; use temporal_sdk_core_protos::{ coresdk::{ workflow_commands::{ @@ -111,11 +113,10 @@ impl CoreWfStarter { } } - pub async fn worker(&mut self) -> Worker { - Worker::new( + pub async fn worker(&mut self) -> TestWorker { + TestWorker::new( self.get_worker().await, self.worker_config.task_queue.clone(), - self.wft_timeout, ) } @@ -178,8 +179,7 @@ impl CoreWfStarter { .expect("history field must be populated"); let (replay_worker, _) = init_core_replay_preloaded(worker.task_queue(), &history); worker.with_new_core_worker(replay_worker); - worker.incr_expected_run_count(1); - worker.run_until_done().await.unwrap(); + worker.run().await.unwrap(); Ok(()) } @@ -241,6 +241,88 @@ impl CoreWfStarter { } } +/// Provides conveniences for running integ tests with the SDK +pub struct TestWorker { + inner: Worker, + expected_wf_runs: AtomicUsize, +} +impl TestWorker { + /// Create a new test worker + pub fn new(core_worker: Arc, task_queue: impl Into) -> Self { + Self { + inner: Worker::new(core_worker, task_queue), + expected_wf_runs: AtomicUsize::new(0), + } + } + + pub fn inner_mut(&mut self) -> &mut Worker { + &mut self.inner + } + + pub fn incr_expected_run_count(&self, amount: usize) { + self.expected_wf_runs.fetch_add(amount, Ordering::AcqRel); + } + + // TODO: Maybe trait-ify? + pub fn register_wf>( + &mut self, + workflow_type: impl Into, + wf_function: F, + ) { + self.inner.register_wf(workflow_type, wf_function) + } + + pub fn register_activity( + &mut self, + activity_type: impl Into, + act_function: impl IntoActivityFunc, + ) { + self.inner.register_activity(activity_type, act_function) + } + + /// Create a workflow, asking the server to start it with the provided workflow ID and using the + /// provided workflow function. + /// + /// Increments the expected Workflow run count. + /// + /// Returns the run id of the started workflow + pub async fn submit_wf( + &self, + workflow_id: impl Into, + workflow_type: impl Into, + input: Vec, + options: WorkflowOptions, + ) -> Result { + let wfid = workflow_id.into(); + let res = self + .inner + .server_gateway() + .start_workflow( + input, + self.inner.task_queue().to_string(), + wfid.clone(), + workflow_type.into(), + options, + ) + .await?; + Ok(res.run_id) + } + + /// Runs until all expected workflows have completed + pub async fn run_until_done(&mut self) -> Result<(), anyhow::Error> { + todo!() + } + + /// See [Self::run_until_done], except calls the provided callback just before performing core + /// shutdown. + pub async fn run_until_done_shutdown_hook( + &mut self, + before_shutdown: impl FnOnce(), + ) -> Result<(), anyhow::Error> { + todo!() + } +} + pub fn get_integ_server_options() -> ServerGatewayOptions { let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { Ok(addr) => addr, diff --git a/tests/integ_tests/workflow_tests/local_activities.rs b/tests/integ_tests/workflow_tests/local_activities.rs index fc903da5b..3225cb85b 100644 --- a/tests/integ_tests/workflow_tests/local_activities.rs +++ b/tests/integ_tests/workflow_tests/local_activities.rs @@ -214,7 +214,7 @@ async fn local_act_retry_timer_backoff() { .unwrap(); worker.run_until_done().await.unwrap(); starter - .fetch_history_and_replay(wf_name, run_id, &mut worker) + .fetch_history_and_replay(wf_name, run_id, worker.inner_mut()) .await .unwrap(); } diff --git a/tests/integ_tests/workflow_tests/replay.rs b/tests/integ_tests/workflow_tests/replay.rs index f20d93fe4..22ba5a833 100644 --- a/tests/integ_tests/workflow_tests/replay.rs +++ b/tests/integ_tests/workflow_tests/replay.rs @@ -133,10 +133,9 @@ async fn replay_using_wf_function() { let func = timers_wf(num_timers); let (worker, _) = init_core_replay_preloaded("replay_bench", &t.get_full_history_info().unwrap().into()); - let mut worker = Worker::new(worker, "replay_bench".to_string(), None); + let mut worker = Worker::new(worker, "replay_bench".to_string()); worker.register_wf(DEFAULT_WORKFLOW_TYPE, func); - worker.incr_expected_run_count(1); - worker.run_until_done().await.unwrap(); + worker.run().await.unwrap(); } fn timers_wf(num_timers: u32) -> WorkflowFunction { diff --git a/tests/integ_tests/workflow_tests/upsert_search_attrs.rs b/tests/integ_tests/workflow_tests/upsert_search_attrs.rs index 13ec3423f..d82d7b2e6 100644 --- a/tests/integ_tests/workflow_tests/upsert_search_attrs.rs +++ b/tests/integ_tests/workflow_tests/upsert_search_attrs.rs @@ -43,6 +43,7 @@ async fn sends_upsert() { worker.run_until_done().await.unwrap(); let search_attrs = worker + .inner_mut() .server_gateway() .describe_workflow_execution(wf_id.to_string(), Some(run_id)) .await From 930646d442a18dce389023baad14b8013dd9df93 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 11 Mar 2022 10:55:54 -0800 Subject: [PATCH 5/5] Unit and integ tests working --- core-api/src/lib.rs | 3 + core/src/telemetry/mod.rs | 2 +- core/src/worker/mod.rs | 22 ++--- sdk/src/interceptors.rs | 11 +++ sdk/src/lib.rs | 85 +++++++++++-------- test-utils/Cargo.toml | 1 + test-utils/src/lib.rs | 58 +++++++++++-- .../workflow_tests/local_activities.rs | 4 +- 8 files changed, 132 insertions(+), 54 deletions(-) create mode 100644 sdk/src/interceptors.rs diff --git a/core-api/src/lib.rs b/core-api/src/lib.rs index 859b89049..9a78384a9 100644 --- a/core-api/src/lib.rs +++ b/core-api/src/lib.rs @@ -97,6 +97,9 @@ pub trait Worker: Send + Sync { /// Return this worker's config fn get_config(&self) -> &WorkerConfig; + /// TODO: Will be replaced/fixed/whatever by shutdown refactoring + fn initiate_shutdown(&self); + /// Initiates async shutdown procedure, eventually ceases all polling of the server and shuts /// down this worker. [Worker::poll_workflow_activation] should be called until it /// returns [PollWfError::ShutDown] to ensure that any workflows which are still undergoing diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index 2c0d09e6f..1345398ab 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -253,7 +253,7 @@ pub fn fetch_global_buffered_logs() -> Vec { pub(crate) fn test_telem_console() { telemetry_init(&TelemetryOptions { otel_collector_url: None, - tracing_filter: "temporal_sdk_core=DEBUG".to_string(), + tracing_filter: "temporal_sdk_core=DEBUG,temporal_sdk=DEBUG".to_string(), log_forwarding_level: LevelFilter::Off, prometheus_export_bind_address: None, totally_disable: false, diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index de892daf6..16acaf11c 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -162,6 +162,17 @@ impl WorkerTrait for Worker { &self.config } + /// Begins the shutdown process, tells pollers they should stop. Is idempotent. + // TODO: will be in trait after Roey's shutdown refactor + fn initiate_shutdown(&self) { + self.shutdown_token.cancel(); + // First, we want to stop polling of both activity and workflow tasks + if let Some(atm) = self.at_task_mgr.as_ref() { + atm.notify_shutdown(); + } + self.wf_task_source.stop_pollers(); + } + async fn shutdown(&self) { self.shutdown().await } @@ -291,20 +302,11 @@ impl Worker { } } - /// Begins the shutdown process, tells pollers they should stop. Is idempotent. - pub(crate) fn initiate_shutdown(&self) { - self.shutdown_token.cancel(); - // First, we want to stop polling of both activity and workflow tasks - if let Some(atm) = self.at_task_mgr.as_ref() { - atm.notify_shutdown(); - } - self.wf_task_source.stop_pollers(); - } - /// Will shutdown the worker. Does not resolve until all outstanding workflow tasks have been /// completed pub(crate) async fn shutdown(&self) { self.initiate_shutdown(); + info!("Initiated shutdown"); // Next we need to wait for all local activities to finish so no more workflow task // heartbeats will be generated self.local_act_mgr.shutdown_and_wait_all_finished().await; diff --git a/sdk/src/interceptors.rs b/sdk/src/interceptors.rs new file mode 100644 index 000000000..2366a5751 --- /dev/null +++ b/sdk/src/interceptors.rs @@ -0,0 +1,11 @@ +//! User-definable interceptors are defined in this module + +use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion; + +/// Implementors can intercept certain actions that happen within the Worker. +/// +/// Advanced usage only. +pub trait WorkerInterceptor { + /// Called every time a workflow activation completes + fn on_workflow_activation_completion(&self, completion: &WorkflowActivationCompletion); +} diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index 286a63d34..adb821d51 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -21,13 +21,12 @@ //! let worker_config = WorkerConfigBuilder::default().build()?; //! let core_worker = init_worker(worker_config, client); //! -//! let mut worker = Worker::new(Arc::new(core_worker), "task_queue", None); +//! let mut worker = Worker::new(Arc::new(core_worker), "task_queue"); //! worker.register_activity( //! "echo_activity", //! |echo_me: String| async move { Ok(echo_me) }, //! ); -//! // TODO: This should be different -//! worker.run_until_done().await?; +//! worker.run().await?; //! Ok(()) //! } //! ``` @@ -36,6 +35,7 @@ extern crate tracing; mod conversions; +pub mod interceptors; mod payload_converter; mod workflow_context; mod workflow_future; @@ -45,6 +45,7 @@ pub use workflow_context::{ Signal, SignalData, SignalWorkflowOptions, WfContext, }; +use crate::interceptors::WorkerInterceptor; use crate::workflow_context::{ChildWfCommon, PendingChildWorkflow}; use anyhow::{anyhow, bail}; use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt}; @@ -106,12 +107,17 @@ pub fn sdk_client_options(url: impl Into) -> ServerGatewayOptionsBuilder { /// A worker that can poll for and respond to workflow tasks by using [WorkflowFunction]s, /// and activity tasks by using [ActivityFunction]s pub struct Worker { - worker: Arc, - task_queue: String, + common: CommonWorker, workflow_half: WorkflowHalf, activity_half: ActivityHalf, } +struct CommonWorker { + worker: Arc, + task_queue: String, + worker_interceptor: Option>, +} + struct WorkflowHalf { /// Maps run id to the driver workflows: HashMap>, @@ -132,8 +138,11 @@ impl Worker { /// Create a new rust worker pub fn new(worker: Arc, task_queue: impl Into) -> Self { Self { - worker, - task_queue: task_queue.into(), + common: CommonWorker { + worker, + task_queue: task_queue.into(), + worker_interceptor: None, + }, workflow_half: WorkflowHalf { workflows: Default::default(), workflow_fns: Default::default(), @@ -148,12 +157,19 @@ impl Worker { /// Access the worker's server gateway client pub fn server_gateway(&self) -> Arc { - self.worker.server_gateway() + self.common.worker.server_gateway() } /// Returns the task queue name this worker polls on pub fn task_queue(&self) -> &str { - &self.task_queue + &self.common.task_queue + } + + /// Return a handle that can be used to initiate shutdown. + /// TODO: Doc better after shutdown changes + pub fn shutdown_handle(&self) -> impl Fn() { + let w = self.common.worker.clone(); + move || w.initiate_shutdown() } /// Register a Workflow function to invoke when the Worker is asked to run a workflow of @@ -188,13 +204,14 @@ impl Worker { pub async fn run(&mut self) -> Result<(), anyhow::Error> { let (shutdown_tx, shutdown_rx) = watch::channel(false); let pollers = async move { - let (worker, task_q, wf_half, act_half) = self.split_apart(); + let (common, wf_half, act_half) = self.split_apart(); let (completions_tx, mut completions_rx) = unbounded_channel(); let (wf_poll_res, act_poll_res) = tokio::join!( // Workflow polling loop async { loop { - let activation = match worker.poll_workflow_activation().await { + info!("Polling"); + let activation = match common.worker.poll_workflow_activation().await { Err(PollWfError::ShutDown) => { break Result::<_, anyhow::Error>::Ok(()); } @@ -202,8 +219,7 @@ impl Worker { }; wf_half .workflow_activation_handler( - worker.as_ref(), - task_q, + common, &shutdown_rx, &completions_tx, &mut completions_rx, @@ -219,11 +235,11 @@ impl Worker { if !act_half.activity_fns.is_empty() { loop { tokio::select! { - activity = worker.poll_activity_task() => { + activity = common.worker.poll_activity_task() => { if matches!(activity, Err(PollActivityError::ShutDown)) { break; } - act_half.activity_task_handler(worker.clone(), + act_half.activity_task_handler(common.worker.clone(), activity?)?; }, _ = shutdown_rx.changed() => { break } @@ -240,32 +256,31 @@ impl Worker { }; let myself = pollers.await?; + info!("Polling loop exited"); + let _ = shutdown_tx.send(true); while let Some(h) = myself.workflow_half.join_handles.next().await { h??; } - myself.worker.shutdown().await; + myself.common.worker.shutdown().await; myself.workflow_half.workflows.clear(); Ok(()) } + /// Set a [WorkerInterceptor] + pub fn set_worker_interceptor(&mut self, interceptor: Box) { + self.common.worker_interceptor = Some(interceptor); + } + /// Turns this rust worker into a new worker with all the same workflows and activities /// registered, but with a new underlying core worker. Can be used to swap the worker for /// a replay worker, change task queues, etc. pub fn with_new_core_worker(&mut self, new_core_worker: Arc) { - self.worker = new_core_worker; + self.common.worker = new_core_worker; } - fn split_apart( - &mut self, - ) -> ( - Arc, - &str, - &mut WorkflowHalf, - &mut ActivityHalf, - ) { + fn split_apart(&mut self) -> (&mut CommonWorker, &mut WorkflowHalf, &mut ActivityHalf) { ( - self.worker.clone(), - &self.task_queue, + &mut self.common, &mut self.workflow_half, &mut self.activity_half, ) @@ -275,8 +290,7 @@ impl Worker { impl WorkflowHalf { async fn workflow_activation_handler( &mut self, - worker: &dyn CoreWorker, - task_queue: &str, + common: &CommonWorker, shutdown_rx: &Receiver, completions_tx: &UnboundedSender, completions_rx: &mut UnboundedReceiver, @@ -295,8 +309,8 @@ impl WorkflowHalf { .ok_or_else(|| anyhow!("Workflow type {workflow_type} not found"))?; let (wff, activations) = wf_function.start_workflow( - worker.get_config().namespace.clone(), - task_queue.to_string(), + common.worker.get_config().namespace.clone(), + common.task_queue.clone(), // NOTE: Don't clone args if this gets ported to be a non-test rust worker sw.arguments.clone(), completions_tx.clone(), @@ -323,10 +337,13 @@ impl WorkflowHalf { }; let completion = completions_rx.recv().await.expect("No workflows left?"); - if completion.has_execution_ending() { - debug!("Workflow {} says it's finishing", &completion.run_id); + if let Some(ref i) = common.worker_interceptor { + i.on_workflow_activation_completion(&completion); } - worker.complete_workflow_activation(completion).await?; + common + .worker + .complete_workflow_activation(completion) + .await?; Ok(()) } } diff --git a/test-utils/Cargo.toml b/test-utils/Cargo.toml index db11f09fa..41b037f37 100644 --- a/test-utils/Cargo.toml +++ b/test-utils/Cargo.toml @@ -24,6 +24,7 @@ temporal-sdk-core = { path = "../core" } temporal-sdk-core-api = { path = "../core-api" } thiserror = "1.0" tokio = "1.1" +tokio-util = { version = "0.7" } tracing = "0.1" url = "2.2" uuid = "0.8" diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 2e8ca57b8..d51a9bde2 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -1,18 +1,23 @@ //! This crate contains testing functionality that can be useful when building SDKs against Core, //! or even when testing workflows written in SDKs that use Core. +#[macro_use] +extern crate tracing; + pub mod canned_histories; use futures::{stream::FuturesUnordered, StreamExt}; use log::LevelFilter; use prost::Message; use rand::{distributions::Standard, Rng}; +use std::cell::RefCell; use std::sync::atomic::{AtomicUsize, Ordering}; use std::{ convert::TryFrom, env, future::Future, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration, }; use temporal_client::{RetryGateway, ServerGateway, ServerGatewayApis, WorkflowOptions}; +use temporal_sdk::interceptors::WorkerInterceptor; use temporal_sdk::{IntoActivityFunc, Worker, WorkflowFunction}; use temporal_sdk_core::{ init_replay_worker, init_worker, replay::mock_gateway_from_history, telemetry_init, @@ -244,14 +249,21 @@ impl CoreWfStarter { /// Provides conveniences for running integ tests with the SDK pub struct TestWorker { inner: Worker, - expected_wf_runs: AtomicUsize, + incomplete_workflows: Arc, } impl TestWorker { /// Create a new test worker pub fn new(core_worker: Arc, task_queue: impl Into) -> Self { + let ct = Arc::new(AtomicUsize::new(0)); + let mut inner = Worker::new(core_worker, task_queue); + let iceptor = WorkflowCompletionCountingInterceptor { + incomplete_workflows: ct.clone(), + shutdown_handle: Box::new(inner.shutdown_handle()), + }; + inner.set_worker_interceptor(Box::new(iceptor)); Self { - inner: Worker::new(core_worker, task_queue), - expected_wf_runs: AtomicUsize::new(0), + inner, + incomplete_workflows: ct, } } @@ -260,7 +272,8 @@ impl TestWorker { } pub fn incr_expected_run_count(&self, amount: usize) { - self.expected_wf_runs.fetch_add(amount, Ordering::AcqRel); + self.incomplete_workflows + .fetch_add(amount, Ordering::AcqRel); } // TODO: Maybe trait-ify? @@ -305,21 +318,52 @@ impl TestWorker { options, ) .await?; + self.incr_expected_run_count(1); Ok(res.run_id) } /// Runs until all expected workflows have completed pub async fn run_until_done(&mut self) -> Result<(), anyhow::Error> { - todo!() + self.inner.run().await } /// See [Self::run_until_done], except calls the provided callback just before performing core /// shutdown. pub async fn run_until_done_shutdown_hook( &mut self, - before_shutdown: impl FnOnce(), + before_shutdown: impl FnOnce() + 'static, ) -> Result<(), anyhow::Error> { - todo!() + // Replace shutdown interceptor with one that calls the before hook first + let b4shut = RefCell::new(Some(before_shutdown)); + let shutdown_handle = self.inner.shutdown_handle(); + let iceptor = WorkflowCompletionCountingInterceptor { + incomplete_workflows: self.incomplete_workflows.clone(), + shutdown_handle: Box::new(move || { + if let Some(s) = b4shut.borrow_mut().take() { + s(); + } + shutdown_handle(); + }), + }; + self.inner.set_worker_interceptor(Box::new(iceptor)); + self.inner.run().await + } +} + +struct WorkflowCompletionCountingInterceptor { + incomplete_workflows: Arc, + shutdown_handle: Box, +} +impl WorkerInterceptor for WorkflowCompletionCountingInterceptor { + fn on_workflow_activation_completion(&self, completion: &WorkflowActivationCompletion) { + if completion.has_execution_ending() { + info!("Workflow {} says it's finishing", &completion.run_id); + let prev = self.incomplete_workflows.fetch_sub(1, Ordering::SeqCst); + if prev <= 1 { + // There are now zero, we just subtracted one + (self.shutdown_handle)() + } + } } } diff --git a/tests/integ_tests/workflow_tests/local_activities.rs b/tests/integ_tests/workflow_tests/local_activities.rs index 3225cb85b..c1fb73556 100644 --- a/tests/integ_tests/workflow_tests/local_activities.rs +++ b/tests/integ_tests/workflow_tests/local_activities.rs @@ -269,7 +269,7 @@ async fn cancel_immediate(#[case] cancel_type: ActivityCancellationType) { .await .unwrap(); worker - .run_until_done_shutdown_hook(|| manual_cancel.cancel()) + .run_until_done_shutdown_hook(move || manual_cancel.cancel()) .await .unwrap(); } @@ -363,7 +363,7 @@ async fn cancel_after_act_starts( .await .unwrap(); worker - .run_until_done_shutdown_hook(|| manual_cancel.cancel()) + .run_until_done_shutdown_hook(move || manual_cancel.cancel()) .await .unwrap(); }