From a40cbd923122bdb8a1c1680da84b2c0b1dd55b07 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Tue, 30 Jul 2024 18:47:13 +0530 Subject: [PATCH] remove queue project --- Cargo.toml | 2 +- queue/Cargo.toml | 20 ----------- queue/examples/consumer.rs | 42 ----------------------- queue/examples/producer.rs | 37 --------------------- queue/src/lib.rs | 68 -------------------------------------- 5 files changed, 1 insertion(+), 168 deletions(-) delete mode 100644 queue/Cargo.toml delete mode 100644 queue/examples/consumer.rs delete mode 100644 queue/examples/producer.rs delete mode 100644 queue/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 7d78bb7..f0e6864 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ resolver = "2" -members = ["api", "pg_replicate", "queue", "replicator"] +members = ["api", "pg_replicate", "replicator"] [workspace.dependencies] actix-web = { version = "4", default-features = false } diff --git a/queue/Cargo.toml b/queue/Cargo.toml deleted file mode 100644 index c5b966c..0000000 --- a/queue/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "queue" -version = "0.1.0" -edition = "2021" - -[dependencies] -bytes = { workspace = true } -chrono = { workspace = true, features = ["serde"] } -serde = { workspace = true, features = ["derive"] } -serde_json = { workspace = true, features = ["std"] } -tokio-postgres = { workspace = true, features = [ - "runtime", - "with-chrono-0_4", - "with-serde_json-1", -] } - -[dev-dependencies] -tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } -tracing = { workspace = true, default-features = true } -tracing-subscriber = { workspace = true, default-features = true } diff --git a/queue/examples/consumer.rs b/queue/examples/consumer.rs deleted file mode 100644 index d0d4c2d..0000000 --- a/queue/examples/consumer.rs +++ /dev/null @@ -1,42 +0,0 @@ -use std::error::Error; - -use queue::{delete_task, dequeue}; -use tokio_postgres::NoTls; -use tracing::{error, info}; - -#[tokio::main] -async fn main() -> Result<(), Box> { - if let Err(e) = main_impl().await { - error!("{e}"); - } - - Ok(()) -} - -async fn main_impl() -> Result<(), Box> { - tracing_subscriber::fmt::init(); - - let (mut client, connection) = tokio_postgres::connect( - "host=localhost port=5431 dbname=replicator user=raminder.singh", - NoTls, - ) - .await?; - - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); - - let (txn, task) = dequeue(&mut client).await?; - - info!("task dequeued: {task:#?}"); - - if let Some(task) = task { - delete_task(txn, task.id).await?; - } else { - txn.commit().await?; - } - - Ok(()) -} diff --git a/queue/examples/producer.rs b/queue/examples/producer.rs deleted file mode 100644 index ce62ea0..0000000 --- a/queue/examples/producer.rs +++ /dev/null @@ -1,37 +0,0 @@ -use std::error::Error; - -use queue::enqueue; -use serde_json::json; -use tokio_postgres::NoTls; -use tracing::{error, info}; - -#[tokio::main] -async fn main() -> Result<(), Box> { - if let Err(e) = main_impl().await { - error!("{e}"); - } - - Ok(()) -} - -async fn main_impl() -> Result<(), Box> { - tracing_subscriber::fmt::init(); - - let (client, connection) = tokio_postgres::connect( - "host=localhost port=5431 dbname=replicator user=raminder.singh", - NoTls, - ) - .await?; - - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); - - let id = enqueue(&client, "test task", json!({"key": "value"})).await?; - - info!("task with id {id} enqueued in pending state"); - - Ok(()) -} diff --git a/queue/src/lib.rs b/queue/src/lib.rs deleted file mode 100644 index f7a8d9d..0000000 --- a/queue/src/lib.rs +++ /dev/null @@ -1,68 +0,0 @@ -use core::str; - -use tokio_postgres::{Client, Transaction}; - -pub async fn enqueue( - client: &Client, - task_name: &str, - task_data: serde_json::Value, -) -> Result { - let row = client - .query_one( - r#" - insert into queue.task_queue (name, data) - values($1, $2) returning id"#, - &[&task_name, &task_data], - ) - .await?; - Ok(row.get(0)) -} - -#[derive(Debug)] -pub struct Task { - pub id: i64, - pub name: String, - pub data: serde_json::Value, -} - -pub async fn dequeue( - client: &mut Client, -) -> Result<(Transaction, Option), tokio_postgres::Error> { - let txn = client.transaction().await?; - - let row = txn - .query_opt( - r#" - select id, name, data - from queue.task_queue - order by id - limit 1 - for update - skip locked"#, - &[], - ) - .await?; - - let task = row.map(|row| Task { - id: row.get(0), - name: row.get(1), - data: row.get(2), - }); - - Ok((txn, task)) -} - -pub async fn delete_task(txn: Transaction<'_>, id: i64) -> Result<(), tokio_postgres::Error> { - txn.execute( - r#" - delete from queue.task_queue - where id = $1 - "#, - &[&id], - ) - .await?; - - txn.commit().await?; - - Ok(()) -}