diff --git a/influxdb3_processing_engine/src/lib.rs b/influxdb3_processing_engine/src/lib.rs index 5a865cca5c7..1578d279a59 100644 --- a/influxdb3_processing_engine/src/lib.rs +++ b/influxdb3_processing_engine/src/lib.rs @@ -45,16 +45,32 @@ struct PluginChannels { const PLUGIN_EVENT_BUFFER_SIZE: usize = 60; impl PluginChannels { - async fn shutdown(&mut self, db: String, trigger: String) { - // create a one shot to wait for the shutdown to complete - let (tx, rx) = oneshot::channel(); - if let Some(trigger_map) = self.active_triggers.get_mut(&db) { - if let Some(sender) = trigger_map.remove(&trigger) { - if sender.send(PluginEvent::Shutdown(tx)).await.is_ok() { - rx.await.ok(); + // returns Ok(Some(receiver)) if there was a sender to the named trigger. + async fn send_shutdown( + &self, + db: String, + trigger: String, + ) -> Result>, ProcessingEngineError> { + if let Some(trigger_map) = self.active_triggers.get(&db) { + if let Some(sender) = trigger_map.get(&trigger) { + // create a one shot to wait for the shutdown to complete + let (tx, rx) = oneshot::channel(); + if sender.send(PluginEvent::Shutdown(tx)).await.is_err() { + return Err(ProcessingEngineError::TriggerShutdownError { + database: db, + trigger_name: trigger, + }); } + return Ok(Some(rx)); } } + Ok(None) + } + + fn remove_trigger(&mut self, db: String, trigger: String) { + if let Some(trigger_map) = self.active_triggers.get_mut(&db) { + trigger_map.remove(&trigger); + } } #[cfg(feature = "system-py")] @@ -340,10 +356,26 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl { self.wal.write_ops(vec![wal_op]).await?; } - let mut plugin_channels = self.plugin_event_tx.lock().await; - plugin_channels - .shutdown(db_name.to_string(), trigger_name.to_string()) - .await; + let Some(shutdown_rx) = self + .plugin_event_tx + .lock() + .await + .send_shutdown(db_name.to_string(), trigger_name.to_string()) + .await? + else { + return Ok(()); + }; + + if shutdown_rx.await.is_err() { + warn!( + "shutdown trigger receiver dropped, may have received multiple shutdown requests" + ); + } else { + self.plugin_event_tx + .lock() + .await + .remove_trigger(db_name.to_string(), trigger_name.to_string()); + } Ok(()) } diff --git a/influxdb3_processing_engine/src/manager.rs b/influxdb3_processing_engine/src/manager.rs index 7218f0c24d6..04710d0ddf4 100644 --- a/influxdb3_processing_engine/src/manager.rs +++ b/influxdb3_processing_engine/src/manager.rs @@ -29,6 +29,12 @@ pub enum ProcessingEngineError { #[error("plugin error: {0}")] PluginError(#[from] crate::plugins::Error), + + #[error("failed to shutdown trigger {trigger_name} in database {database}")] + TriggerShutdownError { + database: String, + trigger_name: String, + }, } /// `[ProcessingEngineManager]` is used to interact with the processing engine,