diff --git a/src/routes/subscriptions.rs b/src/routes/subscriptions.rs index 9da8b43..3f02656 100644 --- a/src/routes/subscriptions.rs +++ b/src/routes/subscriptions.rs @@ -5,7 +5,7 @@ use actix_web::{web, HttpResponse}; use chrono::Utc; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; -use sqlx::PgPool; +use sqlx::{Executor, PgPool, Postgres, Transaction}; use uuid::Uuid; #[derive(serde::Deserialize)] @@ -52,17 +52,24 @@ pub async fn subscribe( Ok(subscriber) => subscriber, Err(_) => return HttpResponse::BadRequest().finish(), }; - let subscriber_id = match insert_subscriber(&pool, &new_subscriber).await { + let mut transaction = match pool.begin().await { + Ok(transaction) => transaction, + Err(_) => return HttpResponse::InternalServerError().finish(), + }; + let subscriber_id = match insert_subscriber(&mut transaction, &new_subscriber).await { Ok(subscriber_id) => subscriber_id, Err(_) => return HttpResponse::InternalServerError().finish(), }; let subscription_token = generate_subscription_token(); - if store_token(&pool, subscriber_id, &subscription_token) + if store_token(&mut transaction, subscriber_id, &subscription_token) .await .is_err() { return HttpResponse::InternalServerError().finish(); } + if transaction.commit().await.is_err() { + return HttpResponse::InternalServerError().finish(); + } if send_confirmation_email( &email_client, new_subscriber, @@ -106,22 +113,20 @@ pub async fn send_confirmation_email( #[tracing::instrument( name = "Store subscription token in the database", - skip(subscription_token, pool) + skip(subscription_token, transaction) )] pub async fn store_token( - pool: &PgPool, + transaction: &mut Transaction<'_, Postgres>, subscriber_id: Uuid, subscription_token: &str, ) -> Result<(), sqlx::Error> { - sqlx::query!( + let query = sqlx::query!( r#"INSERT INTO subscription_tokens (subscription_token, subscriber_id) VALUES ($1, $2)"#, subscription_token, subscriber_id - ) - .execute(pool) - .await - .map_err(|e| { + ); + transaction.execute(query).await.map_err(|e| { tracing::error!("Failed to execute query: {:?}", e); e })?; @@ -130,14 +135,14 @@ pub async fn store_token( #[tracing::instrument( name = "Saving new subscriber details in the database", - skip(new_subscriber, pool) + skip(new_subscriber, transaction) )] pub async fn insert_subscriber( - pool: &PgPool, + transaction: &mut Transaction<'_, Postgres>, new_subscriber: &NewSubscriber, ) -> Result { let subscriber_id = Uuid::new_v4(); - sqlx::query!( + let query = sqlx::query!( r#" INSERT INTO subscriptions (id, email, name, subscribed_at, status) VALUES ($1, $2, $3, $4, 'pending_confirmation')"#, @@ -145,10 +150,8 @@ pub async fn insert_subscriber( new_subscriber.email.as_ref(), new_subscriber.name.as_ref(), Utc::now() - ) - .execute(pool) - .await - .map_err(|e| { + ); + transaction.execute(query).await.map_err(|e| { tracing::error!("Failed to execute query: {:?}", e); e })?;