diff --git a/api/src/lib.rs b/api/src/lib.rs index 3f4b5733..a3059c0e 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -118,12 +118,8 @@ async fn latest_handler( Ok(Json(LatestResp { data })) } -pub async fn run(connect_string: &str) { - // set up postgres connection pool - // TODO: move these two lines back to main? - let manager = PostgresConnectionManager::new_from_stringlike(connect_string, NoTls).unwrap(); - let pool = bb8::Pool::builder().build(manager).await.unwrap(); +pub async fn run(pool: PgConnectionPool) { // build our application with routes let app = Router::new() .route( diff --git a/api/src/main.rs b/api/src/main.rs index 28b4b6a8..fd762231 100644 --- a/api/src/main.rs +++ b/api/src/main.rs @@ -1,3 +1,6 @@ +use bb8_postgres::PostgresConnectionManager; +use tokio_postgres::NoTls; + #[tokio::main] async fn main() { let args: Vec = std::env::args().collect(); @@ -12,5 +15,9 @@ async fn main() { connect_string.push_str(&args[4]) } - lard_api::run(&connect_string).await; + // set up postgres connection pool + let manager = PostgresConnectionManager::new_from_stringlike(connect_string, NoTls).unwrap(); + let pool = bb8::Pool::builder().build(manager).await.unwrap(); + + lard_api::run(pool).await; } diff --git a/integration_tests/Makefile b/integration_tests/Makefile index 720b185a..100555e7 100644 --- a/integration_tests/Makefile +++ b/integration_tests/Makefile @@ -6,6 +6,10 @@ end_to_end: _end_to_end clean _end_to_end: setup cargo test --test end_to_end --no-fail-fast -- --nocapture --test-threads=1 +kafka: _kafka clean +_kafka: setup + cargo test --test end_to_end test_kafka --features debug --no-fail-fast -- --nocapture --test-threads=1 + # With the `debug` feature, the database is not cleaned up after running the test, # so it can be inspected with psql. Run with: # TEST= make debug_test diff --git a/integration_tests/tests/end_to_end.rs b/integration_tests/tests/end_to_end.rs index e3f20c7a..6ba02214 100644 --- a/integration_tests/tests/end_to_end.rs +++ b/integration_tests/tests/end_to_end.rs @@ -13,7 +13,7 @@ use tokio_postgres::NoTls; use lard_api::timeseries::Timeseries; use lard_api::{LatestResp, TimeseriesResp, TimesliceResp}; -use lard_ingestion::kvkafka::{self, insert_kvdata, Msg}; +use lard_ingestion::kvkafka; use lard_ingestion::permissions::{ timeseries_is_open, ParamPermit, ParamPermitTable, StationPermitTable, }; @@ -140,7 +140,7 @@ async fn e2e_test_wrapper>(test: T) { let manager = PostgresConnectionManager::new_from_stringlike(CONNECT_STRING, NoTls).unwrap(); let db_pool = bb8::Pool::builder().build(manager).await.unwrap(); - let api_server = tokio::spawn(lard_api::run(CONNECT_STRING)); + let api_server = tokio::spawn(lard_api::run(db_pool.clone())); let ingestor = tokio::spawn(lard_ingestion::run( db_pool.clone(), PARAMCONV_CSV, @@ -382,42 +382,35 @@ async fn test_timeslice_endpoint() { #[tokio::test] async fn test_kafka() { e2e_test_wrapper(async { - let (tx, mut rx) = mpsc::channel::(10); + let (tx, mut rx) = mpsc::channel(10); - // Spawn task to receive messages - tokio::spawn(async move { - let (client, conn) = tokio_postgres::connect(CONNECT_STRING, NoTls) - .await - .unwrap(); - - tokio::spawn(async move { - if let Err(e) = conn.await { - eprintln!("{}", e) - } - }); + let (pgclient, conn) = tokio_postgres::connect(CONNECT_STRING, NoTls) + .await + .unwrap(); - while let Some(msg) = rx.recv().await { - if let Err(e) = insert_kvdata(&client, msg).await { - eprintln!("Database insert error: {e}"); - } + tokio::spawn(async move { + if let Err(e) = conn.await { + eprintln!("{}", e) } }); - let ts = TestData { - station_id: 20001, - params: &[Param::new(106, "RR_1")], // sum(precipitation_amount PT1H) - start_time: Utc.with_ymd_and_hms(2024, 6, 5, 12, 0, 0).unwrap(), - period: chrono::Duration::hours(1), - type_id: -4, - len: 24, - }; + // Spawn task to send message + tokio::spawn(async move { + let ts = TestData { + station_id: 20001, + params: &[Param::new(106, "RR_1")], // sum(precipitation_amount PT1H) + start_time: Utc.with_ymd_and_hms(2024, 6, 5, 12, 0, 0).unwrap(), + period: chrono::Duration::hours(1), + type_id: -4, + len: 24, + }; - let client = reqwest::Client::new(); - let ingestor_resp = ingest_data(&client, ts.obsinn_message()).await; - assert_eq!(ingestor_resp.res, 0); + let client = reqwest::Client::new(); + let ingestor_resp = ingest_data(&client, ts.obsinn_message()).await; + assert_eq!(ingestor_resp.res, 0); - // This observation was 2.5 hours late?? - let kafka_xml = r#" + // This observation was 2.5 hours late?? + let kafka_xml = r#" @@ -440,14 +433,21 @@ async fn test_kafka() { "#; - kvkafka::parse_message(kafka_xml.as_bytes(), &tx) - .await - .unwrap(); + kvkafka::parse_message(kafka_xml.as_bytes(), &tx) + .await + .unwrap(); + }); - // NOTE: sleep a bit so the message can be inserted into the database - tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + // wait for message + if let Some(msg) = rx.recv().await { + kvkafka::insert_kvdata(&pgclient, msg).await.unwrap() + } - // TODO: we do not have an API endpoint to query the flags table + // TODO: we do not have an API endpoint to query the flags.kvdata table + assert!(pgclient + .query_one("SELECT * FROM flags.kvdata", &[]) + .await + .is_ok()); }) .await }