Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepared statements issue #223

Merged
merged 6 commits into from
Sep 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions notification-server/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 0.3.2
- Initiate prepared statements from within each function call
-
## 0.3.1
- No longer use aspn specific information

## 0.3.0
- Updated the Concordium Rust SDK to support the changes introduced in protocol 7.

Expand Down
2 changes: 1 addition & 1 deletion notification-server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion notification-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
authors = ["Concordium AG [email protected]"]
edition = "2021"
name = "notification-server"
version = "0.3.1"
version = "0.3.2"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
6 changes: 5 additions & 1 deletion notification-server/scripts/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ FROM ${base_image}
WORKDIR /usr/app

RUN apt-get update && \
apt-get install -y gnupg wget lsb-release && \
sh -c 'echo "deb http://apt.postgresql.org/pub/repos/apt $(lsb_release -cs)-pgdg main" > /etc/apt/sources.list.d/postgres.list' && \
wget --quiet -O - https://www.postgresql.org/media/keys/ACCC4CF8.asc | apt-key add - && \
apt-get update && \
apt-get -y install \
postgresql-client \
postgresql-client-14 \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*

Expand Down
1 change: 0 additions & 1 deletion notification-server/src/bin/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,6 @@ async fn process_device_subscription(
let decoded_accounts = decoded_accounts?;
state
.db_connection
.prepared
.upsert_subscription(
decoded_accounts,
subscription.preferences,
Expand Down
3 changes: 0 additions & 3 deletions notification-server/src/bin/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ async fn process_block(
);
let operation = || async {
match database_connection
.prepared
.get_devices_from_account(result.address())
.await
{
Expand Down Expand Up @@ -228,7 +227,6 @@ async fn process_block(
}
let operation = || async {
database_connection
.prepared
.insert_block(&block_hash, &finalized_block.height)
.await
.map_err(|err| match err {
Expand Down Expand Up @@ -405,7 +403,6 @@ async fn main() -> anyhow::Result<()> {
let database_connection = DatabaseConnection::create(args.db_connection).await?;
let mut concordium_client = Client::new(endpoint).await?;
let mut height = if let Some(height) = database_connection
.prepared
.get_processed_block_height()
.await
.context("Failed to get processed block height")?
Expand Down
188 changes: 62 additions & 126 deletions notification-server/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::models::device::{Device, Preference};
use anyhow::Context;
use concordium_rust_sdk::{
base::hashes::BlockHash, common::types::AccountAddress, types::AbsoluteBlockHeight,
};
use deadpool_postgres::{Manager, ManagerConfig, Pool, PoolError, RecyclingMethod};
use deadpool_postgres::{GenericClient, Manager, ManagerConfig, Pool, PoolError, RecyclingMethod};
use lazy_static::lazy_static;
use log::error;
use std::{
Expand All @@ -27,63 +26,31 @@ pub enum Error {
}

#[derive(Clone, Debug)]
pub struct PreparedStatements {
get_devices_from_account: tokio_postgres::Statement,
upsert_device: tokio_postgres::Statement,
get_latest_block_height: tokio_postgres::Statement,
insert_block: tokio_postgres::Statement,
pool: Pool,
}
pub struct DatabaseConnection(Pool);

impl PreparedStatements {
async fn new(pool: Pool) -> anyhow::Result<Self> {
let mut client = pool.get().await.context("Failed to get client")?;
let transaction = client
.transaction()
.await
.context("Failed to start a transaction")?;
let get_devices_from_account = transaction
.prepare(
"SELECT device_id, preferences FROM account_device_mapping WHERE address = $1 \
LIMIT 1000",
)
.await
.context("Failed to create account device mapping")?;
let upsert_device = transaction
.prepare(
"INSERT INTO account_device_mapping (address, device_id, preferences) VALUES ($1, \
$2, $3) ON CONFLICT (address, device_id) DO UPDATE SET preferences = \
EXCLUDED.preferences;",
)
.await
.context("Failed to create account device mapping")?;
let get_latest_block_height = transaction
.prepare(
impl DatabaseConnection {
pub async fn create(config: tokio_postgres::config::Config) -> anyhow::Result<Self> {
let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast,
};
let mgr = Manager::from_config(config, NoTls, mgr_config);
let pool = Pool::builder(mgr)
.max_size(16)
.build()
.expect("Failed to create pool");
Ok(DatabaseConnection(pool))
}

pub async fn get_processed_block_height(&self) -> Result<Option<AbsoluteBlockHeight>, Error> {
let client = self.0.get().await.map_err(Into::<Error>::into)?;
let stmt = client
.prepare_cached(
"SELECT blocks.height FROM blocks WHERE blocks.id = (SELECT MAX(blocks.id) FROM \
blocks);",
)
.await
.context("Failed to create get latest block height")?;
let insert_block = transaction
.prepare("INSERT INTO blocks (hash, height) VALUES ($1, $2);")
.await
.context("Failed to create insert block")?;
transaction
.commit()
.await
.context("Failed to commit transaction")?;
Ok(PreparedStatements {
get_devices_from_account,
upsert_device,
get_latest_block_height,
insert_block,
pool,
})
}

pub async fn get_processed_block_height(&self) -> Result<Option<AbsoluteBlockHeight>, Error> {
let client = self.pool.get().await.map_err(Into::<Error>::into)?;
let row = client.query_opt(&self.get_latest_block_height, &[]).await?;
.map_err(Into::<Error>::into)?;
let row = client.query_opt(&stmt, &[]).await?;
row.map(|row| row.try_get::<_, i64>(0).map(|raw| (raw as u64).into()))
.transpose()
.map_err(Into::into)
Expand All @@ -93,10 +60,17 @@ impl PreparedStatements {
&self,
account_address: &AccountAddress,
) -> Result<Vec<Device>, Error> {
let client = self.pool.get().await.map_err(Into::<Error>::into)?;
let params: &[&(dyn tokio_postgres::types::ToSql + Sync)] = &[&account_address.0.as_ref()];
let client = self.0.get().await.map_err(Into::<Error>::into)?;
let stmt = client
.prepare_cached(
"SELECT device_id, preferences FROM account_device_mapping WHERE address = $1 \
LIMIT 1000",
)
.await
.map_err(Into::<Error>::into)?;
let params: &[&(dyn ToSql + Sync)] = &[&account_address.0.as_ref()];
let rows = client
.query(&self.get_devices_from_account, params)
.query(&stmt, params)
.await
.map_err(Into::<Error>::into)?;
rows.iter()
Expand All @@ -114,13 +88,21 @@ impl PreparedStatements {
preferences: Vec<Preference>,
device_token: &str,
) -> Result<(), Error> {
let mut client = self.pool.get().await.map_err(Into::<Error>::into)?;
let mut client = self.0.get().await.map_err(Into::<Error>::into)?;
let stmt = client
.prepare_cached(
"INSERT INTO account_device_mapping (address, device_id, preferences) VALUES ($1, \
$2, $3) ON CONFLICT (address, device_id) DO UPDATE SET preferences = \
EXCLUDED.preferences;",
)
.await
.map_err(Into::<Error>::into)?;
let preferences_mask = preferences_to_bitmask(preferences.into_iter());
let transaction = client.transaction().await?;
for account in account_address {
let params: &[&(dyn ToSql + Sync)] =
&[&account.0.as_ref(), &device_token, &preferences_mask];
if let Err(e) = transaction.execute(&self.upsert_device, params).await {
if let Err(e) = transaction.execute(&stmt, params).await {
let _ = transaction.rollback().await;
return Err(e.into());
}
Expand All @@ -133,42 +115,23 @@ impl PreparedStatements {
hash: &BlockHash,
height: &AbsoluteBlockHeight,
) -> Result<(), Error> {
let client = self.pool.get().await.map_err(Into::<Error>::into)?;
let params: &[&(dyn ToSql + Sync); 2] = &[&hash.as_ref(), &(height.height as i64)];
client
.execute(&self.insert_block, params)
let client = self.0.get().await.map_err(Into::<Error>::into)?;
let stmt = client
.prepare_cached("INSERT INTO blocks (hash, height) VALUES ($1, $2);")
.await
.map_or_else(
|err| {
if let Some(db_err) = err.as_db_error() {
if db_err.code() == &SqlState::UNIQUE_VIOLATION {
return Err(Error::ConstraintViolation(*hash, *height));
}
};
Err(Error::DatabaseConnection(err))
},
|_| Ok(()),
)
}
}

#[derive(Clone, Debug)]
pub struct DatabaseConnection {
pub prepared: PreparedStatements,
}

impl DatabaseConnection {
pub async fn create(config: tokio_postgres::config::Config) -> anyhow::Result<Self> {
let mgr_config = ManagerConfig {
recycling_method: RecyclingMethod::Fast,
};
let mgr = Manager::from_config(config, NoTls, mgr_config);
let pool = Pool::builder(mgr)
.max_size(16)
.build()
.expect("Failed to create pool");
let prepared = PreparedStatements::new(pool).await?;
Ok(DatabaseConnection { prepared })
.map_err(Into::<Error>::into)?;
let params: &[&(dyn ToSql + Sync); 2] = &[&hash.as_ref(), &(height.height as i64)];
client.execute(&stmt, params).await.map_or_else(
|err| {
if let Some(db_err) = err.as_db_error() {
if db_err.code() == &SqlState::UNIQUE_VIOLATION {
return Err(Error::ConstraintViolation(*hash, *height));
}
};
Err(Error::DatabaseConnection(err))
},
|_| Ok(()),
)
}
}

Expand Down Expand Up @@ -322,7 +285,7 @@ mod tests {
.unwrap();
let db_connection = DatabaseConnection::create(config).await?;

let client = db_connection.prepared.pool.get().await?;
let client = db_connection.0.get().await?;
drop_all_tables(&client).await?;
create_sql(&client).await?;

Expand All @@ -337,7 +300,6 @@ mod tests {
AccountAddress::from_str("4FmiTW2L2AccyR9VjzsnpWFSAcohXWf7Vf797i36y526mqiEcp").unwrap();
let device = "device-1";
db_connection
.prepared
.upsert_subscription(
vec![account_address],
vec![Preference::CIS2Transaction],
Expand All @@ -346,7 +308,6 @@ mod tests {
.await
.unwrap();
let devices = db_connection
.prepared
.get_devices_from_account(&account_address)
.await
.unwrap();
Expand All @@ -366,12 +327,10 @@ mod tests {
AccountAddress::from_str("4FmiTW2L2AccyR9VjzsnpWFSAcohXWf7Vf797i36y526mqiEcp").unwrap();
let device = "device-1";
db_connection
.prepared
.upsert_subscription(vec![account_address], vec![CIS2Transaction], device)
.await
.unwrap();
db_connection
.prepared
.upsert_subscription(
vec![account_address],
vec![CIS2Transaction, CCDTransaction],
Expand All @@ -380,7 +339,6 @@ mod tests {
.await
.unwrap();
let devices = db_connection
.prepared
.get_devices_from_account(&account_address)
.await
.unwrap();
Expand All @@ -391,12 +349,10 @@ mod tests {
)]);

db_connection
.prepared
.upsert_subscription(vec![account_address], vec![], device)
.await
.unwrap();
let devices = db_connection
.prepared
.get_devices_from_account(&account_address)
.await
.unwrap();
Expand All @@ -414,32 +370,16 @@ mod tests {
let hash = BlockHash::new([0; 32]); // Example block hash
let height = AbsoluteBlockHeight::from(1);

db_connection
.prepared
.insert_block(&hash, &height)
.await
.unwrap();
db_connection.insert_block(&hash, &height).await.unwrap();

let latest_height = db_connection
.prepared
.get_processed_block_height()
.await
.unwrap();
let latest_height = db_connection.get_processed_block_height().await.unwrap();
assert_eq!(latest_height, Some(height));

let hash = BlockHash::new([1; 32]); // Example block hash
let height = AbsoluteBlockHeight::from(2);

db_connection
.prepared
.insert_block(&hash, &height)
.await
.unwrap();
let latest_height = db_connection
.prepared
.get_processed_block_height()
.await
.unwrap();
db_connection.insert_block(&hash, &height).await.unwrap();
let latest_height = db_connection.get_processed_block_height().await.unwrap();
assert_eq!(latest_height.unwrap().height, 2);
}

Expand All @@ -452,7 +392,6 @@ mod tests {
let expected_height = AbsoluteBlockHeight::from(1);

db_connection
.prepared
.insert_block(
&BlockHash::new(expected_hash),
&AbsoluteBlockHeight::from(2),
Expand All @@ -461,7 +400,6 @@ mod tests {
.unwrap();

if db_connection
.prepared
.insert_block(&BlockHash::new(expected_hash), &expected_height)
.await
.is_err()
Expand All @@ -479,13 +417,11 @@ mod tests {
let expected_height = AbsoluteBlockHeight::from(1);

db_connection
.prepared
.insert_block(&BlockHash::new([0; 32]), &expected_height)
.await
.unwrap();

match db_connection
.prepared
.insert_block(&BlockHash::new(expected_hash), &expected_height)
.await
{
Expand Down