From a245f5575b99eb56a7b83f3612e482876e09e6a1 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Tue, 16 Jul 2024 17:48:32 +0200 Subject: [PATCH 01/36] DB except serialization --- Cargo.toml | 3 +- README.md | 5 ++- build.rs | 25 +++++++++++ resources/sql/SELECT_TABLES | 1 + src/config.rs | 10 +++++ src/db.rs | 86 +++++++++++++++++++++++++++++++++++++ src/errors.rs | 8 ++++ src/main.rs | 56 ++++++++++++++++++++++-- 8 files changed, 188 insertions(+), 6 deletions(-) create mode 100644 resources/sql/SELECT_TABLES create mode 100644 src/db.rs diff --git a/Cargo.toml b/Cargo.toml index 8d0e486..85c1200 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,7 @@ license = "Apache-2.0" [dependencies] base64 = "0.22.1" http = "0.2" -reqwest = { version = "0.11", default_features = false, features = ["json", "default-tls"] } +reqwest = { version = "0.11", default-features = false, features = ["json", "default-tls"] } serde = { version = "1.0.152", features = ["serde_derive"] } serde_json = "1.0" thiserror = "1.0.38" @@ -21,6 +21,7 @@ laplace_rs = {git = "https://github.com/samply/laplace-rs.git", tag = "v0.3.0" } uuid = "1.8.0" rand = { default-features = false, version = "0.8.5" } futures-util = { version = "0.3", default-features = false, features = ["std"] } +sqlx = { version = "0.7", features = [ "runtime-tokio", "postgres", "macros"] } # Logging tracing = { version = "0.1.37", default_features = false } diff --git a/README.md b/README.md index 2de2579..fc21c93 100644 --- a/README.md +++ b/README.md @@ -49,9 +49,10 @@ EPSILON = "0.1" # Privacy budget parameter for obfuscating the counts in the str ROUNDING_STEP = "10" # The granularity of the rounding of the obfuscated values, has no effect if OBFUSCATE = "no", default value: 10 PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter;ehds2" # Projects for which the results are not to be obfuscated, separated by ;, default value: "exliquid;dktk_supervisors;exporter;ehds2" QUERIES_TO_CACHE = "queries_to_cache.conf" # The path to a file containing base64 encoded queries whose results are to be cached. If not set, no results are cached -PROVIDER = "name" #OMOP provider name -PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" # Base64 encoded OMOP provider icon +PROVIDER = "name" #EUCAIM provider name +PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" # Base64 encoded EUCAIM provider icon AUTH_HEADER = "ApiKey XXXX" #Authorization header +DB_CONNECTION_STRING = "postgresql://postgres:Test.123@localhost:5432/postgres" # Database connection string ``` Obfuscating zero counts is by default switched off. To enable obfuscating zero counts, set the env. variable `OBFUSCATE_ZERO`. diff --git a/build.rs b/build.rs index 76667b0..b3a11b1 100644 --- a/build.rs +++ b/build.rs @@ -41,6 +41,30 @@ fn build_cqlmap() { ).unwrap(); } +fn build_sqlmap() { + let path = Path::new(&env::var("OUT_DIR").unwrap()).join("sql_replace_map.rs"); + let mut file = BufWriter::new(File::create(path).unwrap()); + + write!(&mut file, r#" + static SQL_REPLACE_MAP: once_cell::sync::Lazy> = once_cell::sync::Lazy::new(|| {{ + let mut map = HashMap::new(); + "#).unwrap(); + + for sqlfile in std::fs::read_dir(Path::new("resources/sql")).unwrap() { + let sqlfile = sqlfile.unwrap(); + let sqlfilename = sqlfile.file_name().to_str().unwrap().to_owned(); + let sqlcontent = std::fs::read_to_string(sqlfile.path()).unwrap(); + write!(&mut file, r####" + map.insert(r###"{sqlfilename}"###, r###"{sqlcontent}"###); + "####).unwrap(); + } + + writeln!(&mut file, " + map + }});" + ).unwrap(); +} + fn main() { build_data::set_GIT_COMMIT_SHORT(); build_data::set_GIT_DIRTY(); @@ -51,4 +75,5 @@ fn main() { println!("cargo:rustc-env=SAMPLY_USER_AGENT=Samply.Focus.{}/{}", env!("CARGO_PKG_NAME"), version()); build_cqlmap(); + build_sqlmap(); } diff --git a/resources/sql/SELECT_TABLES b/resources/sql/SELECT_TABLES new file mode 100644 index 0000000..c59f3b3 --- /dev/null +++ b/resources/sql/SELECT_TABLES @@ -0,0 +1 @@ +SELECT * FROM pg_catalog.pg_tables \ No newline at end of file diff --git a/src/config.rs b/src/config.rs index e736fba..4a69eeb 100644 --- a/src/config.rs +++ b/src/config.rs @@ -21,6 +21,8 @@ pub enum Obfuscate { pub enum EndpointType { Blaze, Omop, + BlazeAndSql, + Sql, } impl fmt::Display for EndpointType { @@ -28,6 +30,8 @@ impl fmt::Display for EndpointType { match self { EndpointType::Blaze => write!(f, "blaze"), EndpointType::Omop => write!(f, "omop"), + EndpointType::BlazeAndSql => write!(f, "blaze_sql"), + EndpointType::Sql => write!(f, "sql"), } } } @@ -151,6 +155,10 @@ struct CliArgs { #[clap(long, env, value_parser)] auth_header: Option, + /// Database connection string + #[clap(long, env, value_parser)] + db_connection_string: Option, + } pub(crate) struct Config { @@ -178,6 +186,7 @@ pub(crate) struct Config { pub provider: Option, pub provider_icon: Option, pub auth_header: Option, + pub db_connection_string: Option, } impl Config { @@ -219,6 +228,7 @@ impl Config { provider: cli_args.provider, provider_icon: cli_args.provider_icon, auth_header: cli_args.auth_header, + db_connection_string: cli_args.db_connection_string, client, }; Ok(config) diff --git a/src/db.rs b/src/db.rs new file mode 100644 index 0000000..4e77b6b --- /dev/null +++ b/src/db.rs @@ -0,0 +1,86 @@ +use sqlx::{postgres::PgPoolOptions, PgPool, postgres::PgRow}; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::collections::HashMap; +use tracing::{error, info}; +use crate::errors::FocusError; +use crate::util; + +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct SqlQuery { + pub payload: String, +} + +include!(concat!(env!("OUT_DIR"), "/sql_replace_map.rs")); + +pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result { + info!("Trying to establish a PostgreSQL connection pool"); + + let mut attempts = 0; + let mut err: Option = None; + + while attempts < num_attempts { + info!("Attempt to connect to PostgreSQL {} of {}", attempts + 1, num_attempts); + match PgPoolOptions::new() + .max_connections(10) + .connect(&pg_url) + .await + { + Ok(pg_con_pool) => { + info!("PostgreSQL connection successfull"); + return Ok(pg_con_pool) + }, + Err(e) => { + error!("Failed to connect to PostgreSQL. Attempt {} of {}: {}", attempts + 1, num_attempts, e); + err = Some(FocusError::CannotConnectToDatabase(e.to_string())); + } + } + attempts += 1; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + Err(err.unwrap_or_else(|| FocusError::CannotConnectToDatabase("Failed to connect to PostgreSQL".into()))) +} + +pub async fn healthcheck(pool: &PgPool) -> bool { + + let res = sqlx::query(include_str!("../resources/sql/SELECT_TABLES")) + .fetch_all(pool) + .await; + if let Ok(_) = res {true} else {false} +} + +pub async fn run_query(pool: &PgPool, query: &str) -> Result, FocusError> { + + sqlx::query(query) + .fetch_all(pool) + .await.map_err( FocusError::ErrorExecutingQuery) +} + +pub async fn process_sql_task(pool: &PgPool, encoded: &str) -> Result, FocusError>{ + let decoded = util::base64_decode(encoded)?; + let key = String::from_utf8(decoded).map_err(FocusError::ErrorConvertingToString)?; + let key = key.as_str(); + let sql_query = SQL_REPLACE_MAP.get(&(key.clone())); + if sql_query.is_none(){ + return Err(FocusError::QueryNotAllowed(key.into())); + } + let query = sql_query.unwrap(); + + run_query(pool, query).await + +} + + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + #[ignore] //TODO mock DB + async fn connect() { + let pool = get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1).await.unwrap(); + + assert!(healthcheck(&pool).await); + } +} + diff --git a/src/errors.rs b/src/errors.rs index 49acc63..4f430b7 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -60,6 +60,14 @@ pub enum FocusError { MissingExporterEndpoint, #[error("Missing Exporter Task Type")] MissingExporterTaskType, + #[error("Cannot connect to database: {0}")] + CannotConnectToDatabase(String), + #[error("Error executing query: {0}")] + ErrorExecutingQuery(sqlx::Error), + #[error("Error converting to string: {0}")] + ErrorConvertingToString(std::string::FromUtf8Error), + #[error("Query not allowed: {0}")] + QueryNotAllowed(String), } impl FocusError { diff --git a/src/main.rs b/src/main.rs index 1e4e423..ec9ca58 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ mod task_processing; mod util; mod projects; mod exporter; +mod db; use base64::engine::general_purpose; @@ -21,13 +22,16 @@ use beam_lib::{TaskRequest, TaskResult}; use futures_util::future::BoxFuture; use futures_util::FutureExt; use laplace_rs::ObfCache; +use sqlx::PgPool; use tokio::sync::Mutex; + use crate::blaze::{parse_blaze_query_payload_ast, AstQuery}; use crate::config::EndpointType; use crate::util::{base64_decode, is_cql_tampered_with, obfuscate_counts_mr}; use crate::{config::CONFIG, errors::FocusError}; use blaze::CqlQuery; +use db::SqlQuery; use std::collections::HashMap; use std::ops::DerefMut; @@ -52,7 +56,7 @@ type BeamResult = TaskResult; #[serde(tag = "lang", rename_all = "lowercase")] enum Language { Cql(CqlQuery), - Ast(AstQuery) + Ast(AstQuery), } #[derive(Debug, Deserialize, Serialize, Clone)] @@ -116,9 +120,22 @@ pub async fn main() -> ExitCode { } async fn main_loop() -> ExitCode { + let db_pool = if let Some(connection_string) = CONFIG.db_connection_string.clone() { + match db::get_pg_connection_pool(&connection_string, 8).await { + Err(e) => { + error!("Error connecting to database: {}, {}", connection_string, e); + return ExitCode::from(8); + }, + Ok(pool) => Some(pool), + } + } else { + None + }; let endpoint_service_available: fn() -> BoxFuture<'static, bool> = match CONFIG.endpoint_type { EndpointType::Blaze => || blaze::check_availability().boxed(), EndpointType::Omop => || async { true }.boxed(), // TODO health check + EndpointType::BlazeAndSql => || blaze::check_availability().boxed(), //TODO SQL health check + EndpointType::Sql => || async { true }.boxed(), // TODO health check }; let mut failures = 0; while !(beam::check_availability().await && endpoint_service_available().await) { @@ -144,12 +161,13 @@ async fn main_loop() -> ExitCode { task_processing::process_tasks(move |task| { let obf_cache = obf_cache.clone(); let report_cache = report_cache.clone(); - process_task(task, obf_cache, report_cache).boxed_local() + process_task(db_pool.clone(), task, obf_cache, report_cache).boxed_local() }).await; ExitCode::FAILURE } async fn process_task( + db_pool: Option, task: &BeamTask, obf_cache: Arc>, report_cache: Arc>, @@ -189,6 +207,37 @@ async fn process_task( }; run_cql_query(task, &query, obf_cache, report_cache, metadata.project, generated_from_ast).await + } else if CONFIG.endpoint_type == EndpointType::BlazeAndSql { + let mut generated_from_ast: bool = false; + let data = base64_decode(&task.body)?; + let query_maybe: Result = serde_json::from_slice(&(data.clone())); + if let Ok(sql_query) = query_maybe { + if let Some(pool) = db_pool{ + let result = db::process_sql_task(&pool, &(sql_query.payload)).await; + if let Ok(rows) = result { + + Ok(beam::beam_result::succeeded( + CONFIG.beam_app_id_long.clone(), + vec![task.clone().from], + task.id, + "".into(), + )) + } else {return Err(FocusError::CannotConnectToDatabase("SQL task but no connection String in config".into()));} + } + else { + return Err(FocusError::CannotConnectToDatabase("SQL task but no connection String in config".into())); + } + } else { + + let query: CqlQuery = match serde_json::from_slice::(&data)? { + Language::Cql(cql_query) => cql_query, + Language::Ast(ast_query) => { + generated_from_ast = true; + serde_json::from_str(&cql::generate_body(parse_blaze_query_payload_ast(&ast_query.payload)?)?)? + } + }; + run_cql_query(task, &query, obf_cache, report_cache, metadata.project, generated_from_ast).await + } } else if CONFIG.endpoint_type == EndpointType::Omop { let decoded = util::base64_decode(&task.body)?; let intermediate_rep_query: intermediate_rep::IntermediateRepQuery = @@ -437,4 +486,5 @@ mod test { assert_eq!(metadata.task_type, Some(exporter::TaskType::Execute)); } -} \ No newline at end of file +} + From 4400fa1a0cc1b39cf25c9c1e8dffcab95afe925e Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Wed, 17 Jul 2024 19:25:12 +0200 Subject: [PATCH 02/36] serialization of DB result --- Cargo.toml | 3 +- README.md | 31 ++++++----- src/config.rs | 26 ++++----- src/db.rs | 102 ++++++++++++++++++++++++----------- src/errors.rs | 2 + src/main.rs | 143 +++++++++++++++++++++++++++++++++++--------------- 6 files changed, 209 insertions(+), 98 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 85c1200..dd59e6a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,7 +21,8 @@ laplace_rs = {git = "https://github.com/samply/laplace-rs.git", tag = "v0.3.0" } uuid = "1.8.0" rand = { default-features = false, version = "0.8.5" } futures-util = { version = "0.3", default-features = false, features = ["std"] } -sqlx = { version = "0.7", features = [ "runtime-tokio", "postgres", "macros"] } +sqlx = { version = "0.7.4", features = [ "runtime-tokio", "postgres", "macros", "chrono"] } +sqlx-pgrow-serde = "0.2.0" # Logging tracing = { version = "0.1.37", default_features = false } diff --git a/README.md b/README.md index e35f1f7..e004c39 100644 --- a/README.md +++ b/README.md @@ -34,20 +34,20 @@ BEAM_APP_ID_LONG = "app1.broker.example.com" ### Optional variables ```bash -RETRY_COUNT = "32" # The maximum number of retries for beam and blaze healthchecks, default value: 32 -ENDPOINT_TYPE = "blaze" # Type of the endpoint, allowed values: "blaze", "omop", default value: "blaze" +RETRY_COUNT = "32" # The maximum number of retries for beam and blaze healthchecks; default value: 32 +ENDPOINT_TYPE = "blaze" # Type of the endpoint, allowed values: "blaze", "omop", "sql", "blaze-and-sql"; default value: "blaze" EXPORTER_URL = " https://exporter.site/" # The exporter URL -OBFUSCATE = "yes" # Should the results be obfuscated - the "master switch", allowed values: "yes", "no", default value: "yes" -OBFUSCATE_BELOW_10_MODE = "1" # The mode of obfuscating values below 10: 0 - return zero, 1 - return ten, 2 - obfuscate using Laplace distribution and rounding, has no effect if OBFUSCATE = "no", default value: 1 -DELTA_PATIENT = "1." # Sensitivity parameter for obfuscating the counts in the Patient stratifier, has no effect if OBFUSCATE = "no", default value: 1 -DELTA_SPECIMEN = "20." # Sensitivity parameter for obfuscating the counts in the Specimen stratifier, has no effect if OBFUSCATE = "no", default value: 20 -DELTA_DIAGNOSIS = "3." # Sensitivity parameter for obfuscating the counts in the Diagnosis stratifier, has no effect if OBFUSCATE = "no", default value: 3 -DELTA_PROCEDURES = "1.7" # Sensitivity parameter for obfuscating the counts in the Procedures stratifier, has no effect if OBFUSCATE = "no", default value: 1.7 -DELTA_MEDICATION_STATEMENTS = "2.1" # Sensitivity parameter for obfuscating the counts in the Medication Statements stratifier, has no effect if OBFUSCATE = "no", default value: 2.1 -DELTA_HISTO = "20." # Sensitivity parameter for obfuscating the counts in the Histo stratifier, has no effect if OBFUSCATE = "no", default value: 20 -EPSILON = "0.1" # Privacy budget parameter for obfuscating the counts in the stratifiers, has no effect if OBFUSCATE = "no", default value: 0.1 -ROUNDING_STEP = "10" # The granularity of the rounding of the obfuscated values, has no effect if OBFUSCATE = "no", default value: 10 -PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter;ehds2" # Projects for which the results are not to be obfuscated, separated by ;, default value: "exliquid;dktk_supervisors;exporter;ehds2" +OBFUSCATE = "yes" # Should the results be obfuscated - the "master switch", allowed values: "yes", "no"; default value: "yes" +OBFUSCATE_BELOW_10_MODE = "1" # The mode of obfuscating values below 10: 0 - return zero, 1 - return ten, 2 - obfuscate using Laplace distribution and rounding, has no effect if OBFUSCATE = "no"; default value: 1 +DELTA_PATIENT = "1." # Sensitivity parameter for obfuscating the counts in the Patient stratifier, has no effect if OBFUSCATE = "no"; default value: 1 +DELTA_SPECIMEN = "20." # Sensitivity parameter for obfuscating the counts in the Specimen stratifier, has no effect if OBFUSCATE = "no"; default value: 20 +DELTA_DIAGNOSIS = "3." # Sensitivity parameter for obfuscating the counts in the Diagnosis stratifier, has no effect if OBFUSCATE = "no"; default value: 3 +DELTA_PROCEDURES = "1.7" # Sensitivity parameter for obfuscating the counts in the Procedures stratifier, has no effect if OBFUSCATE = "no"; default value: 1.7 +DELTA_MEDICATION_STATEMENTS = "2.1" # Sensitivity parameter for obfuscating the counts in the Medication Statements stratifier, has no effect if OBFUSCATE = "no"; default value: 2.1 +DELTA_HISTO = "20." # Sensitivity parameter for obfuscating the counts in the Histo stratifier, has no effect if OBFUSCATE = "no"; default value: 20 +EPSILON = "0.1" # Privacy budget parameter for obfuscating the counts in the stratifiers, has no effect if OBFUSCATE = "no"; default value: 0.1 +ROUNDING_STEP = "10" # The granularity of the rounding of the obfuscated values, has no effect if OBFUSCATE = "no"; default value: 10 +PROJECTS_NO_OBFUSCATION = "exliquid;dktk_supervisors;exporter;ehds2" # Projects for which the results are not to be obfuscated, separated by ";" ; default value: "exliquid;dktk_supervisors;exporter;ehds2" QUERIES_TO_CACHE = "queries_to_cache.conf" # The path to a file containing base64 encoded queries whose results are to be cached. If not set, no results are cached PROVIDER = "name" #EUCAIM provider name PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" # Base64 encoded EUCAIM provider icon @@ -81,6 +81,11 @@ Creating a sample task containing an abstract syntax tree (AST) query using curl curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"bbmri"},"body":"eyJsYW5nIjoiYXN0IiwicGF5bG9hZCI6ImV5SmhjM1FpT25zaWIzQmxjbUZ1WkNJNklrOVNJaXdpWTJocGJHUnlaVzRpT2x0N0ltOXdaWEpoYm1RaU9pSkJUa1FpTENKamFHbHNaSEpsYmlJNlczc2liM0JsY21GdVpDSTZJazlTSWl3aVkyaHBiR1J5Wlc0aU9sdDdJbXRsZVNJNkltZGxibVJsY2lJc0luUjVjR1VpT2lKRlVWVkJURk1pTENKemVYTjBaVzBpT2lJaUxDSjJZV3gxWlNJNkltMWhiR1VpZlN4N0ltdGxlU0k2SW1kbGJtUmxjaUlzSW5SNWNHVWlPaUpGVVZWQlRGTWlMQ0p6ZVhOMFpXMGlPaUlpTENKMllXeDFaU0k2SW1abGJXRnNaU0o5WFgxZGZWMTlMQ0pwWkNJNkltRTJaakZqWTJZekxXVmlaakV0TkRJMFppMDVaRFk1TFRSbE5XUXhNelZtTWpNME1DSjkifQ=="}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks ``` +Creating a sample SQL task using curl: +```bash + curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"exliquid"},"body":"eyJwYXlsb2FkIjoiU0VMRUNUX1RBQkxFUyJ9"}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks + ``` + Creating a sample [Exporter](https://github.com/samply/exporter) "execute" task containing an Exporter query using curl: ```bash diff --git a/src/config.rs b/src/config.rs index 4a69eeb..ce28ab9 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,5 @@ -use std::path::PathBuf; use std::fmt; +use std::path::PathBuf; use beam_lib::AppId; use clap::Parser; @@ -10,7 +10,6 @@ use tracing::{debug, info, warn}; use crate::errors::FocusError; - #[derive(clap::ValueEnum, Clone, PartialEq, Debug)] pub enum Obfuscate { No, @@ -28,15 +27,14 @@ pub enum EndpointType { impl fmt::Display for EndpointType { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - EndpointType::Blaze => write!(f, "blaze"), + EndpointType::Blaze => write!(f, "blaze"), EndpointType::Omop => write!(f, "omop"), - EndpointType::BlazeAndSql => write!(f, "blaze_sql"), + EndpointType::BlazeAndSql => write!(f, "blaze_and_sql"), EndpointType::Sql => write!(f, "sql"), } } } - pub(crate) static CONFIG: Lazy = Lazy::new(|| { debug!("Loading config"); Config::load().unwrap_or_else(|e| { @@ -132,7 +130,12 @@ struct CliArgs { rounding_step: usize, /// Projects for which the results are not to be obfuscated, separated by ; - #[clap(long, env, value_parser, default_value = "exliquid;dktk_supervisors;exporter;ehds2")] + #[clap( + long, + env, + value_parser, + default_value = "exliquid;dktk_supervisors;exporter;ehds2" + )] projects_no_obfuscation: String, /// Path to a file containing BASE64 encoded queries whose results are to be cached @@ -146,7 +149,7 @@ struct CliArgs { /// OMOP provider name #[clap(long, env, value_parser)] provider: Option, - + /// Base64 encoded OMOP provider icon #[clap(long, env, value_parser)] provider_icon: Option, @@ -155,10 +158,9 @@ struct CliArgs { #[clap(long, env, value_parser)] auth_header: Option, - /// Database connection string - #[clap(long, env, value_parser)] - db_connection_string: Option, - + /// Database connection string + #[clap(long, env, value_parser)] + db_connection_string: Option, } pub(crate) struct Config { @@ -284,7 +286,7 @@ pub fn prepare_reqwest_client(certs: &Vec) -> Result proxies.push( Proxy::all(v) - .map_err( FocusError::InvalidProxyConfig)? + .map_err(FocusError::InvalidProxyConfig)? .no_proxy(no_proxy.clone()), ), _ => (), diff --git a/src/db.rs b/src/db.rs index 4e77b6b..c82f557 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,10 +1,10 @@ -use sqlx::{postgres::PgPoolOptions, PgPool, postgres::PgRow}; +use crate::errors::FocusError; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; +use sqlx::{postgres::PgPoolOptions, postgres::PgRow, PgPool}; +use sqlx_pgrow_serde::SerMapPgRow; use std::collections::HashMap; -use tracing::{error, info}; -use crate::errors::FocusError; -use crate::util; +use tracing::{error, info, debug}; #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct SqlQuery { @@ -15,12 +15,16 @@ include!(concat!(env!("OUT_DIR"), "/sql_replace_map.rs")); pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result { info!("Trying to establish a PostgreSQL connection pool"); - + let mut attempts = 0; let mut err: Option = None; - + while attempts < num_attempts { - info!("Attempt to connect to PostgreSQL {} of {}", attempts + 1, num_attempts); + info!( + "Attempt to connect to PostgreSQL {} of {}", + attempts + 1, + num_attempts + ); match PgPoolOptions::new() .max_connections(10) .connect(&pg_url) @@ -28,59 +32,97 @@ pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result

{ info!("PostgreSQL connection successfull"); - return Ok(pg_con_pool) - }, + return Ok(pg_con_pool); + } Err(e) => { - error!("Failed to connect to PostgreSQL. Attempt {} of {}: {}", attempts + 1, num_attempts, e); + error!( + "Failed to connect to PostgreSQL. Attempt {} of {}: {}", + attempts + 1, + num_attempts, + e + ); err = Some(FocusError::CannotConnectToDatabase(e.to_string())); } } attempts += 1; tokio::time::sleep(std::time::Duration::from_secs(1)).await; } - Err(err.unwrap_or_else(|| FocusError::CannotConnectToDatabase("Failed to connect to PostgreSQL".into()))) + Err(err.unwrap_or_else(|| { + FocusError::CannotConnectToDatabase("Failed to connect to PostgreSQL".into()) + })) } pub async fn healthcheck(pool: &PgPool) -> bool { - - let res = sqlx::query(include_str!("../resources/sql/SELECT_TABLES")) - .fetch_all(pool) - .await; - if let Ok(_) = res {true} else {false} + let res = run_query(pool, SQL_REPLACE_MAP.get("SELECT_TABLES").unwrap()).await; //this file exists, safe to unwrap + if let Ok(_) = res { + true + } else { + false + } } pub async fn run_query(pool: &PgPool, query: &str) -> Result, FocusError> { - sqlx::query(query) .fetch_all(pool) - .await.map_err( FocusError::ErrorExecutingQuery) + .await + .map_err(FocusError::ErrorExecutingQuery) } -pub async fn process_sql_task(pool: &PgPool, encoded: &str) -> Result, FocusError>{ - let decoded = util::base64_decode(encoded)?; - let key = String::from_utf8(decoded).map_err(FocusError::ErrorConvertingToString)?; - let key = key.as_str(); - let sql_query = SQL_REPLACE_MAP.get(&(key.clone())); - if sql_query.is_none(){ +pub async fn process_sql_task(pool: &PgPool, key: &str) -> Result, FocusError> { + debug!("Executing query with key = {}", &key); + let sql_query = SQL_REPLACE_MAP.get(&key); + if sql_query.is_none() { return Err(FocusError::QueryNotAllowed(key.into())); } - let query = sql_query.unwrap(); + let query = sql_query.unwrap(); + debug!("Executing query {}", &query); run_query(pool, query).await - } +pub fn serialize_rows(rows: Vec) -> Result { + let mut rows_json: Vec = vec![]; + + for row in rows { + let row = SerMapPgRow::from(row); + let row_json = serde_json::to_value(&row)?; + rows_json.push(row_json); + } + + Ok(json!(rows_json)) +} -#[cfg(test)] +#[cfg(test)] mod test { use super::*; #[tokio::test] #[ignore] //TODO mock DB async fn connect() { - let pool = get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1).await.unwrap(); - + let pool = + get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1) + .await + .unwrap(); + assert!(healthcheck(&pool).await); } -} + #[tokio::test] + #[ignore] //TODO mock DB + async fn serialize() { + let pool = + get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1) + .await + .unwrap(); + + let rows = run_query(&pool, SQL_REPLACE_MAP.get("SELECT_TABLES").unwrap()) + .await + .unwrap(); + + let rows_json = serialize_rows(rows).unwrap(); + + assert!(rows_json.is_array()); + + assert_ne!(rows_json[0]["hasindexes"], Value::Null); + } +} diff --git a/src/errors.rs b/src/errors.rs index 4f430b7..f3b8a2b 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -64,6 +64,8 @@ pub enum FocusError { CannotConnectToDatabase(String), #[error("Error executing query: {0}")] ErrorExecutingQuery(sqlx::Error), + #[error("QueryResultBad: {0}")] + QueryResultBad(String), #[error("Error converting to string: {0}")] ErrorConvertingToString(std::string::FromUtf8Error), #[error("Query not allowed: {0}")] diff --git a/src/main.rs b/src/main.rs index 58f108a..7803615 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,13 +8,12 @@ mod errors; mod graceful_shutdown; mod logger; +mod db; +mod exporter; mod intermediate_rep; +mod projects; mod task_processing; mod util; -mod projects; -mod exporter; -mod db; - use base64::engine::general_purpose; use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; @@ -25,13 +24,11 @@ use laplace_rs::ObfCache; use sqlx::PgPool; use tokio::sync::Mutex; - use crate::blaze::{parse_blaze_query_payload_ast, AstQuery}; use crate::config::EndpointType; use crate::util::{base64_decode, is_cql_tampered_with, obfuscate_counts_mr}; use crate::{config::CONFIG, errors::FocusError}; use blaze::CqlQuery; -use db::SqlQuery; use std::collections::HashMap; use std::ops::DerefMut; @@ -42,7 +39,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use std::{process::exit, time::Duration}; use serde::{Deserialize, Serialize}; -use tracing::{debug, error, warn, trace}; +use tracing::{debug, error, trace, warn}; // result cache type SearchQuery = String; @@ -125,9 +122,9 @@ async fn main_loop() -> ExitCode { let db_pool = if let Some(connection_string) = CONFIG.db_connection_string.clone() { match db::get_pg_connection_pool(&connection_string, 8).await { Err(e) => { - error!("Error connecting to database: {}, {}", connection_string, e); + error!("Error connecting to database: {}", e); return ExitCode::from(8); - }, + } Ok(pool) => Some(pool), } } else { @@ -137,7 +134,7 @@ async fn main_loop() -> ExitCode { EndpointType::Blaze => || blaze::check_availability().boxed(), EndpointType::Omop => || async { true }.boxed(), // TODO health check EndpointType::BlazeAndSql => || blaze::check_availability().boxed(), //TODO SQL health check - EndpointType::Sql => || async { true }.boxed(), // TODO health check + EndpointType::Sql => || async { true }.boxed(), // TODO health check }; let mut failures = 0; while !(beam::check_availability().await && endpoint_service_available().await) { @@ -152,10 +149,9 @@ async fn main_loop() -> ExitCode { tokio::time::sleep(Duration::from_secs(2)).await; warn!( "Retrying connection (attempt {}/{})", - failures, - CONFIG.retry_count + failures, CONFIG.retry_count ); - }; + } let report_cache = Arc::new(Mutex::new(ReportCache::new())); let obf_cache = Arc::new(Mutex::new(ObfCache { cache: Default::default(), @@ -164,7 +160,8 @@ async fn main_loop() -> ExitCode { let obf_cache = obf_cache.clone(); let report_cache = report_cache.clone(); process_task(db_pool.clone(), task, obf_cache, report_cache).boxed_local() - }).await; + }) + .await; ExitCode::FAILURE } @@ -178,7 +175,7 @@ async fn process_task( let metadata: Metadata = serde_json::from_value(task.metadata.clone()).unwrap_or(Metadata { project: "default_obfuscation".to_string(), - task_type: None + task_type: None, }); if metadata.project == "focus-healthcheck" { @@ -186,7 +183,7 @@ async fn process_task( CONFIG.beam_app_id_long.clone(), vec![task.from.clone()], task.id, - "healthy".into() + "healthy".into(), )); } if metadata.project == "exporter" { @@ -204,41 +201,107 @@ async fn process_task( Language::Cql(cql_query) => cql_query, Language::Ast(ast_query) => { generated_from_ast = true; - serde_json::from_str(&cql::generate_body(parse_blaze_query_payload_ast(&ast_query.payload)?)?)? + serde_json::from_str(&cql::generate_body(parse_blaze_query_payload_ast( + &ast_query.payload, + )?)?)? } }; - run_cql_query(task, &query, obf_cache, report_cache, metadata.project, generated_from_ast).await - - } else if CONFIG.endpoint_type == EndpointType::BlazeAndSql { + run_cql_query( + task, + &query, + obf_cache, + report_cache, + metadata.project, + generated_from_ast, + ) + .await + } else if CONFIG.endpoint_type == EndpointType::BlazeAndSql { let mut generated_from_ast: bool = false; let data = base64_decode(&task.body)?; - let query_maybe: Result = serde_json::from_slice(&(data.clone())); + let query_maybe: Result = + serde_json::from_slice(&(data.clone())); if let Ok(sql_query) = query_maybe { - if let Some(pool) = db_pool{ + if let Some(pool) = db_pool { let result = db::process_sql_task(&pool, &(sql_query.payload)).await; if let Ok(rows) = result { - + let rows_json = db::serialize_rows(rows)?; + trace!("result: {}", &rows_json); + Ok(beam::beam_result::succeeded( CONFIG.beam_app_id_long.clone(), vec![task.clone().from], task.id, - "".into(), + BASE64.encode(rows_json.to_string()), )) - } else {return Err(FocusError::CannotConnectToDatabase("SQL task but no connection String in config".into()));} - } - else { - return Err(FocusError::CannotConnectToDatabase("SQL task but no connection String in config".into())); + } else { + let error = result.err().unwrap(); + error!("Error executing query: {}", error); + return Err(error); + } + } else { + return Err(FocusError::CannotConnectToDatabase( + "SQL task but no connection String in config".into(), + )); } } else { - let query: CqlQuery = match serde_json::from_slice::(&data)? { Language::Cql(cql_query) => cql_query, Language::Ast(ast_query) => { generated_from_ast = true; - serde_json::from_str(&cql::generate_body(parse_blaze_query_payload_ast(&ast_query.payload)?)?)? + serde_json::from_str(&cql::generate_body(parse_blaze_query_payload_ast( + &ast_query.payload, + )?)?)? } }; - run_cql_query(task, &query, obf_cache, report_cache, metadata.project, generated_from_ast).await + run_cql_query( + task, + &query, + obf_cache, + report_cache, + metadata.project, + generated_from_ast, + ) + .await + } + } else if CONFIG.endpoint_type == EndpointType::Sql { + let data = base64_decode(&task.body)?; + let query_maybe: Result = serde_json::from_slice(&(data)); + if let Ok(sql_query) = query_maybe { + if let Some(pool) = db_pool { + let result = db::process_sql_task(&pool, &(sql_query.payload)).await; + if let Ok(rows) = result { + let rows_json = db::serialize_rows(rows)?; + + Ok(beam::beam_result::succeeded( + CONFIG.beam_app_id_long.clone(), + vec![task.clone().from], + task.id, + BASE64.encode(rows_json.to_string()), + )) + } else { + return Err(FocusError::QueryResultBad( + "Query executed but result not readable".into(), + )); + } + } else { + return Err(FocusError::CannotConnectToDatabase( + "SQL task but no connection String in config".into(), + )); + } + } else { + warn!( + "Wrong type of query for an SQL only store: {}, {:?}", + CONFIG.endpoint_type, data + ); + Ok(beam::beam_result::perm_failed( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + format!( + "Wrong type of query for an SQL only store: {}, {:?}", + CONFIG.endpoint_type, data + ), + )) } } else if CONFIG.endpoint_type == EndpointType::Omop { let decoded = util::base64_decode(&task.body)?; @@ -248,8 +311,7 @@ async fn process_task( let query_decoded = general_purpose::STANDARD .decode(intermediate_rep_query.query) .map_err(FocusError::DecodeError)?; - let ast: ast::Ast = - serde_json::from_slice(&query_decoded)?; + let ast: ast::Ast = serde_json::from_slice(&query_decoded)?; Ok(run_intermediate_rep_query(task, ast).await?) } else { @@ -275,7 +337,7 @@ async fn run_cql_query( obf_cache: Arc>, report_cache: Arc>, project: String, - generated_from_ast: bool + generated_from_ast: bool, ) -> Result { let encoded_query = query.lib["content"][0]["data"] @@ -310,9 +372,8 @@ async fn run_cql_query( let cql_result_new = match report_from_cache { Some(some_report_from_cache) => some_report_from_cache.to_string(), None => { - let query = - if generated_from_ast { - query.clone() + let query = if generated_from_ast { + query.clone() } else { replace_cql_library(query.clone())? }; @@ -466,7 +527,6 @@ fn beam_result(task: BeamTask, measure_report: String) -> Result Date: Wed, 17 Jul 2024 19:36:51 +0200 Subject: [PATCH 03/36] style --- src/db.rs | 10 +++------- src/errors.rs | 2 -- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/src/db.rs b/src/db.rs index c82f557..1025597 100644 --- a/src/db.rs +++ b/src/db.rs @@ -27,7 +27,7 @@ pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result

{ @@ -54,11 +54,7 @@ pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result

bool { let res = run_query(pool, SQL_REPLACE_MAP.get("SELECT_TABLES").unwrap()).await; //this file exists, safe to unwrap - if let Ok(_) = res { - true - } else { - false - } + res.is_ok() } pub async fn run_query(pool: &PgPool, query: &str) -> Result, FocusError> { @@ -98,7 +94,7 @@ mod test { #[tokio::test] #[ignore] //TODO mock DB - async fn connect() { + async fn connect_healthcheck() { let pool = get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1) .await diff --git a/src/errors.rs b/src/errors.rs index f3b8a2b..d01059d 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -66,8 +66,6 @@ pub enum FocusError { ErrorExecutingQuery(sqlx::Error), #[error("QueryResultBad: {0}")] QueryResultBad(String), - #[error("Error converting to string: {0}")] - ErrorConvertingToString(std::string::FromUtf8Error), #[error("Query not allowed: {0}")] QueryNotAllowed(String), } From 7144502273f22d9511a8df049d522ef108d9c2b9 Mon Sep 17 00:00:00 2001 From: Enola Knezevic <115070135+enola-dkfz@users.noreply.github.com> Date: Thu, 18 Jul 2024 14:07:28 +0200 Subject: [PATCH 04/36] Update README.md Co-authored-by: Tobias Kussel --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index e004c39..d8e56be 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ Creating a sample task containing an abstract syntax tree (AST) query using curl curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"bbmri"},"body":"eyJsYW5nIjoiYXN0IiwicGF5bG9hZCI6ImV5SmhjM1FpT25zaWIzQmxjbUZ1WkNJNklrOVNJaXdpWTJocGJHUnlaVzRpT2x0N0ltOXdaWEpoYm1RaU9pSkJUa1FpTENKamFHbHNaSEpsYmlJNlczc2liM0JsY21GdVpDSTZJazlTSWl3aVkyaHBiR1J5Wlc0aU9sdDdJbXRsZVNJNkltZGxibVJsY2lJc0luUjVjR1VpT2lKRlVWVkJURk1pTENKemVYTjBaVzBpT2lJaUxDSjJZV3gxWlNJNkltMWhiR1VpZlN4N0ltdGxlU0k2SW1kbGJtUmxjaUlzSW5SNWNHVWlPaUpGVVZWQlRGTWlMQ0p6ZVhOMFpXMGlPaUlpTENKMllXeDFaU0k2SW1abGJXRnNaU0o5WFgxZGZWMTlMQ0pwWkNJNkltRTJaakZqWTJZekxXVmlaakV0TkRJMFppMDVaRFk1TFRSbE5XUXhNelZtTWpNME1DSjkifQ=="}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks ``` -Creating a sample SQL task using curl: +Creating a sample SQL task for a `SELECT_TABLES` query using curl: ```bash curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"exliquid"},"body":"eyJwYXlsb2FkIjoiU0VMRUNUX1RBQkxFUyJ9"}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks ``` From 08ce472621ca28cd4db2a40f5b0eee079c1762dd Mon Sep 17 00:00:00 2001 From: Enola Knezevic <115070135+enola-dkfz@users.noreply.github.com> Date: Thu, 18 Jul 2024 14:53:10 +0200 Subject: [PATCH 05/36] Update src/db.rs Co-authored-by: Jan <59206115+Threated@users.noreply.github.com> --- src/db.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/db.rs b/src/db.rs index 1025597..56f875a 100644 --- a/src/db.rs +++ b/src/db.rs @@ -67,10 +67,9 @@ pub async fn run_query(pool: &PgPool, query: &str) -> Result, FocusEr pub async fn process_sql_task(pool: &PgPool, key: &str) -> Result, FocusError> { debug!("Executing query with key = {}", &key); let sql_query = SQL_REPLACE_MAP.get(&key); - if sql_query.is_none() { + let Some(query) = sql_query else { return Err(FocusError::QueryNotAllowed(key.into())); - } - let query = sql_query.unwrap(); + }; debug!("Executing query {}", &query); run_query(pool, query).await From a27293c4d78867d31e4845a811474fd0a6193ddc Mon Sep 17 00:00:00 2001 From: Enola Knezevic <115070135+enola-dkfz@users.noreply.github.com> Date: Thu, 18 Jul 2024 14:53:55 +0200 Subject: [PATCH 06/36] Update src/main.rs Co-authored-by: Jan <59206115+Threated@users.noreply.github.com> --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 7803615..5a07d95 100644 --- a/src/main.rs +++ b/src/main.rs @@ -229,7 +229,7 @@ async fn process_task( Ok(beam::beam_result::succeeded( CONFIG.beam_app_id_long.clone(), - vec![task.clone().from], + vec![task.from.clone()], task.id, BASE64.encode(rows_json.to_string()), )) From 5c0ed700356a4d9a033af3dfc757ded8b664d989 Mon Sep 17 00:00:00 2001 From: Enola Knezevic <115070135+enola-dkfz@users.noreply.github.com> Date: Fri, 19 Jul 2024 15:38:55 +0200 Subject: [PATCH 07/36] Update src/db.rs Co-authored-by: Jan <59206115+Threated@users.noreply.github.com> --- src/db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.rs b/src/db.rs index 56f875a..334eb69 100644 --- a/src/db.rs +++ b/src/db.rs @@ -76,7 +76,7 @@ pub async fn process_sql_task(pool: &PgPool, key: &str) -> Result, Fo } pub fn serialize_rows(rows: Vec) -> Result { - let mut rows_json: Vec = vec![]; + let mut rows_json: Vec = Vec::with_capacity(rows.len()); for row in rows { let row = SerMapPgRow::from(row); From 9fd8a9d105c3e04e7b4c9db25d4d90efc3ceece9 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Fri, 19 Jul 2024 15:39:52 +0200 Subject: [PATCH 08/36] requested changes --- src/db.rs | 12 +++++------- src/main.rs | 4 ++-- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/db.rs b/src/db.rs index 1025597..32a3f5b 100644 --- a/src/db.rs +++ b/src/db.rs @@ -4,7 +4,7 @@ use serde_json::{json, Value}; use sqlx::{postgres::PgPoolOptions, postgres::PgRow, PgPool}; use sqlx_pgrow_serde::SerMapPgRow; use std::collections::HashMap; -use tracing::{error, info, debug}; +use tracing::{warn, info, debug}; #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct SqlQuery { @@ -35,7 +35,7 @@ pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result

{ - error!( + warn!( "Failed to connect to PostgreSQL. Attempt {} of {}: {}", attempts + 1, num_attempts, @@ -47,9 +47,7 @@ pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result

bool { @@ -93,7 +91,7 @@ mod test { use super::*; #[tokio::test] - #[ignore] //TODO mock DB + //#[ignore] //TODO mock DB async fn connect_healthcheck() { let pool = get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1) @@ -104,7 +102,7 @@ mod test { } #[tokio::test] - #[ignore] //TODO mock DB + //#[ignore] //TODO mock DB async fn serialize() { let pool = get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1) diff --git a/src/main.rs b/src/main.rs index 7803615..214de68 100644 --- a/src/main.rs +++ b/src/main.rs @@ -231,7 +231,7 @@ async fn process_task( CONFIG.beam_app_id_long.clone(), vec![task.clone().from], task.id, - BASE64.encode(rows_json.to_string()), + BASE64.encode(serde_json::to_string(&rows_json)?), )) } else { let error = result.err().unwrap(); @@ -276,7 +276,7 @@ async fn process_task( CONFIG.beam_app_id_long.clone(), vec![task.clone().from], task.id, - BASE64.encode(rows_json.to_string()), + BASE64.encode(serde_json::to_string(&rows_json)?), )) } else { return Err(FocusError::QueryResultBad( From 410c5b1b7e2849af71d512c1a1785dd270e0b70c Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Fri, 19 Jul 2024 15:47:17 +0200 Subject: [PATCH 09/36] ignoring tests --- src/db.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/db.rs b/src/db.rs index c5ec19d..843fea3 100644 --- a/src/db.rs +++ b/src/db.rs @@ -90,7 +90,7 @@ mod test { use super::*; #[tokio::test] - //#[ignore] //TODO mock DB + #[ignore] //TODO mock DB async fn connect_healthcheck() { let pool = get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1) @@ -101,7 +101,7 @@ mod test { } #[tokio::test] - //#[ignore] //TODO mock DB + #[ignore] //TODO mock DB async fn serialize() { let pool = get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1) From 69eb81b8afbff17e57b76731a150b9175f892538 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Fri, 19 Jul 2024 15:51:09 +0200 Subject: [PATCH 10/36] requested changes --- src/db.rs | 2 +- src/main.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/db.rs b/src/db.rs index 843fea3..2e474ec 100644 --- a/src/db.rs +++ b/src/db.rs @@ -82,7 +82,7 @@ pub fn serialize_rows(rows: Vec) -> Result { rows_json.push(row_json); } - Ok(json!(rows_json)) + Ok(Value::Array(rows_json)) } #[cfg(test)] diff --git a/src/main.rs b/src/main.rs index e6aee42..55b988a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -133,8 +133,8 @@ async fn main_loop() -> ExitCode { let endpoint_service_available: fn() -> BoxFuture<'static, bool> = match CONFIG.endpoint_type { EndpointType::Blaze => || blaze::check_availability().boxed(), EndpointType::Omop => || async { true }.boxed(), // TODO health check - EndpointType::BlazeAndSql => || blaze::check_availability().boxed(), //TODO SQL health check - EndpointType::Sql => || async { true }.boxed(), // TODO health check + EndpointType::BlazeAndSql => || blaze::check_availability().boxed(), + EndpointType::Sql => || async { true }.boxed(), }; let mut failures = 0; while !(beam::check_availability().await && endpoint_service_available().await) { From fe0cf673b9913810bd37401450bde11e4644ba58 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Fri, 19 Jul 2024 15:53:26 +0200 Subject: [PATCH 11/36] renamed db to postgres --- src/config.rs | 6 +++--- src/main.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/config.rs b/src/config.rs index ce28ab9..1498943 100644 --- a/src/config.rs +++ b/src/config.rs @@ -160,7 +160,7 @@ struct CliArgs { /// Database connection string #[clap(long, env, value_parser)] - db_connection_string: Option, + postgres_connection_string: Option, } pub(crate) struct Config { @@ -188,7 +188,7 @@ pub(crate) struct Config { pub provider: Option, pub provider_icon: Option, pub auth_header: Option, - pub db_connection_string: Option, + pub postgres_connection_string: Option, } impl Config { @@ -230,7 +230,7 @@ impl Config { provider: cli_args.provider, provider_icon: cli_args.provider_icon, auth_header: cli_args.auth_header, - db_connection_string: cli_args.db_connection_string, + postgres_connection_string: cli_args.postgres_connection_string, client, }; Ok(config) diff --git a/src/main.rs b/src/main.rs index 55b988a..d99cb58 100644 --- a/src/main.rs +++ b/src/main.rs @@ -119,7 +119,7 @@ pub async fn main() -> ExitCode { } async fn main_loop() -> ExitCode { - let db_pool = if let Some(connection_string) = CONFIG.db_connection_string.clone() { + let db_pool = if let Some(connection_string) = CONFIG.postgres_connection_string.clone() { match db::get_pg_connection_pool(&connection_string, 8).await { Err(e) => { error!("Error connecting to database: {}", e); From 3ebdbc28a79dd45e335918abd21345c1d950c177 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Fri, 19 Jul 2024 15:54:27 +0200 Subject: [PATCH 12/36] renamed db to postgres in Readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d8e56be..1718f1a 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ QUERIES_TO_CACHE = "queries_to_cache.conf" # The path to a file containing base6 PROVIDER = "name" #EUCAIM provider name PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" # Base64 encoded EUCAIM provider icon AUTH_HEADER = "ApiKey XXXX" #Authorization header -DB_CONNECTION_STRING = "postgresql://postgres:Test.123@localhost:5432/postgres" # Database connection string +POSTGRES_CONNECTION_STRING = "postgresql://postgres:Test.123@localhost:5432/postgres" # Postgres connection string ``` Obfuscating zero counts is by default switched off. To enable obfuscating zero counts, set the env. variable `OBFUSCATE_ZERO`. From c71de6d6327be045de2da93b25a3d5ffebcf4d33 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Fri, 19 Jul 2024 16:06:27 +0200 Subject: [PATCH 13/36] up the error --- src/main.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/main.rs b/src/main.rs index d99cb58..3cd2c57 100644 --- a/src/main.rs +++ b/src/main.rs @@ -222,8 +222,7 @@ async fn process_task( serde_json::from_slice(&(data.clone())); if let Ok(sql_query) = query_maybe { if let Some(pool) = db_pool { - let result = db::process_sql_task(&pool, &(sql_query.payload)).await; - if let Ok(rows) = result { + let rows = db::process_sql_task(&pool, &(sql_query.payload)).await?; let rows_json = db::serialize_rows(rows)?; trace!("result: {}", &rows_json); @@ -233,11 +232,6 @@ async fn process_task( task.id, BASE64.encode(serde_json::to_string(&rows_json)?), )) - } else { - let error = result.err().unwrap(); - error!("Error executing query: {}", error); - return Err(error); - } } else { return Err(FocusError::CannotConnectToDatabase( "SQL task but no connection String in config".into(), From 3dce3bf91e292dadfe5652e217a7cfa0002f112a Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Fri, 19 Jul 2024 16:24:15 +0200 Subject: [PATCH 14/36] match endpoint type --- src/main.rs | 197 +++++++++++++++++++++++++--------------------------- 1 file changed, 94 insertions(+), 103 deletions(-) diff --git a/src/main.rs b/src/main.rs index 3cd2c57..f27e9f0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -194,50 +194,10 @@ async fn process_task( return run_exporter_query(task, body, task_type).await; } - if CONFIG.endpoint_type == EndpointType::Blaze { - let mut generated_from_ast: bool = false; - let data = base64_decode(&task.body)?; - let query: CqlQuery = match serde_json::from_slice::(&data)? { - Language::Cql(cql_query) => cql_query, - Language::Ast(ast_query) => { - generated_from_ast = true; - serde_json::from_str(&cql::generate_body(parse_blaze_query_payload_ast( - &ast_query.payload, - )?)?)? - } - }; - run_cql_query( - task, - &query, - obf_cache, - report_cache, - metadata.project, - generated_from_ast, - ) - .await - } else if CONFIG.endpoint_type == EndpointType::BlazeAndSql { - let mut generated_from_ast: bool = false; - let data = base64_decode(&task.body)?; - let query_maybe: Result = - serde_json::from_slice(&(data.clone())); - if let Ok(sql_query) = query_maybe { - if let Some(pool) = db_pool { - let rows = db::process_sql_task(&pool, &(sql_query.payload)).await?; - let rows_json = db::serialize_rows(rows)?; - trace!("result: {}", &rows_json); - - Ok(beam::beam_result::succeeded( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - BASE64.encode(serde_json::to_string(&rows_json)?), - )) - } else { - return Err(FocusError::CannotConnectToDatabase( - "SQL task but no connection String in config".into(), - )); - } - } else { + match CONFIG.endpoint_type { + EndpointType::Blaze => { + let mut generated_from_ast: bool = false; + let data = base64_decode(&task.body)?; let query: CqlQuery = match serde_json::from_slice::(&data)? { Language::Cql(cql_query) => cql_query, Language::Ast(ast_query) => { @@ -256,72 +216,103 @@ async fn process_task( generated_from_ast, ) .await - } - } else if CONFIG.endpoint_type == EndpointType::Sql { - let data = base64_decode(&task.body)?; - let query_maybe: Result = serde_json::from_slice(&(data)); - if let Ok(sql_query) = query_maybe { - if let Some(pool) = db_pool { - let result = db::process_sql_task(&pool, &(sql_query.payload)).await; - if let Ok(rows) = result { - let rows_json = db::serialize_rows(rows)?; - - Ok(beam::beam_result::succeeded( - CONFIG.beam_app_id_long.clone(), - vec![task.clone().from], - task.id, - BASE64.encode(serde_json::to_string(&rows_json)?), - )) + }, + EndpointType::BlazeAndSql => { + let mut generated_from_ast: bool = false; + let data = base64_decode(&task.body)?; + let query_maybe: Result = + serde_json::from_slice(&(data.clone())); + if let Ok(sql_query) = query_maybe { + if let Some(pool) = db_pool { + let rows = db::process_sql_task(&pool, &(sql_query.payload)).await?; + let rows_json = db::serialize_rows(rows)?; + trace!("result: {}", &rows_json); + + Ok(beam::beam_result::succeeded( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + BASE64.encode(serde_json::to_string(&rows_json)?), + )) } else { - return Err(FocusError::QueryResultBad( - "Query executed but result not readable".into(), + return Err(FocusError::CannotConnectToDatabase( + "SQL task but no connection String in config".into(), )); } } else { - return Err(FocusError::CannotConnectToDatabase( - "SQL task but no connection String in config".into(), - )); + let query: CqlQuery = match serde_json::from_slice::(&data)? { + Language::Cql(cql_query) => cql_query, + Language::Ast(ast_query) => { + generated_from_ast = true; + serde_json::from_str(&cql::generate_body(parse_blaze_query_payload_ast( + &ast_query.payload, + )?)?)? + } + }; + run_cql_query( + task, + &query, + obf_cache, + report_cache, + metadata.project, + generated_from_ast, + ) + .await } - } else { - warn!( - "Wrong type of query for an SQL only store: {}, {:?}", - CONFIG.endpoint_type, data - ); - Ok(beam::beam_result::perm_failed( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - format!( + }, + EndpointType::Sql => { + let data = base64_decode(&task.body)?; + let query_maybe: Result = serde_json::from_slice(&(data)); + if let Ok(sql_query) = query_maybe { + if let Some(pool) = db_pool { + let result = db::process_sql_task(&pool, &(sql_query.payload)).await; + if let Ok(rows) = result { + let rows_json = db::serialize_rows(rows)?; + + Ok(beam::beam_result::succeeded( + CONFIG.beam_app_id_long.clone(), + vec![task.clone().from], + task.id, + BASE64.encode(serde_json::to_string(&rows_json)?), + )) + } else { + return Err(FocusError::QueryResultBad( + "Query executed but result not readable".into(), + )); + } + } else { + return Err(FocusError::CannotConnectToDatabase( + "SQL task but no connection String in config".into(), + )); + } + } else { + warn!( "Wrong type of query for an SQL only store: {}, {:?}", CONFIG.endpoint_type, data - ), - )) - } - } else if CONFIG.endpoint_type == EndpointType::Omop { - let decoded = util::base64_decode(&task.body)?; - let intermediate_rep_query: intermediate_rep::IntermediateRepQuery = - serde_json::from_slice(&decoded)?; - //TODO check that the language is ast - let query_decoded = general_purpose::STANDARD - .decode(intermediate_rep_query.query) - .map_err(FocusError::DecodeError)?; - let ast: ast::Ast = serde_json::from_slice(&query_decoded)?; - - Ok(run_intermediate_rep_query(task, ast).await?) - } else { - warn!( - "Can't run queries with endpoint type {}", - CONFIG.endpoint_type - ); - Ok(beam::beam_result::perm_failed( - CONFIG.beam_app_id_long.clone(), - vec![task.from.clone()], - task.id, - format!( - "Can't run queries with endpoint type {}", - CONFIG.endpoint_type - ), - )) + ); + Ok(beam::beam_result::perm_failed( + CONFIG.beam_app_id_long.clone(), + vec![task.from.clone()], + task.id, + format!( + "Wrong type of query for an SQL only store: {}, {:?}", + CONFIG.endpoint_type, data + ), + )) + } + }, + EndpointType::Omop => { + let decoded = util::base64_decode(&task.body)?; + let intermediate_rep_query: intermediate_rep::IntermediateRepQuery = + serde_json::from_slice(&decoded)?; + //TODO check that the language is ast + let query_decoded = general_purpose::STANDARD + .decode(intermediate_rep_query.query) + .map_err(FocusError::DecodeError)?; + let ast: ast::Ast = serde_json::from_slice(&query_decoded)?; + + Ok(run_intermediate_rep_query(task, ast).await?) + } } } From 8678af0c33badef5211a627885bd19bdcbaa5e6b Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Fri, 19 Jul 2024 16:29:47 +0200 Subject: [PATCH 15/36] process task first param task --- src/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main.rs b/src/main.rs index f27e9f0..2833788 100644 --- a/src/main.rs +++ b/src/main.rs @@ -159,17 +159,17 @@ async fn main_loop() -> ExitCode { task_processing::process_tasks(move |task| { let obf_cache = obf_cache.clone(); let report_cache = report_cache.clone(); - process_task(db_pool.clone(), task, obf_cache, report_cache).boxed_local() + process_task(task, obf_cache, report_cache, db_pool.clone()).boxed_local() }) .await; ExitCode::FAILURE } async fn process_task( - db_pool: Option, task: &BeamTask, obf_cache: Arc>, report_cache: Arc>, + db_pool: Option, ) -> Result { debug!("Processing task {}", task.id); From 7899eb105192ac07b352cd4db51cb757bc06e445 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 22 Jul 2024 14:55:48 +0200 Subject: [PATCH 16/36] pg connection tokio_retry exp backoff + jitter --- Cargo.toml | 2 ++ src/db.rs | 49 +++++++++++++++++++------------------------------ 2 files changed, 21 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dd59e6a..d801f60 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ rand = { default-features = false, version = "0.8.5" } futures-util = { version = "0.3", default-features = false, features = ["std"] } sqlx = { version = "0.7.4", features = [ "runtime-tokio", "postgres", "macros", "chrono"] } sqlx-pgrow-serde = "0.2.0" +tokio-retry = "0.3" # Logging tracing = { version = "0.1.37", default_features = false } @@ -34,6 +35,7 @@ once_cell = "1.18" # Command Line Interface clap = { version = "4", default_features = false, features = ["std", "env", "derive", "help", "color"] } + [features] default = [] bbmri = [] diff --git a/src/db.rs b/src/db.rs index 2e474ec..82642a3 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,10 +1,13 @@ use crate::errors::FocusError; use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; +use serde_json::Value; use sqlx::{postgres::PgPoolOptions, postgres::PgRow, PgPool}; use sqlx_pgrow_serde::SerMapPgRow; use std::collections::HashMap; use tracing::{warn, info, debug}; +use tokio_retry::strategy::{ExponentialBackoff, jitter}; +use tokio_retry::Retry; + #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct SqlQuery { @@ -13,43 +16,29 @@ pub struct SqlQuery { include!(concat!(env!("OUT_DIR"), "/sql_replace_map.rs")); -pub async fn get_pg_connection_pool(pg_url: &str, num_attempts: u32) -> Result { +pub async fn get_pg_connection_pool(pg_url: &str, max_attempts: u32) -> Result { info!("Trying to establish a PostgreSQL connection pool"); - let mut attempts = 0; - let mut err: Option = None; + let retry_strategy = ExponentialBackoff::from_millis(1000) + .map(jitter) + .take(max_attempts as usize); - while attempts < num_attempts { - info!( - "Attempt to connect to PostgreSQL {} of {}", - attempts + 1, - num_attempts - ); - match PgPoolOptions::new() + let result = Retry::spawn(retry_strategy, || async { + info!("Attempting to connect to PostgreSQL"); + PgPoolOptions::new() .max_connections(10) .connect(pg_url) .await - { - Ok(pg_con_pool) => { - info!("PostgreSQL connection successfull"); - return Ok(pg_con_pool); - } - Err(e) => { - warn!( - "Failed to connect to PostgreSQL. Attempt {} of {}: {}", - attempts + 1, - num_attempts, - e - ); - err = Some(FocusError::CannotConnectToDatabase(e.to_string())); - } - } - attempts += 1; - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - Err(err.unwrap()) + .map_err(|e| { + warn!("Failed to connect to PostgreSQL: {}", e); + FocusError::CannotConnectToDatabase(e.to_string()) + }) + }).await; + + result } + pub async fn healthcheck(pool: &PgPool) -> bool { let res = run_query(pool, SQL_REPLACE_MAP.get("SELECT_TABLES").unwrap()).await; //this file exists, safe to unwrap res.is_ok() From f55cfbb902137d7be94e92218bd55ebff46ec768 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 22 Jul 2024 15:29:41 +0200 Subject: [PATCH 17/36] requested by clippy --- src/db.rs | 15 ++++++--------- src/main.rs | 12 ++++++------ 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/src/db.rs b/src/db.rs index 82642a3..6af683c 100644 --- a/src/db.rs +++ b/src/db.rs @@ -4,10 +4,9 @@ use serde_json::Value; use sqlx::{postgres::PgPoolOptions, postgres::PgRow, PgPool}; use sqlx_pgrow_serde::SerMapPgRow; use std::collections::HashMap; -use tracing::{warn, info, debug}; -use tokio_retry::strategy::{ExponentialBackoff, jitter}; +use tokio_retry::strategy::{jitter, ExponentialBackoff}; use tokio_retry::Retry; - +use tracing::{debug, info, warn}; #[derive(Serialize, Deserialize, Debug, Default, Clone)] pub struct SqlQuery { @@ -20,10 +19,10 @@ pub async fn get_pg_connection_pool(pg_url: &str, max_attempts: u32) -> Result

Result

bool { let res = run_query(pool, SQL_REPLACE_MAP.get("SELECT_TABLES").unwrap()).await; //this file exists, safe to unwrap res.is_ok() diff --git a/src/main.rs b/src/main.rs index 2833788..75528bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -235,9 +235,9 @@ async fn process_task( BASE64.encode(serde_json::to_string(&rows_json)?), )) } else { - return Err(FocusError::CannotConnectToDatabase( + Err(FocusError::CannotConnectToDatabase( "SQL task but no connection String in config".into(), - )); + )) } } else { let query: CqlQuery = match serde_json::from_slice::(&data)? { @@ -276,14 +276,14 @@ async fn process_task( BASE64.encode(serde_json::to_string(&rows_json)?), )) } else { - return Err(FocusError::QueryResultBad( + Err(FocusError::QueryResultBad( "Query executed but result not readable".into(), - )); + )) } } else { - return Err(FocusError::CannotConnectToDatabase( + Err(FocusError::CannotConnectToDatabase( "SQL task but no connection String in config".into(), - )); + )) } } else { warn!( From dff9045c35335c7c0a1d2b8fdde0603f10bb214f Mon Sep 17 00:00:00 2001 From: janskiba Date: Mon, 22 Jul 2024 13:30:21 +0000 Subject: [PATCH 18/36] chore: use `tryhard` as retry crate --- Cargo.toml | 2 +- src/db.rs | 17 ++++++----------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d801f60..1d109df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ rand = { default-features = false, version = "0.8.5" } futures-util = { version = "0.3", default-features = false, features = ["std"] } sqlx = { version = "0.7.4", features = [ "runtime-tokio", "postgres", "macros", "chrono"] } sqlx-pgrow-serde = "0.2.0" -tokio-retry = "0.3" +tryhard = "0.5" # Logging tracing = { version = "0.1.37", default_features = false } diff --git a/src/db.rs b/src/db.rs index 82642a3..80c2416 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3,10 +3,8 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; use sqlx::{postgres::PgPoolOptions, postgres::PgRow, PgPool}; use sqlx_pgrow_serde::SerMapPgRow; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use tracing::{warn, info, debug}; -use tokio_retry::strategy::{ExponentialBackoff, jitter}; -use tokio_retry::Retry; #[derive(Serialize, Deserialize, Debug, Default, Clone)] @@ -19,11 +17,7 @@ include!(concat!(env!("OUT_DIR"), "/sql_replace_map.rs")); pub async fn get_pg_connection_pool(pg_url: &str, max_attempts: u32) -> Result { info!("Trying to establish a PostgreSQL connection pool"); - let retry_strategy = ExponentialBackoff::from_millis(1000) - .map(jitter) - .take(max_attempts as usize); - - let result = Retry::spawn(retry_strategy, || async { + tryhard::retry_fn(|| async { info!("Attempting to connect to PostgreSQL"); PgPoolOptions::new() .max_connections(10) @@ -33,9 +27,10 @@ pub async fn get_pg_connection_pool(pg_url: &str, max_attempts: u32) -> Result

Date: Sun, 28 Jul 2024 09:44:44 +0200 Subject: [PATCH 19/36] chore: upgrade reqwest to 0.12 and remove direct deb of http (#159) --- Cargo.toml | 3 +-- src/beam.rs | 2 +- src/blaze.rs | 2 +- src/config.rs | 16 ++++++++-------- src/errors.rs | 3 ++- src/exporter.rs | 5 +---- src/intermediate_rep.rs | 5 +---- 7 files changed, 15 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1d109df..ee17b0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,8 +8,7 @@ license = "Apache-2.0" [dependencies] base64 = "0.22.1" -http = "0.2" -reqwest = { version = "0.11", default-features = false, features = ["json", "default-tls"] } +reqwest = { version = "0.12", default-features = false, features = ["json", "default-tls"] } serde = { version = "1.0.152", features = ["serde_derive"] } serde_json = "1.0" thiserror = "1.0.38" diff --git a/src/beam.rs b/src/beam.rs index 9c2f3c6..45a640e 100644 --- a/src/beam.rs +++ b/src/beam.rs @@ -1,7 +1,7 @@ use std::time::Duration; use beam_lib::{TaskResult, BeamClient, BlockingOptions, MsgId, TaskRequest, RawString}; -use http::StatusCode; +use reqwest::StatusCode; use once_cell::sync::Lazy; use serde::Serialize; use tracing::{debug, warn, info}; diff --git a/src/blaze.rs b/src/blaze.rs index 02162e1..18aa63c 100644 --- a/src/blaze.rs +++ b/src/blaze.rs @@ -1,4 +1,4 @@ -use http::StatusCode; +use reqwest::StatusCode; use serde::Deserialize; use serde::Serialize; use serde_json::Value; diff --git a/src/config.rs b/src/config.rs index 1498943..7a09748 100644 --- a/src/config.rs +++ b/src/config.rs @@ -3,7 +3,7 @@ use std::path::PathBuf; use beam_lib::AppId; use clap::Parser; -use http::{HeaderValue, Uri}; +use reqwest::{header::HeaderValue, Url}; use once_cell::sync::Lazy; use reqwest::{Certificate, Client, Proxy}; use tracing::{debug, info, warn}; @@ -55,7 +55,7 @@ const CLAP_FOOTER: &str = "For proxy support, environment variables HTTP_PROXY, struct CliArgs { /// The beam proxy's base URL, e.g. https://proxy1.beam.samply.de #[clap(long, env, value_parser)] - beam_proxy_url: Uri, + beam_proxy_url: Url, /// This application's beam AppId, e.g. focus.proxy1.broker.samply.de #[clap(long, env, value_parser)] @@ -71,15 +71,15 @@ struct CliArgs { /// The endpoint base URL, e.g. https://blaze.site/fhir/ #[clap(long, env, value_parser)] - endpoint_url: Option, + endpoint_url: Option, /// The endpoint base URL, e.g. https://blaze.site/fhir/, for the sake of backward compatibility, use endpoint_url instead #[clap(long, env, value_parser)] - blaze_url: Option, + blaze_url: Option, /// The exporter URL, e.g. https://exporter.site/ #[clap(long, env, value_parser)] - exporter_url: Option, + exporter_url: Option, /// Type of the endpoint, e.g. "blaze", "omop" #[clap(long, env, value_parser = clap::value_parser!(EndpointType), default_value = "blaze")] @@ -164,12 +164,12 @@ struct CliArgs { } pub(crate) struct Config { - pub beam_proxy_url: Uri, + pub beam_proxy_url: Url, pub beam_app_id_long: AppId, pub api_key: String, pub retry_count: usize, - pub endpoint_url: Uri, - pub exporter_url: Option, + pub endpoint_url: Url, + pub exporter_url: Option, pub endpoint_type: EndpointType, pub obfuscate: Obfuscate, pub obfuscate_zero: bool, diff --git a/src/errors.rs b/src/errors.rs index d01059d..8cccda1 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,3 +1,4 @@ +use reqwest::header; use thiserror::Error; #[derive(Error, Debug)] @@ -55,7 +56,7 @@ pub enum FocusError { #[error("Invalid date format: {0}")] AstInvalidDateFormat(String), #[error("Invalid Header Value: {0}")] - InvalidHeaderValue(http::header::InvalidHeaderValue), + InvalidHeaderValue(header::InvalidHeaderValue), #[error("Missing Exporter Endpoint")] MissingExporterEndpoint, #[error("Missing Exporter Task Type")] diff --git a/src/exporter.rs b/src/exporter.rs index fb18e68..c3eb932 100644 --- a/src/exporter.rs +++ b/src/exporter.rs @@ -1,7 +1,4 @@ -use http::header; -use http::HeaderMap; -use http::HeaderValue; -use http::StatusCode; +use reqwest::{header::{self, HeaderMap, HeaderValue}, StatusCode}; use serde::Deserialize; use serde::Serialize; use serde_json::Value; diff --git a/src/intermediate_rep.rs b/src/intermediate_rep.rs index 6ac1210..5026aa3 100644 --- a/src/intermediate_rep.rs +++ b/src/intermediate_rep.rs @@ -1,7 +1,4 @@ -use http::header; -use http::HeaderMap; -use http::HeaderValue; -use http::StatusCode; +use reqwest::{header::{self, HeaderMap, HeaderValue}, StatusCode}; use serde::Deserialize; use serde::Serialize; use tracing::{debug, warn}; From 8bee989cc2753e88eb8fbd6cd338fc0f1bd2f62a Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Wed, 4 Sep 2024 09:56:39 +0200 Subject: [PATCH 20/36] sqlx version 0.8.2 --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ee17b0b..ee76649 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,8 +20,8 @@ laplace_rs = {git = "https://github.com/samply/laplace-rs.git", tag = "v0.3.0" } uuid = "1.8.0" rand = { default-features = false, version = "0.8.5" } futures-util = { version = "0.3", default-features = false, features = ["std"] } -sqlx = { version = "0.7.4", features = [ "runtime-tokio", "postgres", "macros", "chrono"] } -sqlx-pgrow-serde = "0.2.0" +sqlx = { version = "0.8.2", features = [ "runtime-tokio", "postgres", "macros", "chrono"] } +sqlx-pgrow-serde = { package = "kurtbuilds_sqlx_serde", git = "https://github.com/kurtbuilds/sqlx-pgrow-serde", branch = "master" } tryhard = "0.5" # Logging From 38aa87de77b567ed98da78aa6323aee02466c06b Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Wed, 4 Sep 2024 10:52:22 +0200 Subject: [PATCH 21/36] sql-pgrow-serde updated --- Cargo.toml | 3 ++- src/db.rs | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ee76649..b8648d4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,8 +21,8 @@ uuid = "1.8.0" rand = { default-features = false, version = "0.8.5" } futures-util = { version = "0.3", default-features = false, features = ["std"] } sqlx = { version = "0.8.2", features = [ "runtime-tokio", "postgres", "macros", "chrono"] } -sqlx-pgrow-serde = { package = "kurtbuilds_sqlx_serde", git = "https://github.com/kurtbuilds/sqlx-pgrow-serde", branch = "master" } tryhard = "0.5" +kurtbuilds_sqlx_serde = "0.3.1" # Logging tracing = { version = "0.1.37", default_features = false } @@ -35,6 +35,7 @@ once_cell = "1.18" clap = { version = "4", default_features = false, features = ["std", "env", "derive", "help", "color"] } + [features] default = [] bbmri = [] diff --git a/src/db.rs b/src/db.rs index fad0553..48230d8 100644 --- a/src/db.rs +++ b/src/db.rs @@ -2,7 +2,7 @@ use crate::errors::FocusError; use serde::{Deserialize, Serialize}; use serde_json::Value; use sqlx::{postgres::PgPoolOptions, postgres::PgRow, PgPool}; -use sqlx_pgrow_serde::SerMapPgRow; +use sqlx_serde::SerMapPgRow; use std::{collections::HashMap, time::Duration}; use tracing::{warn, info, debug}; From 3c6e2043f7a8b77907091af34d4f9051073b49b6 Mon Sep 17 00:00:00 2001 From: Martin Lablans Date: Wed, 4 Sep 2024 11:32:09 +0200 Subject: [PATCH 22/36] Turn SQL into optional cargo feature --- Cargo.toml | 8 +++++--- src/config.rs | 4 ++++ src/errors.rs | 5 +++-- src/main.rs | 42 ++++++++++++++++++++++++++++++++++-------- 4 files changed, 46 insertions(+), 13 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b8648d4..d0d51f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,9 +20,7 @@ laplace_rs = {git = "https://github.com/samply/laplace-rs.git", tag = "v0.3.0" } uuid = "1.8.0" rand = { default-features = false, version = "0.8.5" } futures-util = { version = "0.3", default-features = false, features = ["std"] } -sqlx = { version = "0.8.2", features = [ "runtime-tokio", "postgres", "macros", "chrono"] } tryhard = "0.5" -kurtbuilds_sqlx_serde = "0.3.1" # Logging tracing = { version = "0.1.37", default_features = false } @@ -34,12 +32,16 @@ once_cell = "1.18" # Command Line Interface clap = { version = "4", default_features = false, features = ["std", "env", "derive", "help", "color"] } +# Query via SQL +sqlx = { version = "0.8.2", features = [ "runtime-tokio", "postgres", "macros", "chrono"], optional = true } +kurtbuilds_sqlx_serde = { version = "0.3.1", optional = true } [features] default = [] bbmri = [] -dktk = [] +dktk = ["query-sql"] +query-sql = ["dep:sqlx", "dep:kurtbuilds_sqlx_serde"] [dev-dependencies] pretty_assertions = "1.4.0" diff --git a/src/config.rs b/src/config.rs index 7a09748..c388cd7 100644 --- a/src/config.rs +++ b/src/config.rs @@ -20,7 +20,9 @@ pub enum Obfuscate { pub enum EndpointType { Blaze, Omop, + #[cfg(feature = "query-sql")] BlazeAndSql, + #[cfg(feature = "query-sql")] Sql, } @@ -29,7 +31,9 @@ impl fmt::Display for EndpointType { match self { EndpointType::Blaze => write!(f, "blaze"), EndpointType::Omop => write!(f, "omop"), + #[cfg(feature = "query-sql")] EndpointType::BlazeAndSql => write!(f, "blaze_and_sql"), + #[cfg(feature = "query-sql")] EndpointType::Sql => write!(f, "sql"), } } diff --git a/src/errors.rs b/src/errors.rs index 8cccda1..28e4820 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -63,12 +63,13 @@ pub enum FocusError { MissingExporterTaskType, #[error("Cannot connect to database: {0}")] CannotConnectToDatabase(String), - #[error("Error executing query: {0}")] - ErrorExecutingQuery(sqlx::Error), #[error("QueryResultBad: {0}")] QueryResultBad(String), #[error("Query not allowed: {0}")] QueryNotAllowed(String), + #[cfg(feature = "query-sql")] + #[error("Error executing query: {0}")] + ErrorExecutingQuery(sqlx::Error), } impl FocusError { diff --git a/src/main.rs b/src/main.rs index 75528bb..77ef6da 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,20 +8,21 @@ mod errors; mod graceful_shutdown; mod logger; -mod db; mod exporter; mod intermediate_rep; mod projects; mod task_processing; mod util; +#[cfg(feature = "query-sql")] +mod db; + use base64::engine::general_purpose; use base64::{engine::general_purpose::STANDARD as BASE64, Engine as _}; use beam_lib::{TaskRequest, TaskResult}; use futures_util::future::BoxFuture; use futures_util::FutureExt; use laplace_rs::ObfCache; -use sqlx::PgPool; use tokio::sync::Mutex; use crate::blaze::{parse_blaze_query_payload_ast, AstQuery}; @@ -118,22 +119,45 @@ pub async fn main() -> ExitCode { } } -async fn main_loop() -> ExitCode { - let db_pool = if let Some(connection_string) = CONFIG.postgres_connection_string.clone() { +#[cfg(not(feature = "query-sql"))] +type DbPool = (); + +#[cfg(feature = "query-sql")] +type DbPool = sqlx::PgPool; + +#[cfg(not(feature = "query-sql"))] +async fn get_db_pool() -> Result,ExitCode> { + Ok(None) +} + +#[cfg(feature = "query-sql")] +async fn get_db_pool() -> Result,ExitCode> { + if let Some(connection_string) = CONFIG.postgres_connection_string.clone() { match db::get_pg_connection_pool(&connection_string, 8).await { Err(e) => { error!("Error connecting to database: {}", e); - return ExitCode::from(8); + Err(ExitCode::from(8)) } - Ok(pool) => Some(pool), + Ok(pool) => Ok(Some(pool)), } } else { - None + Ok(None) + } +} + +async fn main_loop() -> ExitCode { + let db_pool = match get_db_pool().await { + Ok(pool) => pool, + Err(code) => { + return code; + }, }; let endpoint_service_available: fn() -> BoxFuture<'static, bool> = match CONFIG.endpoint_type { EndpointType::Blaze => || blaze::check_availability().boxed(), EndpointType::Omop => || async { true }.boxed(), // TODO health check + #[cfg(feature = "query-sql")] EndpointType::BlazeAndSql => || blaze::check_availability().boxed(), + #[cfg(feature = "query-sql")] EndpointType::Sql => || async { true }.boxed(), }; let mut failures = 0; @@ -169,7 +193,7 @@ async fn process_task( task: &BeamTask, obf_cache: Arc>, report_cache: Arc>, - db_pool: Option, + db_pool: Option, ) -> Result { debug!("Processing task {}", task.id); @@ -217,6 +241,7 @@ async fn process_task( ) .await }, + #[cfg(feature = "query-sql")] EndpointType::BlazeAndSql => { let mut generated_from_ast: bool = false; let data = base64_decode(&task.body)?; @@ -260,6 +285,7 @@ async fn process_task( .await } }, + #[cfg(feature="query-sql")] EndpointType::Sql => { let data = base64_decode(&task.body)?; let query_maybe: Result = serde_json::from_slice(&(data)); From 8d335d5834a8b27d46ce95a75cf55faa2bd699ed Mon Sep 17 00:00:00 2001 From: Martin Lablans Date: Wed, 4 Sep 2024 11:43:40 +0200 Subject: [PATCH 23/36] Remove sql from CLI --- src/config.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/config.rs b/src/config.rs index c388cd7..8c05727 100644 --- a/src/config.rs +++ b/src/config.rs @@ -163,6 +163,7 @@ struct CliArgs { auth_header: Option, /// Database connection string + #[cfg(feature = "query-sql")] #[clap(long, env, value_parser)] postgres_connection_string: Option, } @@ -192,6 +193,7 @@ pub(crate) struct Config { pub provider: Option, pub provider_icon: Option, pub auth_header: Option, + #[cfg(feature = "query-sql")] pub postgres_connection_string: Option, } @@ -234,6 +236,7 @@ impl Config { provider: cli_args.provider, provider_icon: cli_args.provider_icon, auth_header: cli_args.auth_header, + #[cfg(feature = "query-sql")] postgres_connection_string: cli_args.postgres_connection_string, client, }; From 19f5283858de8dd56ca3e76f70de018158477965 Mon Sep 17 00:00:00 2001 From: Torben Brenner Date: Mon, 8 Jan 2024 11:07:12 +0000 Subject: [PATCH 24/36] feat: added dhki stratifiers --- resources/cql/DHKI_STRAT_ENCOUNTER_STRATIFIER | 5 +++++ resources/cql/DHKI_STRAT_MEDICATION_STRATIFIER | 5 +++++ resources/cql/DHKI_STRAT_SPECIMEN_STRATIFIER | 8 ++++++++ resources/cql/DKTK_STRAT_AGE_STRATIFIER | 9 ++++++++- 4 files changed, 26 insertions(+), 1 deletion(-) create mode 100644 resources/cql/DHKI_STRAT_ENCOUNTER_STRATIFIER create mode 100644 resources/cql/DHKI_STRAT_MEDICATION_STRATIFIER create mode 100644 resources/cql/DHKI_STRAT_SPECIMEN_STRATIFIER diff --git a/resources/cql/DHKI_STRAT_ENCOUNTER_STRATIFIER b/resources/cql/DHKI_STRAT_ENCOUNTER_STRATIFIER new file mode 100644 index 0000000..ffd36d2 --- /dev/null +++ b/resources/cql/DHKI_STRAT_ENCOUNTER_STRATIFIER @@ -0,0 +1,5 @@ +define Encounter: +if InInitialPopulation then [Encounter] else {} as List + +define function Departments(encounter FHIR.Encounter): +encounter.identifier.where(system = 'http://dktk.dkfz.de/fhir/sid/hki-department').value.first() diff --git a/resources/cql/DHKI_STRAT_MEDICATION_STRATIFIER b/resources/cql/DHKI_STRAT_MEDICATION_STRATIFIER new file mode 100644 index 0000000..835ee49 --- /dev/null +++ b/resources/cql/DHKI_STRAT_MEDICATION_STRATIFIER @@ -0,0 +1,5 @@ +define MedicationStatement: +if InInitialPopulation then [MedicationStatement] else {} as List + +define function AppliedMedications(medication FHIR.MedicationStatement): +medication.medication.coding.code.last() diff --git a/resources/cql/DHKI_STRAT_SPECIMEN_STRATIFIER b/resources/cql/DHKI_STRAT_SPECIMEN_STRATIFIER new file mode 100644 index 0000000..75534e4 --- /dev/null +++ b/resources/cql/DHKI_STRAT_SPECIMEN_STRATIFIER @@ -0,0 +1,8 @@ +define Specimen: +if InInitialPopulation then [Specimen] else {} as List + +define function SampleType(specimen FHIR.Specimen): +specimen.type.coding.where(system = 'https://fhir.bbmri.de/CodeSystem/SampleMaterialType').code.first() + +define function SampleSubtype(specimen FHIR.Specimen): +specimen.type.text.first() diff --git a/resources/cql/DKTK_STRAT_AGE_STRATIFIER b/resources/cql/DKTK_STRAT_AGE_STRATIFIER index 6cb9744..9efc998 100644 --- a/resources/cql/DKTK_STRAT_AGE_STRATIFIER +++ b/resources/cql/DKTK_STRAT_AGE_STRATIFIER @@ -4,5 +4,12 @@ from [Condition] C where C.extension.where(url='http://hl7.org/fhir/StructureDefinition/condition-related').empty() and C.onset is not null sort by date from onset asc) +define FirstDiagnosis: +First( +from [Condition] C +sort by date from onset asc) + define AgeClass: -if (PrimaryDiagnosis.onset is null) then 'unknown' else ToString((AgeInYearsAt(FHIRHelpers.ToDateTime(PrimaryDiagnosis.onset)) div 10) * 10) +if (PrimaryDiagnosis.onset is null) +then ToString((AgeInYearsAt(FHIRHelpers.ToDateTime(FirstDiagnosis.onset)) div 10) * 10) +else ToString((AgeInYearsAt(FHIRHelpers.ToDateTime(PrimaryDiagnosis.onset)) div 10) * 10) From b3eb5e119692d07eb9f8b3d1bac8e0d6b12d3055 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Thu, 12 Sep 2024 16:11:13 +0200 Subject: [PATCH 25/36] pgrowserde up, focus up --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index d0d51f5..f8c27a9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "focus" -version = "0.6.0" +version = "0.7.0" edition = "2021" license = "Apache-2.0" @@ -34,7 +34,7 @@ clap = { version = "4", default_features = false, features = ["std", "env", "der # Query via SQL sqlx = { version = "0.8.2", features = [ "runtime-tokio", "postgres", "macros", "chrono"], optional = true } -kurtbuilds_sqlx_serde = { version = "0.3.1", optional = true } +kurtbuilds_sqlx_serde = { version = "0.3.2", optional = true } [features] From c39b6a21fdad863fd1edae78ceb8d7ec3fa24fd0 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Thu, 12 Sep 2024 16:21:18 +0200 Subject: [PATCH 26/36] default_features -> default-features --- Cargo.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f8c27a9..5089e2a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ serde_json = "1.0" thiserror = "1.0.38" chrono = "0.4.31" indexmap = "2.1.0" -tokio = { version = "1.25.0", default_features = false, features = ["signal", "rt-multi-thread", "macros"] } +tokio = { version = "1.25.0", default-features = false, features = ["signal", "rt-multi-thread", "macros"] } beam-lib = { git = "https://github.com/samply/beam", branch = "develop", features = ["http-util"] } laplace_rs = {git = "https://github.com/samply/laplace-rs.git", tag = "v0.3.0" } uuid = "1.8.0" @@ -23,14 +23,14 @@ futures-util = { version = "0.3", default-features = false, features = ["std"] } tryhard = "0.5" # Logging -tracing = { version = "0.1.37", default_features = false } -tracing-subscriber = { version = "0.3.11", default_features = false, features = ["env-filter", "ansi"] } +tracing = { version = "0.1.37", default-features = false } +tracing-subscriber = { version = "0.3.11", default-features = false, features = ["env-filter", "ansi"] } # Global variables once_cell = "1.18" # Command Line Interface -clap = { version = "4", default_features = false, features = ["std", "env", "derive", "help", "color"] } +clap = { version = "4", default-features = false, features = ["std", "env", "derive", "help", "color"] } # Query via SQL sqlx = { version = "0.8.2", features = [ "runtime-tokio", "postgres", "macros", "chrono"], optional = true } From 5183653195300c6e98a861af7ce630871586c1d1 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Tue, 24 Sep 2024 15:10:42 +0200 Subject: [PATCH 27/36] up SQL version --- Cargo.toml | 4 ++-- resources/sql/SELECT_TABLES | 1 - resources/sql/SELECT_TEST | 1 + src/db.rs | 26 ++++++++++++++++++++++++++ 4 files changed, 29 insertions(+), 3 deletions(-) delete mode 100644 resources/sql/SELECT_TABLES create mode 100644 resources/sql/SELECT_TEST diff --git a/Cargo.toml b/Cargo.toml index 5089e2a..a87d070 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,8 +33,8 @@ once_cell = "1.18" clap = { version = "4", default-features = false, features = ["std", "env", "derive", "help", "color"] } # Query via SQL -sqlx = { version = "0.8.2", features = [ "runtime-tokio", "postgres", "macros", "chrono"], optional = true } -kurtbuilds_sqlx_serde = { version = "0.3.2", optional = true } +sqlx = { version = "0.8.2", features = [ "runtime-tokio", "postgres", "macros", "chrono", "rust_decimal", "uuid"], optional = true } +kurtbuilds_sqlx_serde = { version = "0.3.2", features = [ "json", "decimal", "chrono", "uuid"], optional = true } [features] diff --git a/resources/sql/SELECT_TABLES b/resources/sql/SELECT_TABLES deleted file mode 100644 index c59f3b3..0000000 --- a/resources/sql/SELECT_TABLES +++ /dev/null @@ -1 +0,0 @@ -SELECT * FROM pg_catalog.pg_tables \ No newline at end of file diff --git a/resources/sql/SELECT_TEST b/resources/sql/SELECT_TEST new file mode 100644 index 0000000..8f90872 --- /dev/null +++ b/resources/sql/SELECT_TEST @@ -0,0 +1 @@ +SELECT 10 AS VALUE, quote_literal('Hello Rustaceans') AS GREETING, 4.7 as FLOATY, CURRENT_DATE AS TODAY; \ No newline at end of file diff --git a/src/db.rs b/src/db.rs index 48230d8..b6a0ff9 100644 --- a/src/db.rs +++ b/src/db.rs @@ -62,3 +62,29 @@ pub fn serialize_rows(rows: Vec) -> Result { Ok(Value::Array(rows_json)) } + +#[cfg(test)] +mod test { + use super::*; + + #[tokio::test] + #[ignore] //TODO mock DB + async fn serialize() { + let pool = + get_pg_connection_pool("postgresql://postgres:secret@localhost:5432/postgres", 1) + .await + .unwrap(); + + let rows = run_query(&pool, SQL_REPLACE_MAP.get("SELECT_TEST").unwrap()) + .await + .unwrap(); + + dbg!(&rows); + let rows_json = serialize_rows(rows).unwrap(); + dbg!(&rows_json); + + assert!(rows_json.is_array()); + + assert_ne!(rows_json[0]["floaty"], Value::Null); + } +} \ No newline at end of file From 534c43e649b47d081fff0607675712e3cc6e79cf Mon Sep 17 00:00:00 2001 From: Enola Knezevic <115070135+enola-dkfz@users.noreply.github.com> Date: Tue, 24 Sep 2024 15:29:24 +0200 Subject: [PATCH 28/36] Update CHANGELOG.md --- CHANGELOG.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1cbee72..9069fe1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,10 @@ +# Samply.Focus v0.7.0 2024-09-24 + +## Major changes +* PostgreSQL support added + + + # Focus -- 2023-02-08 This is the initial release of Focus, a task distribution application designed for working with Samply.Beam. Currently, only Samply.Blaze is supported as an endpoint, but other endpoints can easily be integrated. From 7b2b2f9a442c14f4ebce473c4be6de38018476ae Mon Sep 17 00:00:00 2001 From: Enola Knezevic <115070135+enola-dkfz@users.noreply.github.com> Date: Mon, 7 Oct 2024 14:17:25 +0200 Subject: [PATCH 29/36] renamed SQL error to include SQL (#170) --- src/db.rs | 2 +- src/errors.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/db.rs b/src/db.rs index b6a0ff9..320eeb5 100644 --- a/src/db.rs +++ b/src/db.rs @@ -37,7 +37,7 @@ pub async fn run_query(pool: &PgPool, query: &str) -> Result, FocusEr sqlx::query(query) .fetch_all(pool) .await - .map_err(FocusError::ErrorExecutingQuery) + .map_err(FocusError::ErrorExecutingSqlQuery) } pub async fn process_sql_task(pool: &PgPool, key: &str) -> Result, FocusError> { diff --git a/src/errors.rs b/src/errors.rs index 28e4820..1470b09 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -68,8 +68,8 @@ pub enum FocusError { #[error("Query not allowed: {0}")] QueryNotAllowed(String), #[cfg(feature = "query-sql")] - #[error("Error executing query: {0}")] - ErrorExecutingQuery(sqlx::Error), + #[error("Error executing SQL query: {0}")] + ErrorExecutingSqlQuery(sqlx::Error), } impl FocusError { From b316228772e3766ada7320d9b3ec56f7a0758d2a Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 7 Oct 2024 14:21:51 +0200 Subject: [PATCH 30/36] feature DKTK for postgres explained in readme --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 1718f1a..4d10240 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,10 @@ QUERIES_TO_CACHE = "queries_to_cache.conf" # The path to a file containing base6 PROVIDER = "name" #EUCAIM provider name PROVIDER_ICON = "iVBORw0KGgoAAAANSUhEUgAAAAEAAAABAQMAAAAl21bKAAAAA1BMVEUAAACnej3aAAAAAXRSTlMAQObYZgAAAApJREFUCNdjYAAAAAIAAeIhvDMAAAAASUVORK5CYII=" # Base64 encoded EUCAIM provider icon AUTH_HEADER = "ApiKey XXXX" #Authorization header +``` + +In order to use Postgres querying, a Docker image built with the feature "dktk" needs to be used and this optional variable set: +```bash POSTGRES_CONNECTION_STRING = "postgresql://postgres:Test.123@localhost:5432/postgres" # Postgres connection string ``` From 0b5df26bbdb0648d8477b917192c61bdb4f9b3cb Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Thu, 10 Oct 2024 13:00:04 +0200 Subject: [PATCH 31/36] extracted max db connect attempts to CL param --- src/config.rs | 11 ++++++++++- src/main.rs | 2 +- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/config.rs b/src/config.rs index 8c05727..35d375f 100644 --- a/src/config.rs +++ b/src/config.rs @@ -162,10 +162,15 @@ struct CliArgs { #[clap(long, env, value_parser)] auth_header: Option, - /// Database connection string + /// Postgres connection string #[cfg(feature = "query-sql")] #[clap(long, env, value_parser)] postgres_connection_string: Option, + + /// Max number of attempts to connect to the database + #[cfg(feature = "query-sql")] + #[clap(long, env, value_parser, default_value = "8")] + max_attempts: u32, } pub(crate) struct Config { @@ -195,6 +200,8 @@ pub(crate) struct Config { pub auth_header: Option, #[cfg(feature = "query-sql")] pub postgres_connection_string: Option, + #[cfg(feature = "query-sql")] + pub max_attempts: u32, } impl Config { @@ -238,6 +245,8 @@ impl Config { auth_header: cli_args.auth_header, #[cfg(feature = "query-sql")] postgres_connection_string: cli_args.postgres_connection_string, + #[cfg(feature = "query-sql")] + max_attempts: cli_args.max_attempts, client, }; Ok(config) diff --git a/src/main.rs b/src/main.rs index 77ef6da..952c4a5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -133,7 +133,7 @@ async fn get_db_pool() -> Result,ExitCode> { #[cfg(feature = "query-sql")] async fn get_db_pool() -> Result,ExitCode> { if let Some(connection_string) = CONFIG.postgres_connection_string.clone() { - match db::get_pg_connection_pool(&connection_string, 8).await { + match db::get_pg_connection_pool(&connection_string, CONFIG.max_attempts).await { Err(e) => { error!("Error connecting to database: {}", e); Err(ExitCode::from(8)) From 3a6505b3c87d68c8fbf92d0c500feb873610d38c Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Thu, 10 Oct 2024 13:16:00 +0200 Subject: [PATCH 32/36] CLA max attempts db connect in readme --- README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 4d10240..60ea26b 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,11 @@ In order to use Postgres querying, a Docker image built with the feature "dktk" POSTGRES_CONNECTION_STRING = "postgresql://postgres:Test.123@localhost:5432/postgres" # Postgres connection string ``` +Additionally when using Postgres this optional variable can be set: +```bash +MAX_ATTEMPTS = "8" # Max number of attempts to connect to the database, default value: 8 +``` + Obfuscating zero counts is by default switched off. To enable obfuscating zero counts, set the env. variable `OBFUSCATE_ZERO`. Optionally, you can provide the `TLS_CA_CERTIFICATES_DIR` environment variable to add additional trusted certificates, e.g., if you have a TLS-terminating proxy server in place. The application respects the `HTTP_PROXY`, `HTTPS_PROXY`, `ALL_PROXY`, `NO_PROXY`, and their respective lowercase equivalents. @@ -85,7 +90,7 @@ Creating a sample task containing an abstract syntax tree (AST) query using curl curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"bbmri"},"body":"eyJsYW5nIjoiYXN0IiwicGF5bG9hZCI6ImV5SmhjM1FpT25zaWIzQmxjbUZ1WkNJNklrOVNJaXdpWTJocGJHUnlaVzRpT2x0N0ltOXdaWEpoYm1RaU9pSkJUa1FpTENKamFHbHNaSEpsYmlJNlczc2liM0JsY21GdVpDSTZJazlTSWl3aVkyaHBiR1J5Wlc0aU9sdDdJbXRsZVNJNkltZGxibVJsY2lJc0luUjVjR1VpT2lKRlVWVkJURk1pTENKemVYTjBaVzBpT2lJaUxDSjJZV3gxWlNJNkltMWhiR1VpZlN4N0ltdGxlU0k2SW1kbGJtUmxjaUlzSW5SNWNHVWlPaUpGVVZWQlRGTWlMQ0p6ZVhOMFpXMGlPaUlpTENKMllXeDFaU0k2SW1abGJXRnNaU0o5WFgxZGZWMTlMQ0pwWkNJNkltRTJaakZqWTJZekxXVmlaakV0TkRJMFppMDVaRFk1TFRSbE5XUXhNelZtTWpNME1DSjkifQ=="}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks ``` -Creating a sample SQL task for a `SELECT_TABLES` query using curl: +Creating a sample SQL task for a `SELECT_TEST` query using curl: ```bash curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"exliquid"},"body":"eyJwYXlsb2FkIjoiU0VMRUNUX1RBQkxFUyJ9"}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks ``` From 20c6c1827e4eb5d023834d43a795a33469a41eed Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Thu, 10 Oct 2024 13:17:22 +0200 Subject: [PATCH 33/36] changelog as requested --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9069fe1..8f41ff8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,7 @@ # Samply.Focus v0.7.0 2024-09-24 +In this release, we are extending the supported data backends beyond CQL-enabled FHIR stores. We now support PostgreSQL as well. Usage instructions are included in the Readme. + ## Major changes * PostgreSQL support added From 9ecd9a5af76ec14599109dce1cd752277dbef5e6 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Thu, 10 Oct 2024 14:03:16 +0200 Subject: [PATCH 34/36] max attempts renamed to max_db_attempts --- README.md | 2 +- src/config.rs | 6 +++--- src/main.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 60ea26b..b8960fd 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ POSTGRES_CONNECTION_STRING = "postgresql://postgres:Test.123@localhost:5432/post Additionally when using Postgres this optional variable can be set: ```bash -MAX_ATTEMPTS = "8" # Max number of attempts to connect to the database, default value: 8 +MAX_DB_ATTEMPTS = "8" # Max number of attempts to connect to the database, default value: 8 ``` Obfuscating zero counts is by default switched off. To enable obfuscating zero counts, set the env. variable `OBFUSCATE_ZERO`. diff --git a/src/config.rs b/src/config.rs index 35d375f..d45a408 100644 --- a/src/config.rs +++ b/src/config.rs @@ -170,7 +170,7 @@ struct CliArgs { /// Max number of attempts to connect to the database #[cfg(feature = "query-sql")] #[clap(long, env, value_parser, default_value = "8")] - max_attempts: u32, + max_db_attempts: u32, } pub(crate) struct Config { @@ -201,7 +201,7 @@ pub(crate) struct Config { #[cfg(feature = "query-sql")] pub postgres_connection_string: Option, #[cfg(feature = "query-sql")] - pub max_attempts: u32, + pub max_db_attempts: u32, } impl Config { @@ -246,7 +246,7 @@ impl Config { #[cfg(feature = "query-sql")] postgres_connection_string: cli_args.postgres_connection_string, #[cfg(feature = "query-sql")] - max_attempts: cli_args.max_attempts, + max_db_attempts: cli_args.max_db_attempts, client, }; Ok(config) diff --git a/src/main.rs b/src/main.rs index 952c4a5..eac421c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -133,7 +133,7 @@ async fn get_db_pool() -> Result,ExitCode> { #[cfg(feature = "query-sql")] async fn get_db_pool() -> Result,ExitCode> { if let Some(connection_string) = CONFIG.postgres_connection_string.clone() { - match db::get_pg_connection_pool(&connection_string, CONFIG.max_attempts).await { + match db::get_pg_connection_pool(&connection_string, CONFIG.max_db_attempts).await { Err(e) => { error!("Error connecting to database: {}", e); Err(ExitCode::from(8)) From 60f42e3f5f7bab34475439f48561e8d04ab36868 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 14 Oct 2024 15:08:32 +0200 Subject: [PATCH 35/36] changed query in SQL task in readme --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b8960fd..7e5c2b1 100644 --- a/README.md +++ b/README.md @@ -61,7 +61,7 @@ POSTGRES_CONNECTION_STRING = "postgresql://postgres:Test.123@localhost:5432/post Additionally when using Postgres this optional variable can be set: ```bash -MAX_DB_ATTEMPTS = "8" # Max number of attempts to connect to the database, default value: 8 +MAX_DB_ATTEMPTS = "8" # Max number of attempts to connect to the database; default value: 8 ``` Obfuscating zero counts is by default switched off. To enable obfuscating zero counts, set the env. variable `OBFUSCATE_ZERO`. @@ -92,7 +92,7 @@ curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef Creating a sample SQL task for a `SELECT_TEST` query using curl: ```bash - curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"exliquid"},"body":"eyJwYXlsb2FkIjoiU0VMRUNUX1RBQkxFUyJ9"}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks + curl -v -X POST -H "Content-Type: application/json" --data '{"id":"7fffefff-ffef-fcff-feef-feffffffffff","from":"app1.proxy1.broker","to":["app1.proxy1.broker"],"ttl":"10s","failure_strategy":{"retry":{"backoff_millisecs":1000,"max_tries":5}},"metadata":{"project":"exliquid"},"body":"eyJwYXlsb2FkIjoiU0VMRUNUX1RFU1QifQ=="}' -H "Authorization: ApiKey app1.proxy1.broker App1Secret" http://localhost:8081/v1/tasks ``` Creating a sample [Exporter](https://github.com/samply/exporter) "execute" task containing an Exporter query using curl: From c976254b2d867df6aae2703e88aec4724eecb979 Mon Sep 17 00:00:00 2001 From: Enola Knezevic Date: Mon, 14 Oct 2024 15:12:55 +0200 Subject: [PATCH 36/36] renamed max_attempts to max_db_attempts --- src/db.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/db.rs b/src/db.rs index 320eeb5..610a649 100644 --- a/src/db.rs +++ b/src/db.rs @@ -14,7 +14,7 @@ pub struct SqlQuery { include!(concat!(env!("OUT_DIR"), "/sql_replace_map.rs")); -pub async fn get_pg_connection_pool(pg_url: &str, max_attempts: u32) -> Result { +pub async fn get_pg_connection_pool(pg_url: &str, max_db_attempts: u32) -> Result { info!("Trying to establish a PostgreSQL connection pool"); tryhard::retry_fn(|| async { @@ -28,7 +28,7 @@ pub async fn get_pg_connection_pool(pg_url: &str, max_attempts: u32) -> Result