From 9184e43df4d2795242137aafa2fa7cbcbb5af92c Mon Sep 17 00:00:00 2001
From: Ameer Ghani
Date: Wed, 31 Jul 2024 13:50:07 -0500
Subject: [PATCH 1/6] aggregator_core: Allow dumping query plans to stdout

---
 aggregator_core/README.md                  | 18 +++++++
 aggregator_core/src/datastore/test_util.rs | 62 ++++++++++++++++++----
 2 files changed, 71 insertions(+), 9 deletions(-)
 create mode 100644 aggregator_core/README.md

diff --git a/aggregator_core/README.md b/aggregator_core/README.md
new file mode 100644
index 000000000..20aa41ed9
--- /dev/null
+++ b/aggregator_core/README.md
@@ -0,0 +1,18 @@
+# `aggregator_core`
+
+`aggregator_core` contains helper code for the `aggregator` crate. It mainly consists of data
+structures that are only pertinent to a DAP aggregator, and subroutines for talking to a PostgreSQL
+database.
+
+It is not published to crates.io and should not be depended on directly by users of Janus.
+
+## PostgreSQL Logs
+
+During tests, you can have PostgreSQL dump its logs to stdout by setting
+`JANUS_TEST_DUMP_POSTGRESQL_LOGS`. The test database is set to log all query plans. You should only
+run this option with one test at a time, otherwise you might not get all the logs.
+
+Example:
+```
+JANUS_TEST_DUMP_POSTGRESQL_LOGS= RUST_LOG=debug cargo test datastore::tests::roundtrip_task::case_1 -- --exact --nocapture
+```
diff --git a/aggregator_core/src/datastore/test_util.rs b/aggregator_core/src/datastore/test_util.rs
index 38ece748a..0b72fdc4a 100644
--- a/aggregator_core/src/datastore/test_util.rs
+++ b/aggregator_core/src/datastore/test_util.rs
@@ -5,6 +5,7 @@ use crate::{
 use backoff::{future::retry, ExponentialBackoffBuilder};
 use chrono::NaiveDateTime;
 use deadpool_postgres::{Manager, Pool, Timeouts};
+use futures::{prelude::future::BoxFuture, FutureExt};
 use janus_core::{
     test_util::testcontainers::Postgres,
     time::{Clock, MockClock, TimeExt},
@@ -17,12 +18,17 @@ use sqlx::{
     Connection, PgConnection,
 };
 use std::{
+    env,
     path::PathBuf,
     str::FromStr,
     sync::{Arc, Weak},
     time::Duration,
 };
-use testcontainers::{runners::AsyncRunner, ContainerAsync, ContainerRequest, ImageExt};
+use testcontainers::{
+    core::logs::{consumer::LogConsumer, LogFrame},
+    runners::AsyncRunner,
+    ContainerAsync, ContainerRequest, ImageExt,
+};
 use tokio::sync::Mutex;
 use tokio_postgres::{connect, Config, NoTls};
 use tracing::trace;
@@ -50,14 +56,31 @@ impl EphemeralDatabase {
 
     async fn start() -> Self {
         // Start an instance of Postgres running in a container.
-        let db_container = ContainerRequest::from(Postgres::default())
-            .with_cmd(Vec::from([
-                "-c".to_string(),
-                "max_connections=200".to_string(),
-            ]))
-            .start()
-            .await
-            .unwrap();
+        let container_request =
+            ContainerRequest::from(Postgres::default()).with_cmd(Self::postgres_configuration(&[
+                // Many tests running concurrently can overwhelm the available connections, so bump
+                // the limit.
+                "max_connections=200",
+                // Enable logging of query plans.
+                "shared_preload_libraries=auto_explain",
+                "log_min_messages=LOG",
+                "auto_explain.log_min_duration=0",
+                "auto_explain.log_analyze=true",
+                // Discourage postgres from doing sequential scans, so we can analyze whether we
+                // have appropriate indexes in unit tests. Because test databases are not seeded
+                // with data, the query planner will often choose a sequential scan because it
+                // would be faster than hitting an index.
+                "enable_seqscan=false",
+            ]));
+        let container_request = if env::var_os("JANUS_TEST_DUMP_POSTGRESQL_LOGS").is_some() {
+            // We don't use the default testcontainers LoggingConsumer, because it'll split query
+            // plans across multiple lines, interspersed with other logs, making them
+            // incomprehensible.
+            container_request.with_log_consumer(StdoutLogConsumer)
+        } else {
+            container_request
+        };
+        let db_container = container_request.start().await.unwrap();
         const POSTGRES_DEFAULT_PORT: u16 = 5432;
         let port_number = db_container
             .get_host_port_ipv4(POSTGRES_DEFAULT_PORT)
@@ -77,6 +100,27 @@ impl EphemeralDatabase {
             self.port_number
         )
     }
+
+    fn postgres_configuration<'a>(settings: &[&'a str]) -> Vec<&'a str> {
+        settings
+            .iter()
+            .map(|setting| ["-c", setting])
+            .flatten()
+            .collect::<Vec<_>>()
+    }
+}
+
+struct StdoutLogConsumer;
+
+impl LogConsumer for StdoutLogConsumer {
+    /// Writes log bytes to stdout
+    fn accept<'a>(&'a self, record: &'a LogFrame) -> BoxFuture<'a, ()> {
+        async move {
+            let message = String::from_utf8_lossy(record.bytes());
+            println!("{}", message.trim_end_matches(|c| c == '\n' || c == '\r'));
+        }
+        .boxed()
+    }
 }
 
 /// EphemeralDatastore represents an ephemeral datastore instance. It has methods allowing

From 442c5bc0af4f0ef131e0d0ff2f9d2c6019d7e890 Mon Sep 17 00:00:00 2001
From: Ameer Ghani
Date: Wed, 31 Jul 2024 13:59:09 -0500
Subject: [PATCH 2/6] Clippy

---
 aggregator_core/src/datastore/test_util.rs | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/aggregator_core/src/datastore/test_util.rs b/aggregator_core/src/datastore/test_util.rs
index 0b72fdc4a..db806644f 100644
--- a/aggregator_core/src/datastore/test_util.rs
+++ b/aggregator_core/src/datastore/test_util.rs
@@ -104,8 +104,7 @@ impl EphemeralDatabase {
     fn postgres_configuration<'a>(settings: &[&'a str]) -> Vec<&'a str> {
         settings
             .iter()
-            .map(|setting| ["-c", setting])
-            .flatten()
+            .flat_map(|setting| ["-c", setting])
             .collect::<Vec<_>>()
     }
 }

From faeecb9f5b7aacbf773a501ccc0b827da81b2128 Mon Sep 17 00:00:00 2001
From: Ameer Ghani
Date: Wed, 31 Jul 2024 20:01:09 -0400
Subject: [PATCH 3/6] Synchronization ""fix""

---
 aggregator_core/src/datastore/test_util.rs | 138 +++++++++++++--------
 1 file changed, 88 insertions(+), 50 deletions(-)

diff --git a/aggregator_core/src/datastore/test_util.rs b/aggregator_core/src/datastore/test_util.rs
index db806644f..76cfb65da 100644
--- a/aggregator_core/src/datastore/test_util.rs
+++ b/aggregator_core/src/datastore/test_util.rs
@@ -5,7 +5,7 @@ use crate::{
 use backoff::{future::retry, ExponentialBackoffBuilder};
 use chrono::NaiveDateTime;
 use deadpool_postgres::{Manager, Pool, Timeouts};
-use futures::{prelude::future::BoxFuture, FutureExt};
+use futures::{future::try_join_all, FutureExt, TryFutureExt};
 use janus_core::{
     test_util::testcontainers::Postgres,
     time::{Clock, MockClock, TimeExt},
@@ -22,21 +22,26 @@ use std::{
     path::PathBuf,
     str::FromStr,
     sync::{Arc, Weak},
+    thread::JoinHandle,
     time::Duration,
 };
-use testcontainers::{
-    core::logs::{consumer::LogConsumer, LogFrame},
-    runners::AsyncRunner,
-    ContainerAsync, ContainerRequest, ImageExt,
-};
+use testcontainers::{runners::AsyncRunner, ContainerRequest, ImageExt};
+use tokio::{
+    io::{AsyncBufRead, AsyncBufReadExt},
+    join,
+    sync::{
+        oneshot::{self, Sender},
+        Mutex,
+    },
+};
-use tokio::sync::Mutex;
 use tokio_postgres::{connect, Config, NoTls};
 use tracing::trace;
 
 use super::SUPPORTED_SCHEMA_VERSIONS;
 
 struct EphemeralDatabase {
-    _db_container: ContainerAsync<Postgres>,
+    db_thread: Option<JoinHandle<()>>,
+    db_thread_shutdown: Option<Sender<()>>,
     port_number: u16,
 }
 
@@ -55,41 +60,73 @@ impl EphemeralDatabase {
     }
 
     async fn start() -> Self {
-        // Start an instance of Postgres running in a container.
-        let container_request =
-            ContainerRequest::from(Postgres::default()).with_cmd(Self::postgres_configuration(&[
-                // Many tests running concurrently can overwhelm the available connections, so bump
-                // the limit.
-                "max_connections=200",
-                // Enable logging of query plans.
-                "shared_preload_libraries=auto_explain",
-                "log_min_messages=LOG",
-                "auto_explain.log_min_duration=0",
-                "auto_explain.log_analyze=true",
-                // Discourage postgres from doing sequential scans, so we can analyze whether we
-                // have appropriate indexes in unit tests. Because test databases are not seeded
-                // with data, the query planner will often choose a sequential scan because it
-                // would be faster than hitting an index.
-                "enable_seqscan=false",
-            ]));
-        let container_request = if env::var_os("JANUS_TEST_DUMP_POSTGRESQL_LOGS").is_some() {
-            // We don't use the default testcontainers LoggingConsumer, because it'll split query
-            // plans across multiple lines, interspersed with other logs, making them
-            // incomprehensible.
-            container_request.with_log_consumer(StdoutLogConsumer)
-        } else {
-            container_request
-        };
-        let db_container = container_request.start().await.unwrap();
-        const POSTGRES_DEFAULT_PORT: u16 = 5432;
-        let port_number = db_container
-            .get_host_port_ipv4(POSTGRES_DEFAULT_PORT)
-            .await
-            .unwrap();
-        trace!("Postgres container is up with port {port_number}");
-
+        let (shutdown_tx, shutdown_rx) = oneshot::channel();
+        let (port_tx, port_rx) = oneshot::channel();
+
+        // Hack: run testcontainer logic under its own thread with its own tokio runtime, to avoid
+        // deadlocking the main runtime when waiting for logs to finish in the Drop implementation.
+        let db_thread = std::thread::spawn(move || {
+            tokio::runtime::Builder::new_current_thread()
+                .enable_all()
+                .build()
+                .unwrap()
+                .block_on(async move {
+                    // Start an instance of Postgres running in a container.
+                    let db_container = ContainerRequest::from(Postgres::default())
+                        .with_cmd(Self::postgres_configuration(&[
+                            // Many tests running concurrently can overwhelm the available
+                            // connections, so bump the limit.
+                            "max_connections=200",
+                            // Enable logging of query plans.
+                            "shared_preload_libraries=auto_explain",
+                            "log_min_messages=LOG",
+                            "auto_explain.log_min_duration=0",
+                            "auto_explain.log_analyze=true",
+                            // Discourage postgres from doing sequential scans, so we can analyze
+                            // whether we have appropriate indexes in unit tests. Because test
+                            // databases are not seeded with data, the query planner will often
+                            // choose a sequential scan because it would be faster than hitting an
+                            // index.
+                            "enable_seqscan=false",
+                        ]))
+                        .start()
+                        .await
+                        .unwrap();
+
+                    let stdout = db_container.stdout(true);
+                    let stderr = db_container.stderr(true);
+                    let log_consumer_handle =
+                        env::var_os("JANUS_TEST_DUMP_POSTGRESQL_LOGS").map(|_| {
+                            tokio::spawn(async move {
+                                join!(buffer_printer(stdout), buffer_printer(stderr));
+                            })
+                        });
+
+                    const POSTGRES_DEFAULT_PORT: u16 = 5432;
+                    let port_number = db_container
+                        .get_host_port_ipv4(POSTGRES_DEFAULT_PORT)
+                        .await
+                        .unwrap();
+                    trace!("Postgres container is up with port {port_number}");
+
+                    // Send port information, which frees this function to continue.
+                    port_tx.send(port_number).unwrap();
+
+                    // Wait for shutdown to be signalled.
+                    shutdown_rx.await.unwrap();
+                    drop(db_container);
+
+                    // Wait for log consumers to shutdown.
+                    if let Some(log_consumer_handle) = log_consumer_handle {
+                        log_consumer_handle.await.unwrap();
+                    }
+                })
+        });
+
+        let port_number = port_rx.await.unwrap();
         Self {
-            _db_container: db_container,
+            db_thread: Some(db_thread),
+            db_thread_shutdown: Some(shutdown_tx),
             port_number,
         }
     }
@@ -109,16 +146,17 @@ impl EphemeralDatabase {
     }
 }
 
-struct StdoutLogConsumer;
+async fn buffer_printer(buffer: std::pin::Pin<Box<dyn AsyncBufRead + Send>>) {
+    let mut lines = buffer.lines();
+    while let Ok(Some(line)) = lines.next_line().await {
+        println!("{}", line);
+    }
+}
 
-impl LogConsumer for StdoutLogConsumer {
-    /// Writes log bytes to stdout
-    fn accept<'a>(&'a self, record: &'a LogFrame) -> BoxFuture<'a, ()> {
-        async move {
-            let message = String::from_utf8_lossy(record.bytes());
-            println!("{}", message.trim_end_matches(|c| c == '\n' || c == '\r'));
-        }
-        .boxed()
+impl Drop for EphemeralDatabase {
+    fn drop(&mut self) {
+        self.db_thread_shutdown.take().map(|tx| tx.send(()));
+        self.db_thread.take().map(|thread| thread.join());
     }
 }

From 6d1b7eb5d9df1938dbc8ae5fe660f242a2d53d07 Mon Sep 17 00:00:00 2001
From: Ameer Ghani
Date: Wed, 31 Jul 2024 20:07:41 -0400
Subject: [PATCH 4/6] Dead imports

---
 aggregator_core/src/datastore/test_util.rs | 1 -
 1 file changed, 1 deletion(-)

diff --git a/aggregator_core/src/datastore/test_util.rs b/aggregator_core/src/datastore/test_util.rs
index 76cfb65da..a08bb6aff 100644
--- a/aggregator_core/src/datastore/test_util.rs
+++ b/aggregator_core/src/datastore/test_util.rs
@@ -5,7 +5,6 @@ use crate::{
 use backoff::{future::retry, ExponentialBackoffBuilder};
 use chrono::NaiveDateTime;
 use deadpool_postgres::{Manager, Pool, Timeouts};
-use futures::{future::try_join_all, FutureExt, TryFutureExt};
 use janus_core::{
     test_util::testcontainers::Postgres,
     time::{Clock, MockClock, TimeExt},

From 9775f057dc448f1bfdeab78795a73d6244e8271e Mon Sep 17 00:00:00 2001
From: Ameer Ghani
Date: Wed, 31 Jul 2024 20:09:40 -0400
Subject: [PATCH 5/6] You can run multiple tests now

---
 aggregator_core/README.md | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/aggregator_core/README.md b/aggregator_core/README.md
index 20aa41ed9..d5b6fd0e1 100644
--- a/aggregator_core/README.md
+++ b/aggregator_core/README.md
@@ -9,8 +9,7 @@ It is not published to crates.io and should not be depended on directly by users of Janus.
 ## PostgreSQL Logs
 
 During tests, you can have PostgreSQL dump its logs to stdout by setting
-`JANUS_TEST_DUMP_POSTGRESQL_LOGS`. The test database is set to log all query plans. You should only
-run this option with one test at a time, otherwise you might not get all the logs.
+`JANUS_TEST_DUMP_POSTGRESQL_LOGS`. The test database is set to log all query plans.
 
 Example:
 ```

From 58d792701cc0e3abbc940c39de53df97aac6c927 Mon Sep 17 00:00:00 2001
From: Ameer Ghani
Date: Thu, 1 Aug 2024 17:57:16 -0500
Subject: [PATCH 6/6] PR feedback

---
 aggregator_core/src/datastore/test_util.rs | 36 ++++++++++------------
 1 file changed, 16 insertions(+), 20 deletions(-)

diff --git a/aggregator_core/src/datastore/test_util.rs b/aggregator_core/src/datastore/test_util.rs
index a08bb6aff..703989565 100644
--- a/aggregator_core/src/datastore/test_util.rs
+++ b/aggregator_core/src/datastore/test_util.rs
@@ -70,36 +70,32 @@ impl EphemeralDatabase {
                 .build()
                 .unwrap()
                 .block_on(async move {
-                    // Start an instance of Postgres running in a container.
-                    let db_container = ContainerRequest::from(Postgres::default())
-                        .with_cmd(Self::postgres_configuration(&[
-                            // Many tests running concurrently can overwhelm the available
-                            // connections, so bump the limit.
-                            "max_connections=200",
+                    let should_log = env::var_os("JANUS_TEST_DUMP_POSTGRESQL_LOGS").is_some();
+                    let mut configuration = Vec::from(["max_connections=200"]);
+                    if should_log {
+                        configuration.append(&mut Vec::from([
                             // Enable logging of query plans.
                             "shared_preload_libraries=auto_explain",
                             "log_min_messages=LOG",
                             "auto_explain.log_min_duration=0",
                             "auto_explain.log_analyze=true",
-                            // Discourage postgres from doing sequential scans, so we can analyze
-                            // whether we have appropriate indexes in unit tests. Because test
-                            // databases are not seeded with data, the query planner will often
-                            // choose a sequential scan because it would be faster than hitting an
-                            // index.
-                            "enable_seqscan=false",
-                        ]))
+                        ]))
+                    }
+
+                    // Start an instance of Postgres running in a container.
+                    let db_container = ContainerRequest::from(Postgres::default())
+                        .with_cmd(Self::postgres_configuration(&configuration))
                         .start()
                         .await
                         .unwrap();
 
                     let stdout = db_container.stdout(true);
                     let stderr = db_container.stderr(true);
-                    let log_consumer_handle =
-                        env::var_os("JANUS_TEST_DUMP_POSTGRESQL_LOGS").map(|_| {
-                            tokio::spawn(async move {
-                                join!(buffer_printer(stdout), buffer_printer(stderr));
-                            })
-                        });
+                    let log_consumer_handle = should_log.then(|| {
+                        tokio::spawn(async move {
+                            join!(buffer_printer(stdout), buffer_printer(stderr));
+                        })
+                    });
 
                     const POSTGRES_DEFAULT_PORT: u16 = 5432;
                     let port_number = db_container
@@ -113,9 +109,9 @@ impl EphemeralDatabase {
 
                     // Wait for shutdown to be signalled.
                     shutdown_rx.await.unwrap();
-                    drop(db_container);
 
-                    // Wait for log consumers to shutdown.
+                    // Shut down the container and wait for log consumers to stop.
+                    db_container.stop().await.unwrap();
                     if let Some(log_consumer_handle) = log_consumer_handle {
                         log_consumer_handle.await.unwrap();
                     }
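
The synchronization pattern introduced in patch 3 and refined in patch 6 — run the container lifecycle on a dedicated thread with its own Tokio runtime, hand the port back to the caller over a oneshot channel, and trigger teardown from `Drop` — can be shown in isolation. The sketch below is a minimal illustration of that pattern and is not the Janus code itself: `Ephemeral`, `FakeResource`, and the hard-coded port are hypothetical stand-ins for the Postgres container, and it assumes a `tokio` dependency with its full feature set.

```rust
use std::thread::JoinHandle;

use tokio::sync::oneshot::{self, Sender};

/// Hypothetical stand-in for the Postgres container.
struct FakeResource {
    port: u16,
}

/// Owns the worker thread and the shutdown signal, mirroring the shape of `EphemeralDatabase`.
struct Ephemeral {
    thread: Option<JoinHandle<()>>,
    shutdown: Option<Sender<()>>,
    port: u16,
}

impl Ephemeral {
    async fn start() -> Self {
        let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
        let (port_tx, port_rx) = oneshot::channel::<u16>();

        // Run setup and teardown on a dedicated thread with its own single-threaded runtime,
        // so blocking teardown never has to be driven by the caller's runtime.
        let thread = std::thread::spawn(move || {
            tokio::runtime::Builder::new_current_thread()
                .enable_all()
                .build()
                .unwrap()
                .block_on(async move {
                    let resource = FakeResource { port: 5432 };

                    // Report readiness back to the caller.
                    port_tx.send(resource.port).unwrap();

                    // Park until Drop signals shutdown, then tear the resource down.
                    shutdown_rx.await.unwrap();
                    drop(resource);
                });
        });

        let port = port_rx.await.unwrap();
        Self {
            thread: Some(thread),
            shutdown: Some(shutdown_tx),
            port,
        }
    }
}

impl Drop for Ephemeral {
    fn drop(&mut self) {
        // Signal the worker thread, then wait for it to finish cleanup.
        if let Some(tx) = self.shutdown.take() {
            let _ = tx.send(());
        }
        if let Some(thread) = self.thread.take() {
            let _ = thread.join();
        }
    }
}

#[tokio::main]
async fn main() {
    let resource = Ephemeral::start().await;
    println!("resource is ready on port {}", resource.port);
}
```

`Drop` still blocks briefly on `thread.join()`, but because the worker thread owns its own runtime, that join completes without requiring the caller's runtime to drive any futures — which is the deadlock the "Hack" comment in patch 3 is working around.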