Skip to content

Commit

Permalink
fix: change trigger shutdown implementation to avoid deadlock. (#25806)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksonrnewhouse authored Jan 12, 2025
1 parent 491a37b commit 6e942b2
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 11 deletions.
54 changes: 43 additions & 11 deletions influxdb3_processing_engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,16 +45,32 @@ struct PluginChannels {
const PLUGIN_EVENT_BUFFER_SIZE: usize = 60;

impl PluginChannels {
async fn shutdown(&mut self, db: String, trigger: String) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if let Some(trigger_map) = self.active_triggers.get_mut(&db) {
if let Some(sender) = trigger_map.remove(&trigger) {
if sender.send(PluginEvent::Shutdown(tx)).await.is_ok() {
rx.await.ok();
// returns Ok(Some(receiver)) if there was a sender to the named trigger.
async fn send_shutdown(
&self,
db: String,
trigger: String,
) -> Result<Option<Receiver<()>>, ProcessingEngineError> {
if let Some(trigger_map) = self.active_triggers.get(&db) {
if let Some(sender) = trigger_map.get(&trigger) {
// create a one shot to wait for the shutdown to complete
let (tx, rx) = oneshot::channel();
if sender.send(PluginEvent::Shutdown(tx)).await.is_err() {
return Err(ProcessingEngineError::TriggerShutdownError {
database: db,
trigger_name: trigger,
});
}
return Ok(Some(rx));
}
}
Ok(None)
}

fn remove_trigger(&mut self, db: String, trigger: String) {
if let Some(trigger_map) = self.active_triggers.get_mut(&db) {
trigger_map.remove(&trigger);
}
}

#[cfg(feature = "system-py")]
Expand Down Expand Up @@ -340,10 +356,26 @@ impl ProcessingEngineManager for ProcessingEngineManagerImpl {
self.wal.write_ops(vec![wal_op]).await?;
}

let mut plugin_channels = self.plugin_event_tx.lock().await;
plugin_channels
.shutdown(db_name.to_string(), trigger_name.to_string())
.await;
let Some(shutdown_rx) = self
.plugin_event_tx
.lock()
.await
.send_shutdown(db_name.to_string(), trigger_name.to_string())
.await?
else {
return Ok(());
};

if shutdown_rx.await.is_err() {
warn!(
"shutdown trigger receiver dropped, may have received multiple shutdown requests"
);
} else {
self.plugin_event_tx
.lock()
.await
.remove_trigger(db_name.to_string(), trigger_name.to_string());
}

Ok(())
}
Expand Down
6 changes: 6 additions & 0 deletions influxdb3_processing_engine/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ pub enum ProcessingEngineError {

#[error("plugin error: {0}")]
PluginError(#[from] crate::plugins::Error),

#[error("failed to shutdown trigger {trigger_name} in database {database}")]
TriggerShutdownError {
database: String,
trigger_name: String,
},
}

/// `[ProcessingEngineManager]` is used to interact with the processing engine,
Expand Down

0 comments on commit 6e942b2

Please sign in to comment.