Skip to content

Commit f67db6b

Browse files
committed
Compile fixes to lift out test worker -- tests still not passing
1 parent 99f3708 commit f67db6b

File tree

10 files changed

+117
-109
lines changed

10 files changed

+117
-109
lines changed

core/benches/workflow_replay.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,9 @@ pub fn criterion_benchmark(c: &mut Criterion) {
2929
tokio_runtime.block_on(async {
3030
let func = timers_wf(num_timers);
3131
let (worker, _) = init_core_replay_preloaded("replay_bench", &hist);
32-
let mut worker = Worker::new(worker, "replay_bench".to_string(), None);
32+
let mut worker = Worker::new(worker, "replay_bench".to_string());
3333
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
34-
worker.incr_expected_run_count(1);
35-
worker.run_until_done().await.unwrap();
34+
worker.run().await.unwrap();
3635
})
3736
})
3837
});
@@ -46,10 +45,9 @@ pub fn criterion_benchmark(c: &mut Criterion) {
4645
tokio_runtime.block_on(async {
4746
let func = big_signals_wf(num_tasks);
4847
let (worker, _) = init_core_replay_preloaded("large_hist_bench", &hist);
49-
let mut worker = Worker::new(worker, "large_hist_bench".to_string(), None);
48+
let mut worker = Worker::new(worker, "large_hist_bench".to_string());
5049
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
51-
worker.incr_expected_run_count(1);
52-
worker.run_until_done().await.unwrap();
50+
worker.run().await.unwrap();
5351
})
5452
})
5553
});

core/src/core_tests/child_workflows.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@ use crate::{
77
};
88
use std::sync::Arc;
99
use temporal_client::{mocks::mock_gateway, WorkflowOptions};
10-
use temporal_sdk::{
11-
ChildWorkflowOptions, Signal, WfContext, Worker, WorkflowFunction, WorkflowResult,
12-
};
10+
use temporal_sdk::{ChildWorkflowOptions, Signal, WfContext, WorkflowFunction, WorkflowResult};
1311
use temporal_sdk_core_protos::coresdk::child_workflow::{
1412
child_workflow_result, ChildWorkflowCancellationType,
1513
};
14+
use temporal_sdk_core_test_utils::TestWorker;
1615
use tokio::join;
1716

1817
const SIGNAME: &str = "SIGNAME";
@@ -29,7 +28,7 @@ async fn signal_child_workflow(#[case] serial: bool) {
2928
let mh = MockPollCfg::from_resp_batches(wf_id, t, [ResponseType::AllHistory], mock);
3029
let mock = build_mock_pollers(mh);
3130
let core = mock_worker(mock);
32-
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
31+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
3332

3433
let wf = move |ctx: WfContext| async move {
3534
let child = ctx.child_workflow(ChildWorkflowOptions {

core/src/core_tests/determinism.rs

+4-3
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ use std::{
1212
time::Duration,
1313
};
1414
use temporal_client::{mocks::mock_gateway, WorkflowOptions};
15-
use temporal_sdk::{WfContext, Worker, WorkflowResult};
15+
use temporal_sdk::{WfContext, WorkflowResult};
1616
use temporal_sdk_core_protos::temporal::api::enums::v1::WorkflowTaskFailedCause;
17+
use temporal_sdk_core_test_utils::TestWorker;
1718

1819
static DID_FAIL: AtomicBool = AtomicBool::new(false);
1920
pub async fn timer_wf_fails_once(ctx: WfContext) -> WorkflowResult<()> {
@@ -43,7 +44,7 @@ async fn test_wf_task_rejected_properly() {
4344
Box::new(|_, cause, _| matches!(cause, WorkflowTaskFailedCause::Unspecified));
4445
let mock = build_mock_pollers(mh);
4546
let core = mock_worker(mock);
46-
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
47+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
4748

4849
worker.register_wf(wf_type.to_owned(), timer_wf_fails_once);
4950
worker
@@ -87,7 +88,7 @@ async fn test_wf_task_rejected_properly_due_to_nondeterminism(#[case] use_cache:
8788
});
8889
}
8990
let core = mock_worker(mock);
90-
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
91+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
9192

9293
let started_count: &'static _ = Box::leak(Box::new(AtomicUsize::new(0)));
9394
worker.register_wf(wf_type.to_owned(), move |ctx: WfContext| async move {

core/src/core_tests/local_activities.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ use std::{
1212
time::Duration,
1313
};
1414
use temporal_client::{mocks::mock_gateway, WorkflowOptions};
15-
use temporal_sdk::{LocalActivityOptions, WfContext, Worker, WorkflowResult};
15+
use temporal_sdk::{LocalActivityOptions, WfContext, WorkflowResult};
1616
use temporal_sdk_core_protos::{
1717
coresdk::{common::RetryPolicy, AsJsonPayloadExt},
1818
temporal::api::{enums::v1::EventType, failure::v1::Failure},
1919
};
20+
use temporal_sdk_core_test_utils::TestWorker;
2021
use tokio::sync::Barrier;
2122

2223
async fn echo(e: String) -> anyhow::Result<String> {
@@ -55,7 +56,7 @@ async fn local_act_two_wfts_before_marker(#[case] replay: bool, #[case] cached:
5556
mock.worker_cfg(|cfg| cfg.max_cached_workflows = 1);
5657
}
5758
let core = mock_worker(mock);
58-
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
59+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
5960

6061
worker.register_wf(
6162
DEFAULT_WORKFLOW_TYPE.to_owned(),
@@ -119,7 +120,7 @@ async fn local_act_many_concurrent() {
119120
let mh = MockPollCfg::from_resp_batches(wf_id, t, [1, 2, 3], mock);
120121
let mock = build_mock_pollers(mh);
121122
let core = mock_worker(mock);
122-
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
123+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
123124

124125
worker.register_wf(DEFAULT_WORKFLOW_TYPE.to_owned(), local_act_fanout_wf);
125126
worker.register_activity("echo", |str: String| async move { Ok(str) });
@@ -168,7 +169,7 @@ async fn local_act_heartbeat(#[case] shutdown_middle: bool) {
168169
let mut mock = build_mock_pollers(mh);
169170
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
170171
let core = Arc::new(mock_worker(mock));
171-
let mut worker = Worker::new(core.clone(), TEST_Q.to_string(), None);
172+
let mut worker = TestWorker::new(core.clone(), TEST_Q.to_string());
172173
let shutdown_barr: &'static Barrier = Box::leak(Box::new(Barrier::new(2)));
173174

174175
worker.register_wf(
@@ -226,7 +227,7 @@ async fn local_act_fail_and_retry(#[case] eventually_pass: bool) {
226227
let mh = MockPollCfg::from_resp_batches(wf_id, t, [1], mock);
227228
let mock = build_mock_pollers(mh);
228229
let core = mock_worker(mock);
229-
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
230+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
230231

231232
worker.register_wf(
232233
DEFAULT_WORKFLOW_TYPE.to_owned(),
@@ -310,7 +311,7 @@ async fn local_act_retry_long_backoff_uses_timer() {
310311
let mut mock = build_mock_pollers(mh);
311312
mock.worker_cfg(|w| w.max_cached_workflows = 1);
312313
let core = mock_worker(mock);
313-
let mut worker = Worker::new(Arc::new(core), TEST_Q.to_string(), None);
314+
let mut worker = TestWorker::new(Arc::new(core), TEST_Q.to_string());
314315

315316
worker.register_wf(
316317
DEFAULT_WORKFLOW_TYPE.to_owned(),

sdk/src/lib.rs

+6-80
Original file line numberDiff line numberDiff line change
@@ -53,13 +53,9 @@ use std::{
5353
collections::HashMap,
5454
fmt::{Debug, Display, Formatter},
5555
future::Future,
56-
sync::{
57-
atomic::{AtomicUsize, Ordering},
58-
Arc,
59-
},
60-
time::Duration,
56+
sync::Arc,
6157
};
62-
use temporal_client::{ServerGatewayApis, ServerGatewayOptionsBuilder, WorkflowOptions};
58+
use temporal_client::{ServerGatewayApis, ServerGatewayOptionsBuilder};
6359
use temporal_sdk_core::Url;
6460
use temporal_sdk_core_api::{
6561
errors::{PollActivityError, PollWfError},
@@ -112,7 +108,6 @@ pub fn sdk_client_options(url: impl Into<Url>) -> ServerGatewayOptionsBuilder {
112108
pub struct Worker {
113109
worker: Arc<dyn CoreWorker>,
114110
task_queue: String,
115-
task_timeout: Option<Duration>,
116111
workflow_half: WorkflowHalf,
117112
activity_half: ActivityHalf,
118113
}
@@ -122,8 +117,6 @@ struct WorkflowHalf {
122117
workflows: HashMap<String, UnboundedSender<WorkflowActivation>>,
123118
/// Maps workflow type to the function for executing workflow runs with that ID
124119
workflow_fns: HashMap<String, WorkflowFunction>,
125-
/// Number of live workflows
126-
incomplete_workflows: Arc<AtomicUsize>,
127120
/// Handles for each spawned workflow run are inserted here to be cleaned up when all runs
128121
/// are finished
129122
join_handles: FuturesUnordered<BoxFuture<'static, Result<WorkflowResult<()>, JoinError>>>,
@@ -137,19 +130,13 @@ struct ActivityHalf {
137130

138131
impl Worker {
139132
/// Create a new rust worker
140-
pub fn new(
141-
worker: Arc<dyn CoreWorker>,
142-
task_queue: impl Into<String>,
143-
task_timeout: Option<Duration>,
144-
) -> Self {
133+
pub fn new(worker: Arc<dyn CoreWorker>, task_queue: impl Into<String>) -> Self {
145134
Self {
146135
worker,
147136
task_queue: task_queue.into(),
148-
task_timeout,
149137
workflow_half: WorkflowHalf {
150138
workflows: Default::default(),
151139
workflow_fns: Default::default(),
152-
incomplete_workflows: Arc::new(AtomicUsize::new(0)),
153140
join_handles: FuturesUnordered::new(),
154141
},
155142
activity_half: ActivityHalf {
@@ -169,36 +156,6 @@ impl Worker {
169156
&self.task_queue
170157
}
171158

172-
/// Create a workflow, asking the server to start it with the provided workflow ID and using the
173-
/// provided workflow function.
174-
///
175-
/// Increments the expected Workflow run count.
176-
///
177-
/// Returns the run id of the started workflow
178-
pub async fn submit_wf(
179-
&self,
180-
workflow_id: impl Into<String>,
181-
workflow_type: impl Into<String>,
182-
input: Vec<Payload>,
183-
mut options: WorkflowOptions,
184-
) -> Result<String, tonic::Status> {
185-
options.task_timeout = options.task_timeout.or(self.task_timeout);
186-
let res = self
187-
.worker
188-
.server_gateway()
189-
.start_workflow(
190-
input,
191-
self.task_queue.clone(),
192-
workflow_id.into(),
193-
workflow_type.into(),
194-
options,
195-
)
196-
.await?;
197-
198-
self.incr_expected_run_count(1);
199-
Ok(res.run_id)
200-
}
201-
202159
/// Register a Workflow function to invoke when the Worker is asked to run a workflow of
203160
/// `workflow_type`
204161
pub fn register_wf<F: Into<WorkflowFunction>>(
@@ -226,25 +183,9 @@ impl Worker {
226183
);
227184
}
228185

229-
// TODO: Should be removed before making this worker prod ready. There can be a test worker
230-
// which wraps this one and implements the workflow counting / run_until_done concepts.
231-
// This worker can expose an interceptor for completions that could be used to assist with
232-
// workflow tracking
233-
/// Increment the expected Workflow run count on this Worker. The Worker tracks the run count
234-
/// and will resolve `run_until_done` when it goes down to 0.
235-
/// You do not have to increment if scheduled a Workflow with `submit_wf`.
236-
pub fn incr_expected_run_count(&self, count: usize) {
237-
self.workflow_half
238-
.incomplete_workflows
239-
.fetch_add(count, Ordering::SeqCst);
240-
}
241-
242-
/// See [Self::run_until_done], except calls the provided callback just before performing core
243-
/// shutdown.
244-
pub async fn run_until_done_shutdown_hook(
245-
&mut self,
246-
before_shutdown: impl FnOnce(),
247-
) -> Result<(), anyhow::Error> {
186+
/// Runs the worker. Eventually resolves after the worker has been explicitly shut down,
187+
/// or may return early with an error in the event of some unresolvable problem.
188+
pub async fn run(&mut self) -> Result<(), anyhow::Error> {
248189
let (shutdown_tx, shutdown_rx) = watch::channel(false);
249190
let pollers = async move {
250191
let (worker, task_q, wf_half, act_half) = self.split_apart();
@@ -269,13 +210,6 @@ impl Worker {
269210
activation,
270211
)
271212
.await?;
272-
if wf_half.incomplete_workflows.load(Ordering::SeqCst) == 0 {
273-
info!("All expected workflows complete");
274-
// Die rebel scum - evict all workflows (which are complete now),
275-
// and turn off activity polling.
276-
let _ = shutdown_tx.send(true);
277-
break Result::<_, anyhow::Error>::Ok(());
278-
}
279213
}
280214
},
281215
// Only poll on the activity queue if activity functions have been registered. This
@@ -309,18 +243,11 @@ impl Worker {
309243
while let Some(h) = myself.workflow_half.join_handles.next().await {
310244
h??;
311245
}
312-
before_shutdown();
313246
myself.worker.shutdown().await;
314247
myself.workflow_half.workflows.clear();
315248
Ok(())
316249
}
317250

318-
/// Drives all workflows & activities until they have all finished, repeatedly polls server to
319-
/// fetch work for them.
320-
pub async fn run_until_done(&mut self) -> Result<(), anyhow::Error> {
321-
self.run_until_done_shutdown_hook(|| {}).await
322-
}
323-
324251
/// Turns this rust worker into a new worker with all the same workflows and activities
325252
/// registered, but with a new underlying core worker. Can be used to swap the worker for
326253
/// a replay worker, change task queues, etc.
@@ -398,7 +325,6 @@ impl WorkflowHalf {
398325
let completion = completions_rx.recv().await.expect("No workflows left?");
399326
if completion.has_execution_ending() {
400327
debug!("Workflow {} says it's finishing", &completion.run_id);
401-
self.incomplete_workflows.fetch_sub(1, Ordering::SeqCst);
402328
}
403329
worker.complete_workflow_activation(completion).await?;
404330
Ok(())

test-utils/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ temporal-sdk-core = { path = "../core" }
2424
temporal-sdk-core-api = { path = "../core-api" }
2525
thiserror = "1.0"
2626
tokio = "1.1"
27+
tracing = "0.1"
2728
url = "2.2"
2829
uuid = "0.8"
2930

0 commit comments

Comments
 (0)