Skip to content

Commit

Permalink
Merge pull request #4420 from thkoch2001/add_pg_notifications
Browse files Browse the repository at this point in the history
Add support for Postgres notifications
  • Loading branch information
weiznich authored Jan 11, 2025
2 parents a24aad1 + 59fa554 commit 2129cd9
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 1 deletion.
17 changes: 17 additions & 0 deletions diesel/src/pg/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,20 @@ impl sql_dialect::on_conflict_clause::PgLikeOnConflictClause for PgOnConflictCla
pub struct PgStyleArrayComparison;

impl LikeIsAllowedForType<crate::sql_types::Binary> for Pg {}

// Using the same field names as tokio-postgres
/// See Postgres documentation for SQL Commands NOTIFY and LISTEN
#[derive(Clone, Debug)]
pub struct PgNotification {
/// process ID of notifying server process
pub process_id: i32,
/// Name of the notification channel
pub channel: String,
/// optional data that was submitted with the notification,
///
/// This is set to an empty string if no data was submitted
///
/// (Postgres unfortunally does not provide a way to differentiate between
/// not set and empty here)
pub payload: String,
}
87 changes: 87 additions & 0 deletions diesel/src/pg/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::connection::instrumentation::{
use crate::connection::statement_cache::{MaybeCached, StatementCache};
use crate::connection::*;
use crate::expression::QueryMetadata;
use crate::pg::backend::PgNotification;
use crate::pg::metadata_lookup::{GetPgMetadataCache, PgMetadataCache};
use crate::pg::query_builder::copy::InternalCopyFromQuery;
use crate::pg::{Pg, TransactionBuilder};
Expand Down Expand Up @@ -547,6 +548,51 @@ impl PgConnection {
.set_notice_processor(noop_notice_processor);
Ok(())
}

/// See Postgres documentation for SQL commands [NOTIFY][] and [LISTEN][]
///
/// The returned iterator can yield items even after a None value when new notifications have been received.
/// The iterator can be polled again after a `None` value was received as new notifications might have
/// been send in the mean time.
///
/// [NOTIFY]: https://www.postgresql.org/docs/current/sql-notify.html
/// [LISTEN]: https://www.postgresql.org/docs/current/sql-listen.html
///
/// ## Example
///
/// ```
/// # include!("../../doctest_setup.rs");
/// #
/// # fn main() {
/// # run_test().unwrap();
/// # }
/// #
/// # fn run_test() -> QueryResult<()> {
/// # let connection = &mut establish_connection();
///
/// // register the notifications channel we want to receive notifications for
/// diesel::sql_query("LISTEN example_channel").execute(connection)?;
/// // send some notification
/// // this is usually done from a different connection/thread/application
/// diesel::sql_query("NOTIFY example_channel, 'additional data'").execute(connection)?;
///
/// for result in connection.notifications_iter() {
/// let notification = result.unwrap();
/// assert_eq!(notification.channel, "example_channel");
/// assert_eq!(notification.payload, "additional data");
///
/// println!(
/// "Notification received from server process with id {}.",
/// notification.process_id
/// );
/// }
/// # Ok(())
/// # }
/// ```
pub fn notifications_iter(&mut self) -> impl Iterator<Item = QueryResult<PgNotification>> + '_ {
let conn = &self.connection_and_transaction_manager.raw_connection;
std::iter::from_fn(move || conn.pq_notifies().transpose())
}
}

extern "C" fn noop_notice_processor(_: *mut libc::c_void, _message: *const libc::c_char) {}
Expand Down Expand Up @@ -626,6 +672,47 @@ mod tests {
crate::test_helpers::pg_connection_no_transaction()
}

#[diesel_test_helper::test]
fn notifications_arrive() {
use crate::sql_query;

let conn = &mut connection();
sql_query("LISTEN test_notifications")
.execute(conn)
.unwrap();
sql_query("NOTIFY test_notifications, 'first'")
.execute(conn)
.unwrap();
sql_query("NOTIFY test_notifications, 'second'")
.execute(conn)
.unwrap();

let notifications = conn
.notifications_iter()
.map(Result::unwrap)
.collect::<Vec<_>>();

assert_eq!(2, notifications.len());
assert_eq!(notifications[0].channel, "test_notifications");
assert_eq!(notifications[1].channel, "test_notifications");
assert_eq!(notifications[0].payload, "first");
assert_eq!(notifications[1].payload, "second");

let next_notification = conn.notifications_iter().next();
assert!(
next_notification.is_none(),
"Got a next notification, while not expecting one: {next_notification:?}"
);

sql_query("NOTIFY test_notifications")
.execute(conn)
.unwrap();
assert_eq!(
conn.notifications_iter().next().unwrap().unwrap().payload,
""
);
}

#[diesel_test_helper::test]
fn malformed_sql_query() {
let connection = &mut connection();
Expand Down
72 changes: 72 additions & 0 deletions diesel/src/pg/connection/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::{ptr, str};
use crate::result::*;

use super::result::PgResult;
use crate::pg::PgNotification;

#[allow(missing_debug_implementations, missing_copy_implementations)]
pub(super) struct RawConnection {
Expand Down Expand Up @@ -183,6 +184,77 @@ impl RawConnection {
))
}
}

pub(super) fn pq_notifies(&self) -> Result<Option<PgNotification>, Error> {
let conn = self.internal_connection;
let ret = unsafe { PQconsumeInput(conn.as_ptr()) };
if ret == 0 {
return Err(Error::DatabaseError(
DatabaseErrorKind::Unknown,
Box::new(self.last_error_message()),
));
}

let pgnotify = unsafe { PQnotifies(conn.as_ptr()) };
if pgnotify.is_null() {
Ok(None)
} else {
// we use a drop guard here to
// make sure that we always free
// the provided pointer, even if we
// somehow return an error below
struct Guard<'a> {
value: &'a mut pgNotify,
}

impl Drop for Guard<'_> {
fn drop(&mut self) {
unsafe {
// SAFETY: We know that this value is not null here
PQfreemem(self.value as *mut pgNotify as *mut std::ffi::c_void)
};
}
}

let pgnotify = unsafe {
// SAFETY: We checked for null values above
Guard {
value: &mut *pgnotify,
}
};
if pgnotify.value.relname.is_null() {
return Err(Error::DeserializationError(
"Received an unexpected null value for `relname` from the notification".into(),
));
}
if pgnotify.value.extra.is_null() {
return Err(Error::DeserializationError(
"Received an unexpected null value for `extra` from the notification".into(),
));
}

let channel = unsafe {
// SAFETY: We checked for null values above
CStr::from_ptr(pgnotify.value.relname)
}
.to_str()
.map_err(|e| Error::DeserializationError(e.into()))?
.to_string();
let payload = unsafe {
// SAFETY: We checked for null values above
CStr::from_ptr(pgnotify.value.extra)
}
.to_str()
.map_err(|e| Error::DeserializationError(e.into()))?
.to_string();
let ret = PgNotification {
process_id: pgnotify.value.be_pid,
channel,
payload,
};
Ok(Some(ret))
}
}
}

/// Represents the current in-transaction status of the connection
Expand Down
3 changes: 2 additions & 1 deletion diesel/src/pg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ pub(crate) mod serialize;
mod transaction;
mod value;

pub use self::backend::{Pg, PgTypeMetadata};
#[doc(inline)]
pub use self::backend::{Pg, PgNotification, PgTypeMetadata};
#[cfg(feature = "postgres")]
pub use self::connection::{PgConnection, PgRowByRowLoadingMode};
#[doc(inline)]
Expand Down

0 comments on commit 2129cd9

Please sign in to comment.