Skip to content

Commit aa1c7c4

Browse files
authored
Add H2O.ai Database-like Ops benchmark to dfbench (join support) (#14902)
* Add H2O.ai Database-like Ops benchmark to dfbench (join support) * address new comments
1 parent 2fd558f commit aa1c7c4

File tree

3 files changed

+226
-7
lines changed

3 files changed

+226
-7
lines changed

benchmarks/README.md

+44
Original file line numberDiff line numberDiff line change
@@ -513,5 +513,49 @@ For example, to run query 1 with the small data generated above:
513513
cargo run --release --bin dfbench -- h2o --path ./benchmarks/data/h2o/G1_1e7_1e7_100_0.csv --query 1
514514
```
515515

516+
## h2o benchmarks for join
517+
518+
### Generate data for h2o benchmarks
519+
There are three options for generating data for h2o benchmarks: `small`, `medium`, and `big`. The data is generated in the `data` directory.
520+
521+
1. Generate small data (4 table files, the largest is 1e7 rows)
522+
```bash
523+
./bench.sh data h2o_small_join
524+
```
525+
526+
527+
2. Generate medium data (4 table files, the largest is 1e8 rows)
528+
```bash
529+
./bench.sh data h2o_medium_join
530+
```
531+
532+
3. Generate large data (4 table files, the largest is 1e9 rows)
533+
```bash
534+
./bench.sh data h2o_big_join
535+
```
536+
537+
### Run h2o benchmarks
538+
There are three options for running h2o benchmarks: `small`, `medium`, and `big`.
539+
1. Run small data benchmark
540+
```bash
541+
./bench.sh run h2o_small_join
542+
```
543+
544+
2. Run medium data benchmark
545+
```bash
546+
./bench.sh run h2o_medium_join
547+
```
548+
549+
3. Run large data benchmark
550+
```bash
551+
./bench.sh run h2o_big_join
552+
```
553+
554+
4. Run a specific query with specific join data paths; the paths consist of 4 table files.
555+
556+
For example, to run query 1 with the small data generated above:
557+
```bash
558+
cargo run --release --bin dfbench -- h2o --join-paths ./benchmarks/data/h2o/J1_1e7_NA_0.csv,./benchmarks/data/h2o/J1_1e7_1e1_0.csv,./benchmarks/data/h2o/J1_1e7_1e4_0.csv,./benchmarks/data/h2o/J1_1e7_1e7_NA.csv --queries-path ./benchmarks/queries/h2o/join.sql --query 1
559+
```
516560
[1]: http://www.tpc.org/tpch/
517561
[2]: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page

benchmarks/bench.sh

+157-5
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,12 @@ clickbench_1: ClickBench queries against a single parquet file
8181
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet
8282
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
8383
external_aggr: External aggregation benchmark
84-
h2o_small: h2oai benchmark with small dataset (1e7 rows), default file format is csv
85-
h2o_medium: h2oai benchmark with medium dataset (1e8 rows), default file format is csv
86-
h2o_big: h2oai benchmark with large dataset (1e9 rows), default file format is csv
84+
h2o_small: h2oai benchmark with small dataset (1e7 rows) for groupby, default file format is csv
85+
h2o_medium: h2oai benchmark with medium dataset (1e8 rows) for groupby, default file format is csv
86+
h2o_big: h2oai benchmark with large dataset (1e9 rows) for groupby, default file format is csv
87+
h2o_small_join: h2oai benchmark with small dataset (1e7 rows) for join, default file format is csv
88+
h2o_medium_join: h2oai benchmark with medium dataset (1e8 rows) for join, default file format is csv
89+
h2o_big_join: h2oai benchmark with large dataset (1e9 rows) for join, default file format is csv
8790
imdb: Join Order Benchmark (JOB) using the IMDB dataset converted to parquet
8891
8992
**********
@@ -150,6 +153,9 @@ main() {
150153
data_h2o "SMALL"
151154
data_h2o "MEDIUM"
152155
data_h2o "BIG"
156+
data_h2o_join "SMALL"
157+
data_h2o_join "MEDIUM"
158+
data_h2o_join "BIG"
153159
data_clickbench_1
154160
data_clickbench_partitioned
155161
data_imdb
@@ -189,6 +195,15 @@ main() {
189195
h2o_big)
190196
data_h2o "BIG" "CSV"
191197
;;
198+
h2o_small_join)
199+
data_h2o_join "SMALL" "CSV"
200+
;;
201+
h2o_medium_join)
202+
data_h2o_join "MEDIUM" "CSV"
203+
;;
204+
h2o_big_join)
205+
data_h2o_join "BIG" "CSV"
206+
;;
192207
external_aggr)
193208
# same data as for tpch
194209
data_tpch "1"
@@ -242,6 +257,9 @@ main() {
242257
run_h2o "SMALL" "PARQUET" "groupby"
243258
run_h2o "MEDIUM" "PARQUET" "groupby"
244259
run_h2o "BIG" "PARQUET" "groupby"
260+
run_h2o_join "SMALL" "PARQUET" "join"
261+
run_h2o_join "MEDIUM" "PARQUET" "join"
262+
run_h2o_join "BIG" "PARQUET" "join"
245263
run_imdb
246264
run_external_aggr
247265
;;
@@ -287,6 +305,15 @@ main() {
287305
h2o_big)
288306
run_h2o "BIG" "CSV" "groupby"
289307
;;
308+
h2o_small_join)
309+
run_h2o_join "SMALL" "CSV" "join"
310+
;;
311+
h2o_medium_join)
312+
run_h2o_join "MEDIUM" "CSV" "join"
313+
;;
314+
h2o_big_join)
315+
run_h2o_join "BIG" "CSV" "join"
316+
;;
290317
external_aggr)
291318
run_external_aggr
292319
;;
@@ -687,7 +714,82 @@ data_h2o() {
687714
deactivate
688715
}
689716

690-
## todo now only support groupby, after https://github.com/mrpowers-io/falsa/issues/21 done, we can add support for join
717+
data_h2o_join() {
718+
# Default values for size and data format
719+
SIZE=${1:-"SMALL"}
720+
DATA_FORMAT=${2:-"CSV"}
721+
722+
# Function to compare Python versions
723+
version_ge() {
724+
[ "$(printf '%s\n' "$1" "$2" | sort -V | head -n1)" = "$2" ]
725+
}
726+
727+
export PYO3_USE_ABI3_FORWARD_COMPATIBILITY=1
728+
729+
# Find the highest available Python version (3.10 or higher)
730+
REQUIRED_VERSION="3.10"
731+
PYTHON_CMD=$(command -v python3 || true)
732+
733+
if [ -n "$PYTHON_CMD" ]; then
734+
PYTHON_VERSION=$($PYTHON_CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
735+
if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then
736+
echo "Found Python version $PYTHON_VERSION, which is suitable."
737+
else
738+
echo "Python version $PYTHON_VERSION found, but version $REQUIRED_VERSION or higher is required."
739+
PYTHON_CMD=""
740+
fi
741+
fi
742+
743+
# Search for suitable Python versions if the default is unsuitable
744+
if [ -z "$PYTHON_CMD" ]; then
745+
# Loop through all available Python3 commands on the system
746+
for CMD in $(compgen -c | grep -E '^python3(\.[0-9]+)?$'); do
747+
if command -v "$CMD" &> /dev/null; then
748+
PYTHON_VERSION=$($CMD -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
749+
if version_ge "$PYTHON_VERSION" "$REQUIRED_VERSION"; then
750+
PYTHON_CMD="$CMD"
751+
echo "Found suitable Python version: $PYTHON_VERSION ($CMD)"
752+
break
753+
fi
754+
fi
755+
done
756+
fi
757+
758+
# If no suitable Python version found, exit with an error
759+
if [ -z "$PYTHON_CMD" ]; then
760+
echo "Python 3.10 or higher is required. Please install it."
761+
return 1
762+
fi
763+
764+
echo "Using Python command: $PYTHON_CMD"
765+
766+
# Install falsa and other dependencies
767+
echo "Installing falsa..."
768+
769+
# Set virtual environment directory
770+
VIRTUAL_ENV="${PWD}/venv"
771+
772+
# Create a virtual environment using the detected Python command
773+
$PYTHON_CMD -m venv "$VIRTUAL_ENV"
774+
775+
# Activate the virtual environment and install dependencies
776+
source "$VIRTUAL_ENV/bin/activate"
777+
778+
# Ensure 'falsa' is installed (avoid unnecessary reinstall)
779+
pip install --quiet --upgrade falsa
780+
781+
# Create directory if it doesn't exist
782+
H2O_DIR="${DATA_DIR}/h2o"
783+
mkdir -p "${H2O_DIR}"
784+
785+
# Generate h2o test data
786+
echo "Generating h2o test data in ${H2O_DIR} with size=${SIZE} and format=${DATA_FORMAT}"
787+
falsa join --path-prefix="${H2O_DIR}" --size "${SIZE}" --data-format "${DATA_FORMAT}"
788+
789+
# Deactivate virtual environment after completion
790+
deactivate
791+
}
792+
691793
run_h2o() {
692794
# Default values for size and data format
693795
SIZE=${1:-"SMALL"}
@@ -700,7 +802,7 @@ run_h2o() {
700802
RESULTS_FILE="${RESULTS_DIR}/h2o.json"
701803

702804
echo "RESULTS_FILE: ${RESULTS_FILE}"
703-
echo "Running h2o benchmark..."
805+
echo "Running h2o groupby benchmark..."
704806

705807
# Set the file name based on the size
706808
case "$SIZE" in
@@ -730,6 +832,56 @@ run_h2o() {
730832
-o "${RESULTS_FILE}"
731833
}
732834

835+
run_h2o_join() {
836+
# Default values for size and data format
837+
SIZE=${1:-"SMALL"}
838+
DATA_FORMAT=${2:-"CSV"}
839+
DATA_FORMAT=$(echo "$DATA_FORMAT" | tr '[:upper:]' '[:lower:]')
840+
RUN_Type=${3:-"join"}
841+
842+
# Data directory and results file path
843+
H2O_DIR="${DATA_DIR}/h2o"
844+
RESULTS_FILE="${RESULTS_DIR}/h2o_join.json"
845+
846+
echo "RESULTS_FILE: ${RESULTS_FILE}"
847+
echo "Running h2o join benchmark..."
848+
849+
# Set the file name based on the size
850+
case "$SIZE" in
851+
"SMALL")
852+
X_TABLE_FILE_NAME="J1_1e7_NA_0.${DATA_FORMAT}"
853+
SMALL_TABLE_FILE_NAME="J1_1e7_1e1_0.${DATA_FORMAT}"
854+
MEDIUM_TABLE_FILE_NAME="J1_1e7_1e4_0.${DATA_FORMAT}"
855+
LARGE_TABLE_FILE_NAME="J1_1e7_1e7_NA.${DATA_FORMAT}"
856+
;;
857+
"MEDIUM")
858+
X_TABLE_FILE_NAME="J1_1e8_NA_0.${DATA_FORMAT}"
859+
SMALL_TABLE_FILE_NAME="J1_1e8_1e2_0.${DATA_FORMAT}"
860+
MEDIUM_TABLE_FILE_NAME="J1_1e8_1e5_0.${DATA_FORMAT}"
861+
LARGE_TABLE_FILE_NAME="J1_1e8_1e8_NA.${DATA_FORMAT}"
862+
;;
863+
"BIG")
864+
X_TABLE_FILE_NAME="J1_1e9_NA_0.${DATA_FORMAT}"
865+
SMALL_TABLE_FILE_NAME="J1_1e9_1e3_0.${DATA_FORMAT}"
866+
MEDIUM_TABLE_FILE_NAME="J1_1e9_1e6_0.${DATA_FORMAT}"
867+
LARGE_TABLE_FILE_NAME="J1_1e9_1e9_NA.${DATA_FORMAT}"
868+
;;
869+
*)
870+
echo "Invalid size. Valid options are SMALL, MEDIUM, or BIG."
871+
return 1
872+
;;
873+
esac
874+
875+
# Set the query file name based on the RUN_Type
876+
QUERY_FILE="${SCRIPT_DIR}/queries/h2o/${RUN_Type}.sql"
877+
878+
$CARGO_COMMAND --bin dfbench -- h2o \
879+
--iterations 3 \
880+
--join-paths "${H2O_DIR}/${X_TABLE_FILE_NAME},${H2O_DIR}/${SMALL_TABLE_FILE_NAME},${H2O_DIR}/${MEDIUM_TABLE_FILE_NAME},${H2O_DIR}/${LARGE_TABLE_FILE_NAME}" \
881+
--queries-path "${QUERY_FILE}" \
882+
-o "${RESULTS_FILE}"
883+
}
884+
733885
# Runs the external aggregation benchmark
734886
run_external_aggr() {
735887
# Use TPC-H SF1 dataset

benchmarks/src/h2o.rs

+25-2
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,16 @@ pub struct RunOpt {
5353
)]
5454
path: PathBuf,
5555

56+
/// Path to data files (parquet or csv), using , to separate the paths
57+
/// Default value is the small files for join x table, small table, medium table, big table files in the h2o benchmark
58+
/// This is the small csv file case
59+
#[structopt(
60+
short = "join-paths",
61+
long = "join-paths",
62+
default_value = "benchmarks/data/h2o/J1_1e7_NA_0.csv,benchmarks/data/h2o/J1_1e7_1e1_0.csv,benchmarks/data/h2o/J1_1e7_1e4_0.csv,benchmarks/data/h2o/J1_1e7_1e7_NA.csv"
63+
)]
64+
join_paths: String,
65+
5666
/// If present, write results json here
5767
#[structopt(parse(from_os_str), short = "o", long = "output")]
5868
output_path: Option<PathBuf>,
@@ -71,8 +81,16 @@ impl RunOpt {
7181
let rt_builder = self.common.runtime_env_builder()?;
7282
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
7383

74-
// Register data
75-
self.register_data(&ctx).await?;
84+
if self.queries_path.to_str().unwrap().contains("join") {
85+
let join_paths: Vec<&str> = self.join_paths.split(',').collect();
86+
let table_name: Vec<&str> = vec!["x", "small", "medium", "large"];
87+
for (i, path) in join_paths.iter().enumerate() {
88+
ctx.register_csv(table_name[i], path, Default::default())
89+
.await?;
90+
}
91+
} else if self.queries_path.to_str().unwrap().contains("groupby") {
92+
self.register_data(&ctx).await?;
93+
}
7694

7795
let iterations = self.common.iterations;
7896
let mut benchmark_run = BenchmarkRun::new();
@@ -81,17 +99,22 @@ impl RunOpt {
8199
let sql = queries.get_query(query_id)?;
82100
println!("Q{query_id}: {sql}");
83101

102+
let mut millis = Vec::with_capacity(iterations);
84103
for i in 1..=iterations {
85104
let start = Instant::now();
86105
let results = ctx.sql(sql).await?.collect().await?;
87106
let elapsed = start.elapsed();
88107
let ms = elapsed.as_secs_f64() * 1000.0;
108+
millis.push(ms);
89109
let row_count: usize = results.iter().map(|b| b.num_rows()).sum();
90110
println!(
91111
"Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
92112
);
93113
benchmark_run.write_iter(elapsed, row_count);
94114
}
115+
let avg = millis.iter().sum::<f64>() / millis.len() as f64;
116+
println!("Query {query_id} avg time: {avg:.2} ms");
117+
95118
if self.common.debug {
96119
ctx.sql(sql).await?.explain(false, false)?.show().await?;
97120
}

0 commit comments

Comments
 (0)