aggregator_core: Allow dumping query plans to stdout #3347

Merged: 6 commits, Aug 8, 2024
18 changes: 18 additions & 0 deletions aggregator_core/README.md
@@ -0,0 +1,18 @@
# `aggregator_core`

`aggregator_core` contains helper code for the `aggregator` crate. It mainly consists of data
structures that are only pertinent to a DAP aggregator, and subroutines for talking to a PostgreSQL
database.

It is not published to crates.io and should not be depended on directly by users of Janus.

## PostgreSQL Logs

During tests, you can have PostgreSQL dump its logs to stdout by setting the
`JANUS_TEST_DUMP_POSTGRESQL_LOGS` environment variable. The test database is configured to log all
query plans. Use this option with only one test at a time; otherwise, some logs may be missing.
Contributor Author:
I suspect this is a testcontainers bug: if running multiple tests at a time, the output is incomplete, and perhaps interspersed with logs from other containers. IMO it's not a big deal, since we'd only care to run this against one test at a time, so I don't care to spend too much time on it.

Collaborator:
Actually, we run multiple tests against the same container, to save on startup time, and separate by using different databases within the same process. Thus, database logs from queries in unrelated test aggregators will be commingled. It's also possible that we may shut down one database, and start another, if all the relevant Arcs go out of scope simultaneously. I wonder if there's some sort of race in the testcontainers library where deleting a container interrupts log forwarding, and if killing the container first, then deleting it once the log stream is done, would fix this.

Contributor Author (@inahga, Jul 31, 2024):
Good point about sharing a container (I had thought it was 1:1, since I didn't read the code closely, and because I think the phenomenon you describe about Arcs going out of scope applies, showing up as multiple postgres containers in `docker ps`).

It looks like the log consumer task is spawned via tokio::spawn, and the corresponding handle is never joined, so testcontainers never waits for the log stream to be drained before the process is exited or the container is dropped. So I think a bunch of log lines get queued up, but never get serviced because the process/container ends too quickly.

Indeed, if I tack a sleep() at the end of an affected test, I see a much more plausible volume of output.


Example:
```
JANUS_TEST_DUMP_POSTGRESQL_LOGS= RUST_LOG=debug cargo test datastore::tests::roundtrip_task::case_1 -- --exact --nocapture
```
62 changes: 53 additions & 9 deletions aggregator_core/src/datastore/test_util.rs
@@ -5,6 +5,7 @@ use crate::{
use backoff::{future::retry, ExponentialBackoffBuilder};
use chrono::NaiveDateTime;
use deadpool_postgres::{Manager, Pool, Timeouts};
use futures::{prelude::future::BoxFuture, FutureExt};
use janus_core::{
test_util::testcontainers::Postgres,
time::{Clock, MockClock, TimeExt},
@@ -17,12 +18,17 @@ use sqlx::{
Connection, PgConnection,
};
use std::{
env,
path::PathBuf,
str::FromStr,
sync::{Arc, Weak},
time::Duration,
};
use testcontainers::{runners::AsyncRunner, ContainerAsync, ContainerRequest, ImageExt};
use testcontainers::{
core::logs::{consumer::LogConsumer, LogFrame},
runners::AsyncRunner,
ContainerAsync, ContainerRequest, ImageExt,
};
use tokio::sync::Mutex;
use tokio_postgres::{connect, Config, NoTls};
use tracing::trace;
@@ -50,14 +56,31 @@ impl EphemeralDatabase {

async fn start() -> Self {
// Start an instance of Postgres running in a container.
let db_container = ContainerRequest::from(Postgres::default())
.with_cmd(Vec::from([
"-c".to_string(),
"max_connections=200".to_string(),
]))
.start()
.await
.unwrap();
let container_request =
ContainerRequest::from(Postgres::default()).with_cmd(Self::postgres_configuration(&[
// Many tests running concurrently can overwhelm the available connections, so bump
// the limit.
"max_connections=200",
// Enable logging of query plans.
"shared_preload_libraries=auto_explain",
"log_min_messages=LOG",
"auto_explain.log_min_duration=0",
"auto_explain.log_analyze=true",
// Discourage postgres from doing sequential scans, so we can analyze whether we
// have appropriate indexes in unit tests. Because test databases are not seeded
// with data, the query planner will often choose a sequential scan because it
// would be faster than hitting an index.
"enable_seqscan=false",
]));
let container_request = if env::var_os("JANUS_TEST_DUMP_POSTGRESQL_LOGS").is_some() {
// We don't use the default testcontainers LoggingConsumer, because it'll split query
// plans across multiple lines, interspersed with other logs, making them
// incomprehensible.
container_request.with_log_consumer(StdoutLogConsumer)
} else {
container_request
};
let db_container = container_request.start().await.unwrap();
const POSTGRES_DEFAULT_PORT: u16 = 5432;
let port_number = db_container
.get_host_port_ipv4(POSTGRES_DEFAULT_PORT)
@@ -77,6 +100,27 @@ impl EphemeralDatabase {
self.port_number
)
}

fn postgres_configuration<'a>(settings: &[&'a str]) -> Vec<&'a str> {
settings
.iter()
.flat_map(|setting| ["-c", setting])
.collect()
}
}

struct StdoutLogConsumer;

impl LogConsumer for StdoutLogConsumer {
/// Writes each log frame to stdout, trimming trailing newline characters.
fn accept<'a>(&'a self, record: &'a LogFrame) -> BoxFuture<'a, ()> {
async move {
let message = String::from_utf8_lossy(record.bytes());
println!("{}", message.trim_end_matches(|c| c == '\n' || c == '\r'));
}
.boxed()
}
}

/// EphemeralDatastore represents an ephemeral datastore instance. It has methods allowing