Improve kafka test
Lun4m committed Jul 8, 2024
1 parent 8281fea commit 984d9a2
Showing 4 changed files with 50 additions and 43 deletions.
api/src/lib.rs (6 changes: 1 addition & 5 deletions)
@@ -118,12 +118,8 @@ async fn latest_handler(
 
     Ok(Json(LatestResp { data }))
 }
-pub async fn run(connect_string: &str) {
-    // set up postgres connection pool
-    // TODO: move these two lines back to main?
-    let manager = PostgresConnectionManager::new_from_stringlike(connect_string, NoTls).unwrap();
-    let pool = bb8::Pool::builder().build(manager).await.unwrap();
 
+pub async fn run(pool: PgConnectionPool) {
     // build our application with routes
     let app = Router::new()
         .route(
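Note: `PgConnectionPool` is referenced in the new signature but its definition is not part of this diff. Judging from the bb8 and tokio-postgres setup shown in api/src/main.rs below, a plausible alias would be the following (an assumption for illustration, not code from this commit):

use bb8_postgres::PostgresConnectionManager;
use tokio_postgres::NoTls;

// Assumed definition of the pool type that `run` now accepts (not shown in this diff).
pub type PgConnectionPool = bb8::Pool<PostgresConnectionManager<NoTls>>;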
api/src/main.rs (9 changes: 8 additions & 1 deletion)
@@ -1,3 +1,6 @@
+use bb8_postgres::PostgresConnectionManager;
+use tokio_postgres::NoTls;
+
 #[tokio::main]
 async fn main() {
     let args: Vec<String> = std::env::args().collect();
@@ -12,5 +15,9 @@ async fn main() {
         connect_string.push_str(&args[4])
     }
 
-    lard_api::run(&connect_string).await;
+    // set up postgres connection pool
+    let manager = PostgresConnectionManager::new_from_stringlike(connect_string, NoTls).unwrap();
+    let pool = bb8::Pool::builder().build(manager).await.unwrap();
+
+    lard_api::run(pool).await;
 }
integration_tests/Makefile (4 changes: 4 additions & 0 deletions)
@@ -6,6 +6,10 @@ end_to_end: _end_to_end clean
 _end_to_end: setup
 	cargo test --test end_to_end --no-fail-fast -- --nocapture --test-threads=1
 
+kafka: _kafka clean
+_kafka: setup
+	cargo test --test end_to_end test_kafka --features debug --no-fail-fast -- --nocapture --test-threads=1
+
 # With the `debug` feature, the database is not cleaned up after running the test,
 # so it can be inspected with psql. Run with:
 # TEST=<name> make debug_test
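The comment block above describes the `debug` feature that the new kafka target enables: the database is left populated after the test so it can be inspected with psql. As a purely hypothetical illustration of how such a gate is commonly written in Rust (the actual teardown lives in the test harness and is not shown in this commit):

// Hypothetical sketch, not code from this repository.
async fn cleanup(client: &tokio_postgres::Client) {
    // Under `--features debug`, keep the rows around for inspection with psql.
    if cfg!(feature = "debug") {
        return;
    }
    client
        .execute("TRUNCATE flags.kvdata", &[])
        .await
        .expect("failed to clean up test data");
}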
integration_tests/tests/end_to_end.rs (74 changes: 37 additions & 37 deletions)
@@ -13,7 +13,7 @@ use tokio_postgres::NoTls;
 
 use lard_api::timeseries::Timeseries;
 use lard_api::{LatestResp, TimeseriesResp, TimesliceResp};
-use lard_ingestion::kvkafka::{self, insert_kvdata, Msg};
+use lard_ingestion::kvkafka;
 use lard_ingestion::permissions::{
     timeseries_is_open, ParamPermit, ParamPermitTable, StationPermitTable,
 };
@@ -140,7 +140,7 @@ async fn e2e_test_wrapper<T: Future<Output = ()>>(test: T) {
     let manager = PostgresConnectionManager::new_from_stringlike(CONNECT_STRING, NoTls).unwrap();
     let db_pool = bb8::Pool::builder().build(manager).await.unwrap();
 
-    let api_server = tokio::spawn(lard_api::run(CONNECT_STRING));
+    let api_server = tokio::spawn(lard_api::run(db_pool.clone()));
     let ingestor = tokio::spawn(lard_ingestion::run(
         db_pool.clone(),
         PARAMCONV_CSV,
@@ -382,42 +382,35 @@ async fn test_timeslice_endpoint() {
 #[tokio::test]
 async fn test_kafka() {
     e2e_test_wrapper(async {
-        let (tx, mut rx) = mpsc::channel::<Msg>(10);
+        let (tx, mut rx) = mpsc::channel(10);
 
-        // Spawn task to receive messages
-        tokio::spawn(async move {
-            let (client, conn) = tokio_postgres::connect(CONNECT_STRING, NoTls)
-                .await
-                .unwrap();
-
-            tokio::spawn(async move {
-                if let Err(e) = conn.await {
-                    eprintln!("{}", e)
-                }
-            });
+        let (pgclient, conn) = tokio_postgres::connect(CONNECT_STRING, NoTls)
+            .await
+            .unwrap();
 
-            while let Some(msg) = rx.recv().await {
-                if let Err(e) = insert_kvdata(&client, msg).await {
-                    eprintln!("Database insert error: {e}");
-                }
+        tokio::spawn(async move {
+            if let Err(e) = conn.await {
+                eprintln!("{}", e)
             }
         });
 
-        let ts = TestData {
-            station_id: 20001,
-            params: &[Param::new(106, "RR_1")], // sum(precipitation_amount PT1H)
-            start_time: Utc.with_ymd_and_hms(2024, 6, 5, 12, 0, 0).unwrap(),
-            period: chrono::Duration::hours(1),
-            type_id: -4,
-            len: 24,
-        };
+        // Spawn task to send message
+        tokio::spawn(async move {
+            let ts = TestData {
+                station_id: 20001,
+                params: &[Param::new(106, "RR_1")], // sum(precipitation_amount PT1H)
+                start_time: Utc.with_ymd_and_hms(2024, 6, 5, 12, 0, 0).unwrap(),
+                period: chrono::Duration::hours(1),
+                type_id: -4,
+                len: 24,
+            };
 
-        let client = reqwest::Client::new();
-        let ingestor_resp = ingest_data(&client, ts.obsinn_message()).await;
-        assert_eq!(ingestor_resp.res, 0);
+            let client = reqwest::Client::new();
+            let ingestor_resp = ingest_data(&client, ts.obsinn_message()).await;
+            assert_eq!(ingestor_resp.res, 0);
 
-        // This observation was 2.5 hours late??
-        let kafka_xml = r#"<?xml?>
+            // This observation was 2.5 hours late??
+            let kafka_xml = r#"<?xml?>
 <KvalobsData producer=\"kvqabase\" created=\"2024-06-06 08:30:43\">
 <station val=\"20001\">
 <typeid val=\"-4\">
@@ -440,14 +433,21 @@ async fn test_kafka() {
 </station>
 </KvalobsData>"#;
 
-        kvkafka::parse_message(kafka_xml.as_bytes(), &tx)
-            .await
-            .unwrap();
+            kvkafka::parse_message(kafka_xml.as_bytes(), &tx)
+                .await
+                .unwrap();
+        });
 
-        // NOTE: sleep a bit so the message can be inserted into the database
-        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
+        // wait for message
+        if let Some(msg) = rx.recv().await {
+            kvkafka::insert_kvdata(&pgclient, msg).await.unwrap()
+        }
 
-        // TODO: we do not have an API endpoint to query the flags table
+        // TODO: we do not have an API endpoint to query the flags.kvdata table
+        assert!(pgclient
+            .query_one("SELECT * FROM flags.kvdata", &[])
+            .await
+            .is_ok());
     })
     .await
 }
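Design note on the reworked test: the old version spawned a background receive-and-insert task and then slept for a fixed 200 ms, hoping the insert had finished; the new version keeps the receiving end of the channel on the test task and awaits it directly before asserting on flags.kvdata. A self-contained sketch of why that handshake needs no sleep (plain tokio, written as a standalone example rather than lard code):

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<&str>(10);

    // Producer task, standing in for the kvkafka parser feeding the channel.
    tokio::spawn(async move {
        tx.send("parsed kvkafka message").await.unwrap();
        // `tx` is dropped when the task ends, which closes the channel.
    });

    // Resolves as soon as the message is sent; no fixed sleep is needed.
    assert_eq!(rx.recv().await, Some("parsed kvkafka message"));
    // Once every sender is gone, `recv` returns `None` instead of hanging forever.
    assert_eq!(rx.recv().await, None);
}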
