From 9f443d9111763da186c8a6ded46ac2300eca454e Mon Sep 17 00:00:00 2001 From: Lasse Alm Date: Tue, 17 Sep 2024 09:00:31 +0200 Subject: [PATCH] Prepared statements issue (#223) * fix prepared statements issue * bumped notification server * remove prepared statement * add changelog * updated changelog * use self.0 --- notification-server/CHANGELOG.md | 6 + notification-server/Cargo.lock | 2 +- notification-server/Cargo.toml | 2 +- notification-server/scripts/Dockerfile | 6 +- notification-server/src/bin/api.rs | 1 - notification-server/src/bin/service.rs | 3 - notification-server/src/database.rs | 188 ++++++++----------------- 7 files changed, 75 insertions(+), 133 deletions(-) diff --git a/notification-server/CHANGELOG.md b/notification-server/CHANGELOG.md index 88a189e9..6d969314 100644 --- a/notification-server/CHANGELOG.md +++ b/notification-server/CHANGELOG.md @@ -1,3 +1,9 @@ +## 0.3.2 +- Initiate prepared statements from within each function call +- +## 0.3.1 +- No longer use aspn specific information + ## 0.3.0 - Updated the Concordium Rust SDK to support the changes introduced in protocol 7. diff --git a/notification-server/Cargo.lock b/notification-server/Cargo.lock index a27393dc..b7a30f1f 100644 --- a/notification-server/Cargo.lock +++ b/notification-server/Cargo.lock @@ -2098,7 +2098,7 @@ dependencies = [ [[package]] name = "notification-server" -version = "0.3.1" +version = "0.3.2" dependencies = [ "anyhow", "async-trait", diff --git a/notification-server/Cargo.toml b/notification-server/Cargo.toml index ab413a0d..e330c415 100644 --- a/notification-server/Cargo.toml +++ b/notification-server/Cargo.toml @@ -2,7 +2,7 @@ authors = ["Concordium AG developers@concordium.com"] edition = "2021" name = "notification-server" -version = "0.3.1" +version = "0.3.2" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/notification-server/scripts/Dockerfile b/notification-server/scripts/Dockerfile index 2421101d..62bba270 100644 --- a/notification-server/scripts/Dockerfile +++ b/notification-server/scripts/Dockerfile @@ -14,8 +14,12 @@ FROM ${base_image} WORKDIR /usr/app RUN apt-get update && \ + apt-get install -y gnupg wget lsb-release && \ + sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/postgres.list' && \ + wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - && \ + apt-get update && \ apt-get -y install \ - postgresql-client \ + postgresql-client-14 \ ca-certificates \ && rm -rf /var/lib/apt/lists/* diff --git a/notification-server/src/bin/api.rs b/notification-server/src/bin/api.rs index 1c38dda0..40b890cc 100644 --- a/notification-server/src/bin/api.rs +++ b/notification-server/src/bin/api.rs @@ -219,7 +219,6 @@ async fn process_device_subscription( let decoded_accounts = decoded_accounts?; state .db_connection - .prepared .upsert_subscription( decoded_accounts, subscription.preferences, diff --git a/notification-server/src/bin/service.rs b/notification-server/src/bin/service.rs index 399ca20c..6619e38e 100644 --- a/notification-server/src/bin/service.rs +++ b/notification-server/src/bin/service.rs @@ -153,7 +153,6 @@ async fn process_block( ); let operation = || async { match database_connection - .prepared .get_devices_from_account(result.address()) .await { @@ -228,7 +227,6 @@ async fn process_block( } let operation = || async { database_connection - .prepared .insert_block(&block_hash, &finalized_block.height) .await .map_err(|err| match err { @@ -405,7 +403,6 @@ async fn main() -> anyhow::Result<()> { let database_connection = DatabaseConnection::create(args.db_connection).await?; let mut concordium_client = Client::new(endpoint).await?; let mut height = if let Some(height) = database_connection - .prepared .get_processed_block_height() .await .context("Failed to get processed block height")? diff --git a/notification-server/src/database.rs b/notification-server/src/database.rs index 50aae53d..cf6416eb 100644 --- a/notification-server/src/database.rs +++ b/notification-server/src/database.rs @@ -1,9 +1,8 @@ use crate::models::device::{Device, Preference}; -use anyhow::Context; use concordium_rust_sdk::{ base::hashes::BlockHash, common::types::AccountAddress, types::AbsoluteBlockHeight, }; -use deadpool_postgres::{Manager, ManagerConfig, Pool, PoolError, RecyclingMethod}; +use deadpool_postgres::{GenericClient, Manager, ManagerConfig, Pool, PoolError, RecyclingMethod}; use lazy_static::lazy_static; use log::error; use std::{ @@ -27,63 +26,31 @@ pub enum Error { } #[derive(Clone, Debug)] -pub struct PreparedStatements { - get_devices_from_account: tokio_postgres::Statement, - upsert_device: tokio_postgres::Statement, - get_latest_block_height: tokio_postgres::Statement, - insert_block: tokio_postgres::Statement, - pool: Pool, -} +pub struct DatabaseConnection(Pool); -impl PreparedStatements { - async fn new(pool: Pool) -> anyhow::Result { - let mut client = pool.get().await.context("Failed to get client")?; - let transaction = client - .transaction() - .await - .context("Failed to start a transaction")?; - let get_devices_from_account = transaction - .prepare( - "SELECT device_id, preferences FROM account_device_mapping WHERE address = $1 \ - LIMIT 1000", - ) - .await - .context("Failed to create account device mapping")?; - let upsert_device = transaction - .prepare( - "INSERT INTO account_device_mapping (address, device_id, preferences) VALUES ($1, \ - $2, $3) ON CONFLICT (address, device_id) DO UPDATE SET preferences = \ - EXCLUDED.preferences;", - ) - .await - .context("Failed to create account device mapping")?; - let get_latest_block_height = transaction - .prepare( +impl DatabaseConnection { + pub async fn create(config: tokio_postgres::config::Config) -> anyhow::Result { + let mgr_config = ManagerConfig { + recycling_method: RecyclingMethod::Fast, + }; + let mgr = Manager::from_config(config, NoTls, mgr_config); + let pool = Pool::builder(mgr) + .max_size(16) + .build() + .expect("Failed to create pool"); + Ok(DatabaseConnection(pool)) + } + + pub async fn get_processed_block_height(&self) -> Result, Error> { + let client = self.0.get().await.map_err(Into::::into)?; + let stmt = client + .prepare_cached( "SELECT blocks.height FROM blocks WHERE blocks.id = (SELECT MAX(blocks.id) FROM \ blocks);", ) .await - .context("Failed to create get latest block height")?; - let insert_block = transaction - .prepare("INSERT INTO blocks (hash, height) VALUES ($1, $2);") - .await - .context("Failed to create insert block")?; - transaction - .commit() - .await - .context("Failed to commit transaction")?; - Ok(PreparedStatements { - get_devices_from_account, - upsert_device, - get_latest_block_height, - insert_block, - pool, - }) - } - - pub async fn get_processed_block_height(&self) -> Result, Error> { - let client = self.pool.get().await.map_err(Into::::into)?; - let row = client.query_opt(&self.get_latest_block_height, &[]).await?; + .map_err(Into::::into)?; + let row = client.query_opt(&stmt, &[]).await?; row.map(|row| row.try_get::<_, i64>(0).map(|raw| (raw as u64).into())) .transpose() .map_err(Into::into) @@ -93,10 +60,17 @@ impl PreparedStatements { &self, account_address: &AccountAddress, ) -> Result, Error> { - let client = self.pool.get().await.map_err(Into::::into)?; - let params: &[&(dyn tokio_postgres::types::ToSql + Sync)] = &[&account_address.0.as_ref()]; + let client = self.0.get().await.map_err(Into::::into)?; + let stmt = client + .prepare_cached( + "SELECT device_id, preferences FROM account_device_mapping WHERE address = $1 \ + LIMIT 1000", + ) + .await + .map_err(Into::::into)?; + let params: &[&(dyn ToSql + Sync)] = &[&account_address.0.as_ref()]; let rows = client - .query(&self.get_devices_from_account, params) + .query(&stmt, params) .await .map_err(Into::::into)?; rows.iter() @@ -114,13 +88,21 @@ impl PreparedStatements { preferences: Vec, device_token: &str, ) -> Result<(), Error> { - let mut client = self.pool.get().await.map_err(Into::::into)?; + let mut client = self.0.get().await.map_err(Into::::into)?; + let stmt = client + .prepare_cached( + "INSERT INTO account_device_mapping (address, device_id, preferences) VALUES ($1, \ + $2, $3) ON CONFLICT (address, device_id) DO UPDATE SET preferences = \ + EXCLUDED.preferences;", + ) + .await + .map_err(Into::::into)?; let preferences_mask = preferences_to_bitmask(preferences.into_iter()); let transaction = client.transaction().await?; for account in account_address { let params: &[&(dyn ToSql + Sync)] = &[&account.0.as_ref(), &device_token, &preferences_mask]; - if let Err(e) = transaction.execute(&self.upsert_device, params).await { + if let Err(e) = transaction.execute(&stmt, params).await { let _ = transaction.rollback().await; return Err(e.into()); } @@ -133,42 +115,23 @@ impl PreparedStatements { hash: &BlockHash, height: &AbsoluteBlockHeight, ) -> Result<(), Error> { - let client = self.pool.get().await.map_err(Into::::into)?; - let params: &[&(dyn ToSql + Sync); 2] = &[&hash.as_ref(), &(height.height as i64)]; - client - .execute(&self.insert_block, params) + let client = self.0.get().await.map_err(Into::::into)?; + let stmt = client + .prepare_cached("INSERT INTO blocks (hash, height) VALUES ($1, $2);") .await - .map_or_else( - |err| { - if let Some(db_err) = err.as_db_error() { - if db_err.code() == &SqlState::UNIQUE_VIOLATION { - return Err(Error::ConstraintViolation(*hash, *height)); - } - }; - Err(Error::DatabaseConnection(err)) - }, - |_| Ok(()), - ) - } -} - -#[derive(Clone, Debug)] -pub struct DatabaseConnection { - pub prepared: PreparedStatements, -} - -impl DatabaseConnection { - pub async fn create(config: tokio_postgres::config::Config) -> anyhow::Result { - let mgr_config = ManagerConfig { - recycling_method: RecyclingMethod::Fast, - }; - let mgr = Manager::from_config(config, NoTls, mgr_config); - let pool = Pool::builder(mgr) - .max_size(16) - .build() - .expect("Failed to create pool"); - let prepared = PreparedStatements::new(pool).await?; - Ok(DatabaseConnection { prepared }) + .map_err(Into::::into)?; + let params: &[&(dyn ToSql + Sync); 2] = &[&hash.as_ref(), &(height.height as i64)]; + client.execute(&stmt, params).await.map_or_else( + |err| { + if let Some(db_err) = err.as_db_error() { + if db_err.code() == &SqlState::UNIQUE_VIOLATION { + return Err(Error::ConstraintViolation(*hash, *height)); + } + }; + Err(Error::DatabaseConnection(err)) + }, + |_| Ok(()), + ) } } @@ -322,7 +285,7 @@ mod tests { .unwrap(); let db_connection = DatabaseConnection::create(config).await?; - let client = db_connection.prepared.pool.get().await?; + let client = db_connection.0.get().await?; drop_all_tables(&client).await?; create_sql(&client).await?; @@ -337,7 +300,6 @@ mod tests { AccountAddress::from_str("4FmiTW2L2AccyR9VjzsnpWFSAcohXWf7Vf797i36y526mqiEcp").unwrap(); let device = "device-1"; db_connection - .prepared .upsert_subscription( vec![account_address], vec![Preference::CIS2Transaction], @@ -346,7 +308,6 @@ mod tests { .await .unwrap(); let devices = db_connection - .prepared .get_devices_from_account(&account_address) .await .unwrap(); @@ -366,12 +327,10 @@ mod tests { AccountAddress::from_str("4FmiTW2L2AccyR9VjzsnpWFSAcohXWf7Vf797i36y526mqiEcp").unwrap(); let device = "device-1"; db_connection - .prepared .upsert_subscription(vec![account_address], vec![CIS2Transaction], device) .await .unwrap(); db_connection - .prepared .upsert_subscription( vec![account_address], vec![CIS2Transaction, CCDTransaction], @@ -380,7 +339,6 @@ mod tests { .await .unwrap(); let devices = db_connection - .prepared .get_devices_from_account(&account_address) .await .unwrap(); @@ -391,12 +349,10 @@ mod tests { )]); db_connection - .prepared .upsert_subscription(vec![account_address], vec![], device) .await .unwrap(); let devices = db_connection - .prepared .get_devices_from_account(&account_address) .await .unwrap(); @@ -414,32 +370,16 @@ mod tests { let hash = BlockHash::new([0; 32]); // Example block hash let height = AbsoluteBlockHeight::from(1); - db_connection - .prepared - .insert_block(&hash, &height) - .await - .unwrap(); + db_connection.insert_block(&hash, &height).await.unwrap(); - let latest_height = db_connection - .prepared - .get_processed_block_height() - .await - .unwrap(); + let latest_height = db_connection.get_processed_block_height().await.unwrap(); assert_eq!(latest_height, Some(height)); let hash = BlockHash::new([1; 32]); // Example block hash let height = AbsoluteBlockHeight::from(2); - db_connection - .prepared - .insert_block(&hash, &height) - .await - .unwrap(); - let latest_height = db_connection - .prepared - .get_processed_block_height() - .await - .unwrap(); + db_connection.insert_block(&hash, &height).await.unwrap(); + let latest_height = db_connection.get_processed_block_height().await.unwrap(); assert_eq!(latest_height.unwrap().height, 2); } @@ -452,7 +392,6 @@ mod tests { let expected_height = AbsoluteBlockHeight::from(1); db_connection - .prepared .insert_block( &BlockHash::new(expected_hash), &AbsoluteBlockHeight::from(2), @@ -461,7 +400,6 @@ mod tests { .unwrap(); if db_connection - .prepared .insert_block(&BlockHash::new(expected_hash), &expected_height) .await .is_err() @@ -479,13 +417,11 @@ mod tests { let expected_height = AbsoluteBlockHeight::from(1); db_connection - .prepared .insert_block(&BlockHash::new([0; 32]), &expected_height) .await .unwrap(); match db_connection - .prepared .insert_block(&BlockHash::new(expected_hash), &expected_height) .await {