From 8adbeb91fcb54ded71143d923f45433c7ffbc01f Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 21 Mar 2025 16:18:08 -0700 Subject: [PATCH] Defensive programming / add todo --- .../src/worker/activities/local_activities.rs | 6 +++++- .../workflow/machines/workflow_machines.rs | 19 ++++++++++++++++--- core/src/worker/workflow/workflow_stream.rs | 3 +++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/core/src/worker/activities/local_activities.rs b/core/src/worker/activities/local_activities.rs index a2a38c89e..e9dcef4b4 100644 --- a/core/src/worker/activities/local_activities.rs +++ b/core/src/worker/activities/local_activities.rs @@ -355,7 +355,6 @@ impl LocalActivityManager { } } LocalActRequest::CancelAllInRun(run_id) => { - debug!(run_id=%run_id, "Cancelling all local activities for run"); let mut dlock = self.dat.lock(); // Even if we've got 100k+ LAs this should only take a ms or two. Not worth // adding another map to keep in sync. @@ -363,7 +362,12 @@ impl LocalActivityManager { .la_info .iter_mut() .filter(|(id, _)| id.run_id == run_id); + let mut printed = false; for (laid, lainf) in las_for_run { + if !printed { + debug!(run_id=%run_id, "Cancelling all local activities for run"); + printed = true; + } if let Some(immediate_res) = self.cancel_one_la(laid.seq_num, lainf) { immediate_resolutions.push(immediate_res); } diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index a06132cd5..631eb3384 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -1072,7 +1072,7 @@ impl WorkflowMachines { ur, self.replaying, ); - let mk = self.add_new_protocol_machine(um.machine, message.protocol_instance_id); + let mk = self.add_new_protocol_machine(um.machine, message.protocol_instance_id)?; self.process_machine_responses(mk, vec![um.response])?; } } @@ -1601,10 +1601,23 @@ impl WorkflowMachines { } } - fn add_new_protocol_machine(&mut self, machine: Machines, instance_id: String) -> MachineKey { + fn add_new_protocol_machine( + &mut self, + machine: Machines, + instance_id: String, + ) -> Result { + if self + .machines_by_protocol_instance_id + .contains_key(&instance_id) + { + return Err(WFMachinesError::Fatal(format!( + "Machine with protocol instance id {} already exists! This is a bug.", + instance_id + ))); + } let k = self.all_machines.insert(machine); self.machines_by_protocol_instance_id.insert(instance_id, k); - k + Ok(k) } fn augment_continue_as_new_with_current_values( diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs index 4af7b1cd0..e39b4ac93 100644 --- a/core/src/worker/workflow/workflow_stream.rs +++ b/core/src/worker/workflow/workflow_stream.rs @@ -343,6 +343,9 @@ impl WFStream { !report.is_autocomplete || matches!(act, OutstandingActivation::Autocomplete) }) }) { + // TODO: This and related code wasn't cleaned up since the changes to make evictions + // always be in their own activation. We should throw errors if any completion of + // an eviction is nonempty. if should_evict { debug!(run_id=%run_id, "Evicting run"); self.runs.remove(run_id);