Skip to content

Commit df347ac

Browse files
committed
Unit and integ tests working
1 parent f67db6b commit df347ac

File tree

8 files changed

+132
-54
lines changed

8 files changed

+132
-54
lines changed

core-api/src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,9 @@ pub trait Worker: Send + Sync {
9797
/// Return this worker's config
9898
fn get_config(&self) -> &WorkerConfig;
9999

100+
/// TODO: Will be replaced/fixed/whatever by shutdown refactoring
101+
fn initiate_shutdown(&self);
102+
100103
/// Initiates async shutdown procedure, eventually ceases all polling of the server and shuts
101104
/// down this worker. [Worker::poll_workflow_activation] should be called until it
102105
/// returns [PollWfError::ShutDown] to ensure that any workflows which are still undergoing

core/src/telemetry/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ pub fn fetch_global_buffered_logs() -> Vec<CoreLog> {
253253
pub(crate) fn test_telem_console() {
254254
telemetry_init(&TelemetryOptions {
255255
otel_collector_url: None,
256-
tracing_filter: "temporal_sdk_core=DEBUG".to_string(),
256+
tracing_filter: "temporal_sdk_core=DEBUG,temporal_sdk=DEBUG".to_string(),
257257
log_forwarding_level: LevelFilter::Off,
258258
prometheus_export_bind_address: None,
259259
totally_disable: false,

core/src/worker/mod.rs

+12-10
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,17 @@ impl WorkerTrait for Worker {
162162
&self.config
163163
}
164164

165+
/// Begins the shutdown process, tells pollers they should stop. Is idempotent.
166+
// TODO: will be in trait after Roey's shutdown refactor
167+
fn initiate_shutdown(&self) {
168+
self.shutdown_token.cancel();
169+
// First, we want to stop polling of both activity and workflow tasks
170+
if let Some(atm) = self.at_task_mgr.as_ref() {
171+
atm.notify_shutdown();
172+
}
173+
self.wf_task_source.stop_pollers();
174+
}
175+
165176
async fn shutdown(&self) {
166177
self.shutdown().await
167178
}
@@ -291,20 +302,11 @@ impl Worker {
291302
}
292303
}
293304

294-
/// Begins the shutdown process, tells pollers they should stop. Is idempotent.
295-
pub(crate) fn initiate_shutdown(&self) {
296-
self.shutdown_token.cancel();
297-
// First, we want to stop polling of both activity and workflow tasks
298-
if let Some(atm) = self.at_task_mgr.as_ref() {
299-
atm.notify_shutdown();
300-
}
301-
self.wf_task_source.stop_pollers();
302-
}
303-
304305
/// Will shutdown the worker. Does not resolve until all outstanding workflow tasks have been
305306
/// completed
306307
pub(crate) async fn shutdown(&self) {
307308
self.initiate_shutdown();
309+
info!("Initiated shutdown");
308310
// Next we need to wait for all local activities to finish so no more workflow task
309311
// heartbeats will be generated
310312
self.local_act_mgr.shutdown_and_wait_all_finished().await;

sdk/src/interceptors.rs

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
//! User-definable interceptors are defined in this module
2+
3+
use temporal_sdk_core_protos::coresdk::workflow_completion::WorkflowActivationCompletion;
4+
5+
/// Implementors can intercept certain actions that happen within the Worker.
6+
///
7+
/// Advanced usage only.
8+
pub trait WorkerInterceptor {
9+
/// Called every time a workflow activation completes
10+
fn on_workflow_activation_completion(&self, completion: &WorkflowActivationCompletion);
11+
}

sdk/src/lib.rs

+51-34
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,12 @@
2121
//! let worker_config = WorkerConfigBuilder::default().build()?;
2222
//! let core_worker = init_worker(worker_config, client);
2323
//!
24-
//! let mut worker = Worker::new(Arc::new(core_worker), "task_queue", None);
24+
//! let mut worker = Worker::new(Arc::new(core_worker), "task_queue");
2525
//! worker.register_activity(
2626
//! "echo_activity",
2727
//! |echo_me: String| async move { Ok(echo_me) },
2828
//! );
29-
//! // TODO: This should be different
30-
//! worker.run_until_done().await?;
29+
//! worker.run().await?;
3130
//! Ok(())
3231
//! }
3332
//! ```
@@ -36,6 +35,7 @@
3635
extern crate tracing;
3736

3837
mod conversions;
38+
pub mod interceptors;
3939
mod payload_converter;
4040
mod workflow_context;
4141
mod workflow_future;
@@ -45,6 +45,7 @@ pub use workflow_context::{
4545
Signal, SignalData, SignalWorkflowOptions, WfContext,
4646
};
4747

48+
use crate::interceptors::WorkerInterceptor;
4849
use crate::workflow_context::{ChildWfCommon, PendingChildWorkflow};
4950
use anyhow::{anyhow, bail};
5051
use futures::{future::BoxFuture, stream::FuturesUnordered, FutureExt, StreamExt};
@@ -106,12 +107,17 @@ pub fn sdk_client_options(url: impl Into<Url>) -> ServerGatewayOptionsBuilder {
106107
/// A worker that can poll for and respond to workflow tasks by using [WorkflowFunction]s,
107108
/// and activity tasks by using [ActivityFunction]s
108109
pub struct Worker {
109-
worker: Arc<dyn CoreWorker>,
110-
task_queue: String,
110+
common: CommonWorker,
111111
workflow_half: WorkflowHalf,
112112
activity_half: ActivityHalf,
113113
}
114114

115+
struct CommonWorker {
116+
worker: Arc<dyn CoreWorker>,
117+
task_queue: String,
118+
worker_interceptor: Option<Box<dyn WorkerInterceptor>>,
119+
}
120+
115121
struct WorkflowHalf {
116122
/// Maps run id to the driver
117123
workflows: HashMap<String, UnboundedSender<WorkflowActivation>>,
@@ -132,8 +138,11 @@ impl Worker {
132138
/// Create a new rust worker
133139
pub fn new(worker: Arc<dyn CoreWorker>, task_queue: impl Into<String>) -> Self {
134140
Self {
135-
worker,
136-
task_queue: task_queue.into(),
141+
common: CommonWorker {
142+
worker,
143+
task_queue: task_queue.into(),
144+
worker_interceptor: None,
145+
},
137146
workflow_half: WorkflowHalf {
138147
workflows: Default::default(),
139148
workflow_fns: Default::default(),
@@ -148,12 +157,19 @@ impl Worker {
148157

149158
/// Access the worker's server gateway client
150159
pub fn server_gateway(&self) -> Arc<dyn ServerGatewayApis + Send + Sync> {
151-
self.worker.server_gateway()
160+
self.common.worker.server_gateway()
152161
}
153162

154163
/// Returns the task queue name this worker polls on
155164
pub fn task_queue(&self) -> &str {
156-
&self.task_queue
165+
&self.common.task_queue
166+
}
167+
168+
/// Return a handle that can be used to initiate shutdown.
169+
/// TODO: Doc better after shutdown changes
170+
pub fn shutdown_handle(&self) -> impl Fn() {
171+
let w = self.common.worker.clone();
172+
move || w.initiate_shutdown()
157173
}
158174

159175
/// Register a Workflow function to invoke when the Worker is asked to run a workflow of
@@ -188,22 +204,22 @@ impl Worker {
188204
pub async fn run(&mut self) -> Result<(), anyhow::Error> {
189205
let (shutdown_tx, shutdown_rx) = watch::channel(false);
190206
let pollers = async move {
191-
let (worker, task_q, wf_half, act_half) = self.split_apart();
207+
let (common, wf_half, act_half) = self.split_apart();
192208
let (completions_tx, mut completions_rx) = unbounded_channel();
193209
let (wf_poll_res, act_poll_res) = tokio::join!(
194210
// Workflow polling loop
195211
async {
196212
loop {
197-
let activation = match worker.poll_workflow_activation().await {
213+
info!("Polling");
214+
let activation = match common.worker.poll_workflow_activation().await {
198215
Err(PollWfError::ShutDown) => {
199216
break Result::<_, anyhow::Error>::Ok(());
200217
}
201218
o => o?,
202219
};
203220
wf_half
204221
.workflow_activation_handler(
205-
worker.as_ref(),
206-
task_q,
222+
common,
207223
&shutdown_rx,
208224
&completions_tx,
209225
&mut completions_rx,
@@ -219,11 +235,11 @@ impl Worker {
219235
if !act_half.activity_fns.is_empty() {
220236
loop {
221237
tokio::select! {
222-
activity = worker.poll_activity_task() => {
238+
activity = common.worker.poll_activity_task() => {
223239
if matches!(activity, Err(PollActivityError::ShutDown)) {
224240
break;
225241
}
226-
act_half.activity_task_handler(worker.clone(),
242+
act_half.activity_task_handler(common.worker.clone(),
227243
activity?)?;
228244
},
229245
_ = shutdown_rx.changed() => { break }
@@ -240,32 +256,31 @@ impl Worker {
240256
};
241257

242258
let myself = pollers.await?;
259+
info!("Polling loop exited");
260+
let _ = shutdown_tx.send(true);
243261
while let Some(h) = myself.workflow_half.join_handles.next().await {
244262
h??;
245263
}
246-
myself.worker.shutdown().await;
264+
myself.common.worker.shutdown().await;
247265
myself.workflow_half.workflows.clear();
248266
Ok(())
249267
}
250268

269+
/// Set a [WorkerInterceptor]
270+
pub fn set_worker_interceptor(&mut self, interceptor: Box<dyn WorkerInterceptor>) {
271+
self.common.worker_interceptor = Some(interceptor);
272+
}
273+
251274
/// Turns this rust worker into a new worker with all the same workflows and activities
252275
/// registered, but with a new underlying core worker. Can be used to swap the worker for
253276
/// a replay worker, change task queues, etc.
254277
pub fn with_new_core_worker(&mut self, new_core_worker: Arc<dyn CoreWorker>) {
255-
self.worker = new_core_worker;
278+
self.common.worker = new_core_worker;
256279
}
257280

258-
fn split_apart(
259-
&mut self,
260-
) -> (
261-
Arc<dyn CoreWorker>,
262-
&str,
263-
&mut WorkflowHalf,
264-
&mut ActivityHalf,
265-
) {
281+
fn split_apart(&mut self) -> (&mut CommonWorker, &mut WorkflowHalf, &mut ActivityHalf) {
266282
(
267-
self.worker.clone(),
268-
&self.task_queue,
283+
&mut self.common,
269284
&mut self.workflow_half,
270285
&mut self.activity_half,
271286
)
@@ -275,8 +290,7 @@ impl Worker {
275290
impl WorkflowHalf {
276291
async fn workflow_activation_handler(
277292
&mut self,
278-
worker: &dyn CoreWorker,
279-
task_queue: &str,
293+
common: &CommonWorker,
280294
shutdown_rx: &Receiver<bool>,
281295
completions_tx: &UnboundedSender<WorkflowActivationCompletion>,
282296
completions_rx: &mut UnboundedReceiver<WorkflowActivationCompletion>,
@@ -295,8 +309,8 @@ impl WorkflowHalf {
295309
.ok_or_else(|| anyhow!("Workflow type {workflow_type} not found"))?;
296310

297311
let (wff, activations) = wf_function.start_workflow(
298-
worker.get_config().namespace.clone(),
299-
task_queue.to_string(),
312+
common.worker.get_config().namespace.clone(),
313+
common.task_queue.clone(),
300314
// NOTE: Don't clone args if this gets ported to be a non-test rust worker
301315
sw.arguments.clone(),
302316
completions_tx.clone(),
@@ -323,10 +337,13 @@ impl WorkflowHalf {
323337
};
324338

325339
let completion = completions_rx.recv().await.expect("No workflows left?");
326-
if completion.has_execution_ending() {
327-
debug!("Workflow {} says it's finishing", &completion.run_id);
340+
if let Some(ref i) = common.worker_interceptor {
341+
i.on_workflow_activation_completion(&completion);
328342
}
329-
worker.complete_workflow_activation(completion).await?;
343+
common
344+
.worker
345+
.complete_workflow_activation(completion)
346+
.await?;
330347
Ok(())
331348
}
332349
}

test-utils/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ temporal-sdk-core = { path = "../core" }
2424
temporal-sdk-core-api = { path = "../core-api" }
2525
thiserror = "1.0"
2626
tokio = "1.1"
27+
tokio-util = { version = "0.7" }
2728
tracing = "0.1"
2829
url = "2.2"
2930
uuid = "0.8"

0 commit comments

Comments
 (0)