Skip to content

Commit 12bd832

Browse files
authored
Fix resets of workflows causing a panic (#287)
Problem was the internal run id was always being set to the original run id, so when it changed because of a reset, lookup of the workflow state failed
1 parent d128633 commit 12bd832

File tree

7 files changed

+130
-23
lines changed

7 files changed

+130
-23
lines changed

core/src/core_tests/activity_tasks.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ use crate::{
88
ActivityHeartbeat, MetricsContext, Worker, WorkerConfigBuilder,
99
};
1010
use futures::FutureExt;
11-
use std::sync::Arc;
1211
use std::{
1312
cell::RefCell,
1413
collections::{hash_map::Entry, HashMap, VecDeque},
1514
rc::Rc,
16-
sync::atomic::{AtomicUsize, Ordering},
15+
sync::{
16+
atomic::{AtomicUsize, Ordering},
17+
Arc,
18+
},
1719
time::Duration,
1820
};
1921
use temporal_client::mocks::{mock_gateway, mock_manual_gateway};

core/src/workflow/machines/workflow_machines.rs

-1
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,6 @@ impl WorkflowMachines {
467467
attrs,
468468
)) = event.attributes
469469
{
470-
self.run_id = attrs.original_execution_run_id.clone();
471470
if let Some(st) = event.event_time {
472471
let as_systime: SystemTime = st.try_into()?;
473472
self.workflow_start_time = Some(as_systime);

core/src/workflow/workflow_tasks/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ impl WorkflowTaskManager {
309309
task_token = %&work.task_token,
310310
history_length = %work.history.events.len(),
311311
attempt = %work.attempt,
312+
run_id = %work.workflow_execution.run_id,
312313
"Applying new workflow task from server"
313314
);
314315
let task_start_time = Instant::now();

sdk-core-protos/src/lib.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,10 @@ pub mod coresdk {
260260
coresdk::{AsJsonPayloadExt, IntoPayloadsExt},
261261
temporal::api::common::v1::{Payload as ApiPayload, Payloads},
262262
};
263-
use std::collections::HashMap;
264-
use std::fmt::{Display, Formatter};
263+
use std::{
264+
collections::HashMap,
265+
fmt::{Display, Formatter},
266+
};
265267

266268
impl<T> From<T> for Payload
267269
where

test-utils/src/lib.rs

+36-18
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::{
1111
convert::TryFrom, env, future::Future, net::SocketAddr, path::PathBuf, sync::Arc,
1212
time::Duration,
1313
};
14-
use temporal_client::WorkflowOptions;
14+
use temporal_client::{RetryGateway, ServerGateway, ServerGatewayApis, WorkflowOptions};
1515
use temporal_sdk::TestRustWorker;
1616
use temporal_sdk_core::{
1717
init_replay_worker, init_worker, replay::mock_gateway_from_history, telemetry_init,
@@ -29,6 +29,7 @@ use temporal_sdk_core_protos::{
2929
},
3030
temporal::api::history::v1::History,
3131
};
32+
use tokio::sync::OnceCell;
3233
use url::Url;
3334

3435
pub const NAMESPACE: &str = "default";
@@ -77,7 +78,11 @@ pub struct CoreWfStarter {
7778
telemetry_options: TelemetryOptions,
7879
worker_config: WorkerConfig,
7980
wft_timeout: Option<Duration>,
80-
initted_worker: Option<Arc<dyn Worker>>,
81+
initted_worker: OnceCell<InitializedWorker>,
82+
}
83+
struct InitializedWorker {
84+
worker: Arc<dyn Worker>,
85+
client: Arc<RetryGateway<ServerGateway>>,
8186
}
8287

8388
impl CoreWfStarter {
@@ -99,7 +104,7 @@ impl CoreWfStarter {
99104
.build()
100105
.unwrap(),
101106
wft_timeout: None,
102-
initted_worker: None,
107+
initted_worker: OnceCell::new(),
103108
}
104109
}
105110

@@ -116,16 +121,11 @@ impl CoreWfStarter {
116121
}
117122

118123
pub async fn get_worker(&mut self) -> Arc<dyn Worker> {
119-
if self.initted_worker.is_none() {
120-
telemetry_init(&self.telemetry_options).expect("Telemetry inits cleanly");
121-
let gateway = get_integ_server_options()
122-
.connect(self.worker_config.namespace.clone(), None)
123-
.await
124-
.expect("Must connect");
125-
let worker = init_worker(self.worker_config.clone(), gateway);
126-
self.initted_worker = Some(Arc::new(worker));
127-
}
128-
self.initted_worker.as_ref().unwrap().clone()
124+
self.get_or_init().await.worker.clone()
125+
}
126+
127+
pub async fn get_client(&mut self) -> Arc<RetryGateway<ServerGateway>> {
128+
self.get_or_init().await.client.clone()
129129
}
130130

131131
/// Start the workflow defined by the builder and return run id
@@ -137,13 +137,12 @@ impl CoreWfStarter {
137137
pub async fn start_wf_with_id(&self, workflow_id: String, mut opts: WorkflowOptions) -> String {
138138
opts.task_timeout = opts.task_timeout.or(self.wft_timeout);
139139
self.initted_worker
140-
.as_ref()
140+
.get()
141141
.expect(
142-
"Core must be initted before starting a workflow.\
143-
Tests must call `get_core` first.",
142+
"Worker must be initted before starting a workflow.\
143+
Tests must call `get_worker` first.",
144144
)
145-
.as_ref()
146-
.server_gateway()
145+
.client
147146
.start_workflow(
148147
vec![],
149148
self.worker_config.task_queue.clone(),
@@ -218,6 +217,25 @@ impl CoreWfStarter {
218217
self.wft_timeout = Some(timeout);
219218
self
220219
}
220+
221+
async fn get_or_init(&mut self) -> &InitializedWorker {
222+
self.initted_worker
223+
.get_or_init(|| async {
224+
telemetry_init(&self.telemetry_options).expect("Telemetry inits cleanly");
225+
let gateway = Arc::new(
226+
get_integ_server_options()
227+
.connect(self.worker_config.namespace.clone(), None)
228+
.await
229+
.expect("Must connect"),
230+
);
231+
let worker = init_worker(self.worker_config.clone(), gateway.clone());
232+
InitializedWorker {
233+
worker: Arc::new(worker),
234+
client: gateway,
235+
}
236+
})
237+
.await
238+
}
221239
}
222240

223241
pub fn get_integ_server_options() -> ServerGatewayOptions {

tests/integ_tests/workflow_tests.rs

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ mod determinism;
77
mod local_activities;
88
mod patches;
99
mod replay;
10+
mod resets;
1011
mod signals;
1112
mod stickyness;
1213
mod timers;
+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
use futures::StreamExt;
2+
use std::{sync::Arc, time::Duration};
3+
use temporal_client::{ServerGatewayApis, WorkflowOptions, WorkflowService};
4+
use temporal_sdk::WfContext;
5+
use temporal_sdk_core_protos::temporal::api::{
6+
common::v1::WorkflowExecution, workflowservice::v1::ResetWorkflowExecutionRequest,
7+
};
8+
use temporal_sdk_core_test_utils::{CoreWfStarter, NAMESPACE};
9+
use tokio::sync::Notify;
10+
11+
const POST_RESET_SIG: &str = "post-reset";
12+
13+
#[tokio::test]
14+
async fn reset_workflow() {
15+
let wf_name = "reset_me_wf";
16+
let mut starter = CoreWfStarter::new(wf_name);
17+
let mut worker = starter.worker().await;
18+
let notify = Arc::new(Notify::new());
19+
20+
let wf_notify = notify.clone();
21+
worker.register_wf(wf_name.to_owned(), move |ctx: WfContext| {
22+
let notify = wf_notify.clone();
23+
async move {
24+
// Make a couple workflow tasks
25+
ctx.timer(Duration::from_secs(1)).await;
26+
ctx.timer(Duration::from_secs(1)).await;
27+
// Tell outer scope to send the reset
28+
notify.notify_one();
29+
let _ = ctx
30+
.make_signal_channel(POST_RESET_SIG)
31+
.next()
32+
.await
33+
.unwrap();
34+
Ok(().into())
35+
}
36+
});
37+
38+
let run_id = worker
39+
.submit_wf(
40+
wf_name.to_owned(),
41+
wf_name.to_owned(),
42+
vec![],
43+
WorkflowOptions::default(),
44+
)
45+
.await
46+
.unwrap();
47+
48+
let client = starter.get_client().await;
49+
let resetter_fut = async {
50+
notify.notified().await;
51+
// Do the reset
52+
client
53+
.get_client()
54+
.raw_retry_client()
55+
.reset_workflow_execution(ResetWorkflowExecutionRequest {
56+
namespace: NAMESPACE.to_owned(),
57+
workflow_execution: Some(WorkflowExecution {
58+
workflow_id: wf_name.to_owned(),
59+
run_id: run_id.clone(),
60+
}),
61+
// End of first WFT
62+
workflow_task_finish_event_id: 4,
63+
request_id: "test-req-id".to_owned(),
64+
..Default::default()
65+
})
66+
.await
67+
.unwrap();
68+
69+
// Unblock the workflow by sending the signal. Run ID will have changed after reset so
70+
// we use empty run id
71+
client
72+
.signal_workflow_execution(
73+
wf_name.to_owned(),
74+
"".to_owned(),
75+
POST_RESET_SIG.to_owned(),
76+
None,
77+
)
78+
.await
79+
.unwrap();
80+
};
81+
let run_fut = worker.run_until_done();
82+
let (_, rr) = tokio::join!(resetter_fut, run_fut);
83+
rr.unwrap();
84+
}

0 commit comments

Comments
 (0)