Skip to content

Commit 858c020

Browse files
authored
Extract postgres container from sqllogictest, update datafusion-testing pin (#13971)
* Add support for sqlite test files to sqllogictest * Removed workaround for bug that was fixed. * Refactor sqllogictest to extract postgres functionality into a separate file. Removed dependency on once_cell in favour of LazyLock. * Add missing license header.
1 parent 8fc26c2 commit 858c020

File tree

5 files changed

+160
-144
lines changed

5 files changed

+160
-144
lines changed

datafusion-testing

Submodule datafusion-testing updated 49 files

datafusion/sqllogictest/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ indicatif = "0.17"
4949
itertools = { workspace = true }
5050
log = { workspace = true }
5151
object_store = { workspace = true }
52-
once_cell = { version = "1.20", optional = true }
5352
postgres-protocol = { version = "0.6.7", optional = true }
5453
postgres-types = { version = "0.2.8", features = ["derive", "with-chrono-0_4"], optional = true }
5554
rust_decimal = { version = "1.36.0", features = ["tokio-pg"] }
@@ -69,7 +68,6 @@ avro = ["datafusion/avro"]
6968
postgres = [
7069
"bytes",
7170
"chrono",
72-
"once_cell",
7371
"postgres-types",
7472
"postgres-protocol",
7573
"testcontainers",
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
#![cfg(feature = "postgres")]
19+
20+
use crate::Options;
21+
use datafusion_common::Result;
22+
use log::info;
23+
use std::env::set_var;
24+
use std::future::Future;
25+
use std::sync::LazyLock;
26+
use std::{env, thread};
27+
use testcontainers::core::IntoContainerPort;
28+
use testcontainers::runners::AsyncRunner;
29+
use testcontainers::ImageExt;
30+
use testcontainers_modules::postgres;
31+
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
32+
use tokio::sync::{mpsc, Mutex};
33+
use ContainerCommands::{FetchHost, FetchPort};
34+
35+
#[derive(Debug)]
36+
pub enum ContainerCommands {
37+
FetchHost,
38+
FetchPort,
39+
Stop,
40+
}
41+
42+
pub struct Channel<T> {
43+
pub tx: UnboundedSender<T>,
44+
pub rx: Mutex<UnboundedReceiver<T>>,
45+
}
46+
47+
pub fn channel<T>() -> Channel<T> {
48+
let (tx, rx) = mpsc::unbounded_channel();
49+
Channel {
50+
tx,
51+
rx: Mutex::new(rx),
52+
}
53+
}
54+
55+
pub fn execute_blocking<F: Future>(f: F) {
56+
tokio::runtime::Builder::new_current_thread()
57+
.enable_all()
58+
.build()
59+
.unwrap()
60+
.block_on(f);
61+
}
62+
63+
static POSTGRES_IN: LazyLock<Channel<ContainerCommands>> = LazyLock::new(channel);
64+
static POSTGRES_HOST: LazyLock<Channel<String>> = LazyLock::new(channel);
65+
static POSTGRES_PORT: LazyLock<Channel<u16>> = LazyLock::new(channel);
66+
static POSTGRES_STOPPED: LazyLock<Channel<()>> = LazyLock::new(channel);
67+
68+
pub async fn initialize_postgres_container(options: &Options) -> Result<()> {
69+
let start_pg_database = options.postgres_runner && !is_pg_uri_set();
70+
if start_pg_database {
71+
info!("Starting postgres db ...");
72+
73+
thread::spawn(|| {
74+
execute_blocking(start_postgres(
75+
&POSTGRES_IN,
76+
&POSTGRES_HOST,
77+
&POSTGRES_PORT,
78+
&POSTGRES_STOPPED,
79+
))
80+
});
81+
82+
POSTGRES_IN.tx.send(FetchHost).unwrap();
83+
let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap();
84+
85+
POSTGRES_IN.tx.send(FetchPort).unwrap();
86+
let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap();
87+
88+
let pg_uri = format!("postgresql://postgres:postgres@{db_host}:{db_port}/test");
89+
info!("Postgres uri is {pg_uri}");
90+
91+
set_var("PG_URI", pg_uri);
92+
} else {
93+
// close receiver
94+
POSTGRES_IN.rx.lock().await.close();
95+
}
96+
97+
Ok(())
98+
}
99+
100+
pub async fn terminate_postgres_container() -> Result<()> {
101+
if !POSTGRES_IN.tx.is_closed() {
102+
println!("Stopping postgres db ...");
103+
POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(());
104+
POSTGRES_STOPPED.rx.lock().await.recv().await;
105+
}
106+
107+
Ok(())
108+
}
109+
110+
async fn start_postgres(
111+
in_channel: &Channel<ContainerCommands>,
112+
host_channel: &Channel<String>,
113+
port_channel: &Channel<u16>,
114+
stopped_channel: &Channel<()>,
115+
) {
116+
info!("Starting postgres test container with user postgres/postgres and db test");
117+
118+
let container = postgres::Postgres::default()
119+
.with_user("postgres")
120+
.with_password("postgres")
121+
.with_db_name("test")
122+
.with_mapped_port(16432, 5432.tcp())
123+
.with_tag("17-alpine")
124+
.start()
125+
.await
126+
.unwrap();
127+
// uncomment this if you are running docker in docker
128+
let host = "host.docker.internal".to_string();
129+
// let host = container.get_host().await.unwrap().to_string();
130+
let port = container.get_host_port_ipv4(5432).await.unwrap();
131+
132+
let mut rx = in_channel.rx.lock().await;
133+
while let Some(command) = rx.recv().await {
134+
match command {
135+
FetchHost => host_channel.tx.send(host.clone()).unwrap(),
136+
FetchPort => port_channel.tx.send(port).unwrap(),
137+
ContainerCommands::Stop => {
138+
container.stop().await.unwrap();
139+
stopped_channel.tx.send(()).unwrap();
140+
rx.close();
141+
}
142+
}
143+
}
144+
}
145+
146+
fn is_pg_uri_set() -> bool {
147+
match env::var("PG_URI") {
148+
Ok(_) => true,
149+
Err(_) => false,
150+
}
151+
}

datafusion/sqllogictest/bin/sqllogictests.rs

Lines changed: 8 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -28,33 +28,21 @@ use indicatif::{
2828
use itertools::Itertools;
2929
use log::Level::{Info, Warn};
3030
use log::{info, log_enabled, warn};
31-
#[cfg(feature = "postgres")]
32-
use once_cell::sync::Lazy;
3331
use sqllogictest::{
3432
parse_file, strict_column_validator, AsyncDB, Condition, Normalizer, Record,
3533
Validator,
3634
};
35+
3736
#[cfg(feature = "postgres")]
38-
use std::env::set_var;
37+
use crate::postgres_container::{
38+
initialize_postgres_container, terminate_postgres_container,
39+
};
3940
use std::ffi::OsStr;
4041
use std::fs;
41-
#[cfg(feature = "postgres")]
42-
use std::future::Future;
4342
use std::path::{Path, PathBuf};
43+
4444
#[cfg(feature = "postgres")]
45-
use std::{env, thread};
46-
#[cfg(feature = "postgres")]
47-
use testcontainers::core::IntoContainerPort;
48-
#[cfg(feature = "postgres")]
49-
use testcontainers::runners::AsyncRunner;
50-
#[cfg(feature = "postgres")]
51-
use testcontainers::ImageExt;
52-
#[cfg(feature = "postgres")]
53-
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
54-
#[cfg(feature = "postgres")]
55-
use tokio::sync::{mpsc, Mutex};
56-
#[cfg(feature = "postgres")]
57-
use ContainerCommands::{FetchHost, FetchPort};
45+
mod postgres_container;
5846

5947
const TEST_DIRECTORY: &str = "test_files/";
6048
const DATAFUSION_TESTING_TEST_DIRECTORY: &str = "../../datafusion-testing/data/";
@@ -170,31 +158,7 @@ async fn run_tests() -> Result<()> {
170158
options.warn_on_ignored();
171159

172160
#[cfg(feature = "postgres")]
173-
let start_pg_database = options.postgres_runner && !is_pg_uri_set();
174-
#[cfg(feature = "postgres")]
175-
if start_pg_database {
176-
info!("Starting postgres db ...");
177-
178-
thread::spawn(|| {
179-
execute_blocking(start_postgres(
180-
&POSTGRES_IN,
181-
&POSTGRES_HOST,
182-
&POSTGRES_PORT,
183-
&POSTGRES_STOPPED,
184-
))
185-
});
186-
187-
POSTGRES_IN.tx.send(FetchHost).unwrap();
188-
let db_host = POSTGRES_HOST.rx.lock().await.recv().await.unwrap();
189-
190-
POSTGRES_IN.tx.send(FetchPort).unwrap();
191-
let db_port = POSTGRES_PORT.rx.lock().await.recv().await.unwrap();
192-
193-
let pg_uri = format!("postgresql://postgres:postgres@{db_host}:{db_port}/test");
194-
info!("Postgres uri is {pg_uri}");
195-
196-
set_var("PG_URI", pg_uri);
197-
}
161+
initialize_postgres_container(&options).await?;
198162

199163
// Run all tests in parallel, reporting failures at the end
200164
//
@@ -277,11 +241,7 @@ async fn run_tests() -> Result<()> {
277241
m.println(format!("Completed in {}", HumanDuration(start.elapsed())))?;
278242

279243
#[cfg(feature = "postgres")]
280-
if start_pg_database {
281-
println!("Stopping postgres db ...");
282-
POSTGRES_IN.tx.send(ContainerCommands::Stop).unwrap_or(());
283-
POSTGRES_STOPPED.rx.lock().await.recv().await;
284-
}
244+
terminate_postgres_container().await?;
285245

286246
// report on any errors
287247
if !errors.is_empty() {
@@ -294,14 +254,6 @@ async fn run_tests() -> Result<()> {
294254
}
295255
}
296256

297-
#[cfg(feature = "postgres")]
298-
fn is_pg_uri_set() -> bool {
299-
match env::var("PG_URI") {
300-
Ok(_) => true,
301-
Err(_) => false,
302-
}
303-
}
304-
305257
async fn run_test_file(
306258
test_file: TestFile,
307259
validator: Validator,
@@ -758,87 +710,3 @@ impl Options {
758710
}
759711
}
760712
}
761-
762-
#[cfg(feature = "postgres")]
763-
pub async fn start_postgres(
764-
in_channel: &Channel<ContainerCommands>,
765-
host_channel: &Channel<String>,
766-
port_channel: &Channel<u16>,
767-
stopped_channel: &Channel<()>,
768-
) {
769-
info!("Starting postgres test container with user postgres/postgres and db test");
770-
771-
let container = testcontainers_modules::postgres::Postgres::default()
772-
.with_user("postgres")
773-
.with_password("postgres")
774-
.with_db_name("test")
775-
.with_mapped_port(16432, 5432.tcp())
776-
.with_tag("17-alpine")
777-
.start()
778-
.await
779-
.unwrap();
780-
// uncomment this if you are running docker in docker
781-
// let host = "host.docker.internal".to_string();
782-
let host = container.get_host().await.unwrap().to_string();
783-
let port = container.get_host_port_ipv4(5432).await.unwrap();
784-
785-
let mut rx = in_channel.rx.lock().await;
786-
while let Some(command) = rx.recv().await {
787-
match command {
788-
FetchHost => host_channel.tx.send(host.clone()).unwrap(),
789-
FetchPort => port_channel.tx.send(port).unwrap(),
790-
ContainerCommands::Stop => {
791-
container.stop().await.unwrap();
792-
stopped_channel.tx.send(()).unwrap();
793-
rx.close();
794-
}
795-
}
796-
}
797-
}
798-
799-
#[cfg(feature = "postgres")]
800-
#[derive(Debug)]
801-
pub enum ContainerCommands {
802-
FetchHost,
803-
FetchPort,
804-
Stop,
805-
}
806-
807-
#[cfg(feature = "postgres")]
808-
pub struct Channel<T> {
809-
pub tx: UnboundedSender<T>,
810-
pub rx: Mutex<UnboundedReceiver<T>>,
811-
}
812-
813-
#[cfg(feature = "postgres")]
814-
pub fn channel<T>() -> Channel<T> {
815-
let (tx, rx) = mpsc::unbounded_channel();
816-
Channel {
817-
tx,
818-
rx: Mutex::new(rx),
819-
}
820-
}
821-
822-
#[cfg(feature = "postgres")]
823-
pub fn execute_blocking<F: Future>(f: F) {
824-
tokio::runtime::Builder::new_current_thread()
825-
.enable_all()
826-
.build()
827-
.unwrap()
828-
.block_on(f);
829-
}
830-
831-
#[cfg(feature = "postgres")]
832-
pub struct HostPort {
833-
pub host: String,
834-
pub port: u16,
835-
}
836-
837-
#[cfg(feature = "postgres")]
838-
static POSTGRES_IN: Lazy<Channel<ContainerCommands>> = Lazy::new(channel);
839-
#[cfg(feature = "postgres")]
840-
static POSTGRES_HOST: Lazy<Channel<String>> = Lazy::new(channel);
841-
#[cfg(feature = "postgres")]
842-
static POSTGRES_PORT: Lazy<Channel<u16>> = Lazy::new(channel);
843-
#[cfg(feature = "postgres")]
844-
static POSTGRES_STOPPED: Lazy<Channel<()>> = Lazy::new(channel);

datafusion/sqllogictest/src/engines/postgres_engine/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,6 @@ fn no_quotes(t: &str) -> &str {
215215
fn schema_name(relative_path: &Path) -> String {
216216
relative_path
217217
.to_string_lossy()
218-
.to_string()
219218
.chars()
220219
.filter(|ch| ch.is_ascii_alphanumeric())
221220
.collect::<String>()

0 commit comments

Comments
 (0)