diff --git a/diesel/src/pg/backend.rs b/diesel/src/pg/backend.rs index db18d681c8d0..8232a53dbe2f 100644 --- a/diesel/src/pg/backend.rs +++ b/diesel/src/pg/backend.rs @@ -157,3 +157,20 @@ impl sql_dialect::on_conflict_clause::PgLikeOnConflictClause for PgOnConflictCla pub struct PgStyleArrayComparison; impl LikeIsAllowedForType for Pg {} + +// Using the same field names as tokio-postgres +/// See Postgres documentation for SQL Commands NOTIFY and LISTEN +#[derive(Clone, Debug)] +pub struct PgNotification { + /// process ID of notifying server process + pub process_id: i32, + /// Name of the notification channel + pub channel: String, + /// optional data that was submitted with the notification, + /// + /// This is set to an empty string if no data was submitted + /// + /// (Postgres unfortunally does not provide a way to differentiate between + /// not set and empty here) + pub payload: String, +} diff --git a/diesel/src/pg/connection/mod.rs b/diesel/src/pg/connection/mod.rs index 680b6a16246d..000f569620c3 100644 --- a/diesel/src/pg/connection/mod.rs +++ b/diesel/src/pg/connection/mod.rs @@ -18,6 +18,7 @@ use crate::connection::instrumentation::{ use crate::connection::statement_cache::{MaybeCached, StatementCache}; use crate::connection::*; use crate::expression::QueryMetadata; +use crate::pg::backend::PgNotification; use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache}; use crate::pg::query_builder::copy::InternalCopyFromQuery; use crate::pg::{Pg, TransactionBuilder}; @@ -547,6 +548,51 @@ impl PgConnection { .set_notice_processor(noop_notice_processor); Ok(()) } + + /// See Postgres documentation for SQL commands [NOTIFY][] and [LISTEN][] + /// + /// The returned iterator can yield items even after a None value when new notifications have been received. + /// The iterator can be polled again after a `None` value was received as new notifications might have + /// been send in the mean time. + /// + /// [NOTIFY]: https://www.postgresql.org/docs/current/sql-notify.html + /// [LISTEN]: https://www.postgresql.org/docs/current/sql-listen.html + /// + /// ## Example + /// + /// ``` + /// # include!("../../doctest_setup.rs"); + /// # + /// # fn main() { + /// # run_test().unwrap(); + /// # } + /// # + /// # fn run_test() -> QueryResult<()> { + /// # let connection = &mut establish_connection(); + /// + /// // register the notifications channel we want to receive notifications for + /// diesel::sql_query("LISTEN example_channel").execute(connection)?; + /// // send some notification + /// // this is usually done from a different connection/thread/application + /// diesel::sql_query("NOTIFY example_channel, 'additional data'").execute(connection)?; + /// + /// for result in connection.notifications_iter() { + /// let notification = result.unwrap(); + /// assert_eq!(notification.channel, "example_channel"); + /// assert_eq!(notification.payload, "additional data"); + /// + /// println!( + /// "Notification received from server process with id {}.", + /// notification.process_id + /// ); + /// } + /// # Ok(()) + /// # } + /// ``` + pub fn notifications_iter(&mut self) -> impl Iterator> + '_ { + let conn = &self.connection_and_transaction_manager.raw_connection; + std::iter::from_fn(move || conn.pq_notifies().transpose()) + } } extern "C" fn noop_notice_processor(_: *mut libc::c_void, _message: *const libc::c_char) {} @@ -626,6 +672,47 @@ mod tests { crate::test_helpers::pg_connection_no_transaction() } + #[diesel_test_helper::test] + fn notifications_arrive() { + use crate::sql_query; + + let conn = &mut connection(); + sql_query("LISTEN test_notifications") + .execute(conn) + .unwrap(); + sql_query("NOTIFY test_notifications, 'first'") + .execute(conn) + .unwrap(); + sql_query("NOTIFY test_notifications, 'second'") + .execute(conn) + .unwrap(); + + let notifications = conn + .notifications_iter() + .map(Result::unwrap) + .collect::>(); + + assert_eq!(2, notifications.len()); + assert_eq!(notifications[0].channel, "test_notifications"); + assert_eq!(notifications[1].channel, "test_notifications"); + assert_eq!(notifications[0].payload, "first"); + assert_eq!(notifications[1].payload, "second"); + + let next_notification = conn.notifications_iter().next(); + assert!( + next_notification.is_none(), + "Got a next notification, while not expecting one: {next_notification:?}" + ); + + sql_query("NOTIFY test_notifications") + .execute(conn) + .unwrap(); + assert_eq!( + conn.notifications_iter().next().unwrap().unwrap().payload, + "" + ); + } + #[diesel_test_helper::test] fn malformed_sql_query() { let connection = &mut connection(); diff --git a/diesel/src/pg/connection/raw.rs b/diesel/src/pg/connection/raw.rs index b62791962204..820539bc868f 100644 --- a/diesel/src/pg/connection/raw.rs +++ b/diesel/src/pg/connection/raw.rs @@ -12,6 +12,7 @@ use std::{ptr, str}; use crate::result::*; use super::result::PgResult; +use crate::pg::PgNotification; #[allow(missing_debug_implementations, missing_copy_implementations)] pub(super) struct RawConnection { @@ -183,6 +184,77 @@ impl RawConnection { )) } } + + pub(super) fn pq_notifies(&self) -> Result, Error> { + let conn = self.internal_connection; + let ret = unsafe { PQconsumeInput(conn.as_ptr()) }; + if ret == 0 { + return Err(Error::DatabaseError( + DatabaseErrorKind::Unknown, + Box::new(self.last_error_message()), + )); + } + + let pgnotify = unsafe { PQnotifies(conn.as_ptr()) }; + if pgnotify.is_null() { + Ok(None) + } else { + // we use a drop guard here to + // make sure that we always free + // the provided pointer, even if we + // somehow return an error below + struct Guard<'a> { + value: &'a mut pgNotify, + } + + impl Drop for Guard<'_> { + fn drop(&mut self) { + unsafe { + // SAFETY: We know that this value is not null here + PQfreemem(self.value as *mut pgNotify as *mut std::ffi::c_void) + }; + } + } + + let pgnotify = unsafe { + // SAFETY: We checked for null values above + Guard { + value: &mut *pgnotify, + } + }; + if pgnotify.value.relname.is_null() { + return Err(Error::DeserializationError( + "Received an unexpected null value for `relname` from the notification".into(), + )); + } + if pgnotify.value.extra.is_null() { + return Err(Error::DeserializationError( + "Received an unexpected null value for `extra` from the notification".into(), + )); + } + + let channel = unsafe { + // SAFETY: We checked for null values above + CStr::from_ptr(pgnotify.value.relname) + } + .to_str() + .map_err(|e| Error::DeserializationError(e.into()))? + .to_string(); + let payload = unsafe { + // SAFETY: We checked for null values above + CStr::from_ptr(pgnotify.value.extra) + } + .to_str() + .map_err(|e| Error::DeserializationError(e.into()))? + .to_string(); + let ret = PgNotification { + process_id: pgnotify.value.be_pid, + channel, + payload, + }; + Ok(Some(ret)) + } + } } /// Represents the current in-transaction status of the connection diff --git a/diesel/src/pg/mod.rs b/diesel/src/pg/mod.rs index 0b3cf27ee8d7..8d8b27c7e86c 100644 --- a/diesel/src/pg/mod.rs +++ b/diesel/src/pg/mod.rs @@ -16,7 +16,8 @@ pub(crate) mod serialize; mod transaction; mod value; -pub use self::backend::{Pg, PgTypeMetadata}; +#[doc(inline)] +pub use self::backend::{Pg, PgNotification, PgTypeMetadata}; #[cfg(feature = "postgres")] pub use self::connection::{PgConnection, PgRowByRowLoadingMode}; #[doc(inline)]