From ef7d94201c56bb5ce0033958d2d5a84cc6cf3c80 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 8 Nov 2024 17:44:36 +0800 Subject: [PATCH 1/3] Add sort integration benchmark --- benchmarks/README.md | 24 ++ benchmarks/bench.sh | 17 ++ benchmarks/src/bin/sort_integration.rs | 334 +++++++++++++++++++++++++ 3 files changed, 375 insertions(+) create mode 100644 benchmarks/src/bin/sort_integration.rs diff --git a/benchmarks/README.md b/benchmarks/README.md index a9aa1afb97a1..a5c37ca3f5fc 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -330,6 +330,30 @@ steps. The tests sort the entire dataset using several different sort orders. +## Sort Integration + +Test performance of end-to-end sort SQL queries. (While the `Sort` benchmark focuses on a single sort executor, this benchmark tests how sorting is executed across multiple CPU cores by benchmarking sorting the whole relational table.) + +Sort integration benchmark runs whole table sort queries on TPCH `lineitem` table, with different characteristics. For example, different number of sort keys, different sort key cardinality, different number of payload columns, etc. + +See [`sort_integration.rs`](src/bin/sort_integration.rs) for more details. + +### Sort Integration Benchmark Example Runs +1. Run all queries with default setting: +```bash + cargo run --release --bin sort_integration -- benchmark -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_integration.json' +``` + +2. Run a specific query: +```bash + cargo run --release --bin sort_integration -- benchmark -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_integration.json' --query 2 +``` + +3. Run all queries with `bench.sh` script: +```bash +./bench.sh run sort_integration +``` + ## IMDB Run Join Order Benchmark (JOB) on IMDB dataset. diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 47c5d1261605..8a2fc1a08b7d 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -175,6 +175,10 @@ main() { # same data as for tpch data_tpch "1" ;; + sort_integration) + # same data as for tpch + data_tpch "1" + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for data generation" usage @@ -252,6 +256,9 @@ main() { external_aggr) run_external_aggr ;; + sort_integration) + run_sort_integration + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for run" usage @@ -549,6 +556,16 @@ run_external_aggr() { $CARGO_COMMAND --bin external_aggr -- benchmark --partitions 4 --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" } +# Runs the sort integration benchmark +run_sort_integration() { + TPCH_DIR="${DATA_DIR}/tpch_sf1" + RESULTS_FILE="${RESULTS_DIR}/sort_integration.json" + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running sort integration benchmark..." + + $CARGO_COMMAND --bin sort_integration -- benchmark --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" +} + compare_benchmarks() { BASE_RESULTS_DIR="${SCRIPT_DIR}/results" diff --git a/benchmarks/src/bin/sort_integration.rs b/benchmarks/src/bin/sort_integration.rs new file mode 100644 index 000000000000..a0aac4c76ce0 --- /dev/null +++ b/benchmarks/src/bin/sort_integration.rs @@ -0,0 +1,334 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module provides integration benchmark for sort operation. +//! It will run different sort SQL queries on parquet dataset. +//! +//! Another `Sort` benchmark focus on single core execution. This benchmark +//! runs end-to-end sort queries and test the performance on multiple CPU cores. + +use std::path::PathBuf; +use std::sync::Arc; +use structopt::StructOpt; + +use arrow::record_batch::RecordBatch; +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, +}; +use datafusion::datasource::{MemTable, TableProvider}; +use datafusion::error::Result; +use datafusion::execution::runtime_env::RuntimeConfig; +use datafusion::physical_plan::display::DisplayableExecutionPlan; +use datafusion::physical_plan::{collect, displayable}; +use datafusion::prelude::*; +use datafusion_benchmarks::util::{BenchmarkRun, CommonOpt}; +use datafusion_common::instant::Instant; +use datafusion_common::DEFAULT_PARQUET_EXTENSION; + +#[derive(Debug, StructOpt)] +#[structopt( + name = "datafusion-sort-integration", + about = "DataFusion sort integration benchmark" +)] +enum SortQueryOpt { + Benchmark(SortConfig), +} + +#[derive(Debug, StructOpt)] +struct SortConfig { + /// Common options + #[structopt(flatten)] + common: CommonOpt, + + /// Sort query number. If not specified, runs all queries + #[structopt(short, long)] + query: Option, + + /// Path to data files (lineitem). Only parquet format is supported + #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] + path: PathBuf, + + /// Path to JSON benchmark result to be compare using `compare.py` + #[structopt(parse(from_os_str), short = "o", long = "output")] + output_path: Option, + + /// Load the data into a MemTable before executing the query + #[structopt(short = "m", long = "mem-table")] + mem_table: bool, +} + +struct QueryResult { + elapsed: std::time::Duration, + row_count: usize, +} + +impl SortConfig { + const SORT_TABLES: [&'static str; 1] = ["lineitem"]; + + /// Sort queries with different characteristics: + /// - Sort key with fixed length or variable length (VARCHAR) + /// - Sort key with different cardinality + /// - Different number of sort keys + /// - Different number of payload columns (thin: 1 additional column other + /// than sort keys; wide: all columns except sort keys) + /// + /// DataSet is `lineitem` table in TPCH dataset (16 columns, 6M rows for + /// scale factor 1.0, cardinality is counted from SF1 dataset) + /// Key Columns: + /// - Column `l_linenumber`, type: `INTEGER`, cardinality: 7 + /// - Column `l_suppkey`, type: `BIGINT`, cardinality: 10k + /// - Column `l_orderkey`, type: `BIGINT`, cardinality: 1.5M + /// - Column `l_comment`, type: `VARCHAR`, cardinality: 4.5M (len is ~26 chars) + /// Payload Columns: + /// - Thin variant: `l_partkey` column with `BIGINT` type (1 column) + /// - Wide variant: all columns except for possible key columns (12 columns) + const SORT_QUERIES: [&'static str; 10] = [ + // Q1: 1 sort key (type: INTEGER, cardinality: 7) + 1 payload column + r#" + SELECT l_linenumber, l_partkey + FROM lineitem + ORDER BY l_linenumber + "#, + // Q2: 1 sort key (type: BIGINT, cardinality: 1.5M) + 1 payload column + r#" + SELECT l_orderkey, l_partkey + FROM lineitem + ORDER BY l_orderkey + "#, + // Q3: 1 sort key (type: VARCHAR, cardinality: 4.5M) + 1 payload column + r#" + SELECT l_comment, l_partkey + FROM lineitem + ORDER BY l_comment + "#, + // Q4: 2 sort keys {(BIGINT, 1.5M), (INTEGER, 7)} + 1 payload column + r#" + SELECT l_orderkey, l_linenumber, l_partkey + FROM lineitem + ORDER BY l_orderkey, l_linenumber + "#, + // Q5: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + no payload column + r#" + SELECT l_linenumber, l_suppkey, l_orderkey + FROM lineitem + ORDER BY l_linenumber, l_suppkey, l_orderkey + "#, + // Q6: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + 1 payload column + r#" + SELECT l_linenumber, l_suppkey, l_orderkey, l_partkey + FROM lineitem + ORDER BY l_linenumber, l_suppkey, l_orderkey + "#, + // Q7: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + 12 all other columns + r#" + SELECT l_linenumber, l_suppkey, l_orderkey, + l_partkey, l_quantity, l_extendedprice, l_discount, l_tax, + l_returnflag, l_linestatus, l_shipdate, l_commitdate, + l_receiptdate, l_shipinstruct, l_shipmode + FROM lineitem + ORDER BY l_linenumber, l_suppkey, l_orderkey + "#, + // Q8: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + no payload column + r#" + SELECT l_orderkey, l_suppkey, l_linenumber, l_comment + FROM lineitem + ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment + "#, + // Q9: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + 1 payload column + r#" + SELECT l_orderkey, l_suppkey, l_linenumber, l_comment, l_partkey + FROM lineitem + ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment + "#, + // Q10: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + 12 all other columns + r#" + SELECT l_orderkey, l_suppkey, l_linenumber, l_comment, + l_partkey, l_quantity, l_extendedprice, l_discount, l_tax, + l_returnflag, l_linestatus, l_shipdate, l_commitdate, + l_receiptdate, l_shipinstruct, l_shipmode + FROM lineitem + ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment + "#, + ]; + + /// If query is specified from command line, run only that query. + /// Otherwise, run all queries. + pub async fn run(&self) -> Result<()> { + let mut benchmark_run = BenchmarkRun::new(); + + let query_range = match self.query { + Some(query_id) => query_id..=query_id, + None => 1..=Self::SORT_QUERIES.len(), + }; + + for query_id in query_range { + benchmark_run.start_new_case(&format!("{query_id}")); + + let query_results = self.benchmark_query(query_id).await?; + for iter in query_results { + benchmark_run.write_iter(iter.elapsed, iter.row_count); + } + } + + benchmark_run.maybe_write_json(self.output_path.as_ref())?; + + Ok(()) + } + + /// Benchmark query `query_id` in `SORT_QUERIES` + async fn benchmark_query(&self, query_id: usize) -> Result> { + let config = self.common.config(); + + let runtime_config = RuntimeConfig::new().build_arc()?; + let ctx = SessionContext::new_with_config_rt(config, runtime_config); + + // register tables + self.register_tables(&ctx).await?; + + let mut millis = vec![]; + // run benchmark + let mut query_results = vec![]; + for i in 0..self.iterations() { + let start = Instant::now(); + + let query_idx = query_id - 1; // 1-indexed -> 0-indexed + let sql = Self::SORT_QUERIES[query_idx]; + + let result = self.execute_query(&ctx, sql).await?; + + let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0; + let ms = elapsed.as_secs_f64() * 1000.0; + millis.push(ms); + + let row_count = result.iter().map(|b| b.num_rows()).sum(); + println!( + "Q{query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows" + ); + query_results.push(QueryResult { elapsed, row_count }); + } + + let avg = millis.iter().sum::() / millis.len() as f64; + println!("Q{query_id} avg time: {avg:.2} ms"); + + Ok(query_results) + } + + async fn register_tables(&self, ctx: &SessionContext) -> Result<()> { + for table in Self::SORT_TABLES { + let table_provider = { self.get_table(ctx, table).await? }; + + if self.mem_table { + println!("Loading table '{table}' into memory"); + let start = Instant::now(); + let memtable = + MemTable::load(table_provider, Some(self.partitions()), &ctx.state()) + .await?; + println!( + "Loaded table '{}' into memory in {} ms", + table, + start.elapsed().as_millis() + ); + ctx.register_table(table, Arc::new(memtable))?; + } else { + ctx.register_table(table, table_provider)?; + } + } + Ok(()) + } + + async fn execute_query( + &self, + ctx: &SessionContext, + sql: &str, + ) -> Result> { + let debug = self.common.debug; + let plan = ctx.sql(sql).await?; + let (state, plan) = plan.into_parts(); + + if debug { + println!("=== Logical plan ===\n{plan}\n"); + } + + let plan = state.optimize(&plan)?; + if debug { + println!("=== Optimized logical plan ===\n{plan}\n"); + } + let physical_plan = state.create_physical_plan(&plan).await?; + if debug { + println!( + "=== Physical plan ===\n{}\n", + displayable(physical_plan.as_ref()).indent(true) + ); + } + let result = collect(physical_plan.clone(), state.task_ctx()).await?; + if debug { + println!( + "=== Physical plan with metrics ===\n{}\n", + DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()) + .indent(true) + ); + } + Ok(result) + } + + async fn get_table( + &self, + ctx: &SessionContext, + table: &str, + ) -> Result> { + let path = self.path.to_str().unwrap(); + + // Obtain a snapshot of the SessionState + let state = ctx.state(); + let path = format!("{path}/{table}"); + let format = Arc::new( + ParquetFormat::default() + .with_options(ctx.state().table_options().parquet.clone()), + ); + let extension = DEFAULT_PARQUET_EXTENSION; + + let options = ListingOptions::new(format) + .with_file_extension(extension) + .with_collect_stat(state.config().collect_statistics()); + + let table_path = ListingTableUrl::parse(path)?; + let config = ListingTableConfig::new(table_path).with_listing_options(options); + let config = config.infer_schema(&state).await?; + + Ok(Arc::new(ListingTable::try_new(config)?)) + } + + fn iterations(&self) -> usize { + self.common.iterations + } + + fn partitions(&self) -> usize { + self.common.partitions.unwrap_or(num_cpus::get()) + } +} + +#[tokio::main] +pub async fn main() -> Result<()> { + env_logger::init(); + + match SortQueryOpt::from_args() { + SortQueryOpt::Benchmark(opt) => opt.run().await?, + } + + Ok(()) +} From 199bdaef44dc7454818f253702141a9884124260 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 8 Nov 2024 19:12:15 +0800 Subject: [PATCH 2/3] clippy --- benchmarks/src/bin/sort_integration.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/benchmarks/src/bin/sort_integration.rs b/benchmarks/src/bin/sort_integration.rs index a0aac4c76ce0..540c1914958b 100644 --- a/benchmarks/src/bin/sort_integration.rs +++ b/benchmarks/src/bin/sort_integration.rs @@ -85,15 +85,17 @@ impl SortConfig { /// - Sort key with different cardinality /// - Different number of sort keys /// - Different number of payload columns (thin: 1 additional column other - /// than sort keys; wide: all columns except sort keys) + /// than sort keys; wide: all columns except sort keys) /// /// DataSet is `lineitem` table in TPCH dataset (16 columns, 6M rows for /// scale factor 1.0, cardinality is counted from SF1 dataset) + /// /// Key Columns: /// - Column `l_linenumber`, type: `INTEGER`, cardinality: 7 /// - Column `l_suppkey`, type: `BIGINT`, cardinality: 10k /// - Column `l_orderkey`, type: `BIGINT`, cardinality: 1.5M /// - Column `l_comment`, type: `VARCHAR`, cardinality: 4.5M (len is ~26 chars) + /// /// Payload Columns: /// - Thin variant: `l_partkey` column with `BIGINT` type (1 column) /// - Wide variant: all columns except for possible key columns (12 columns) From 349ba55221fae251a78fc6debe5ffc4ad9b84d88 Mon Sep 17 00:00:00 2001 From: Yongting You <2010youy01@gmail.com> Date: Fri, 15 Nov 2024 12:46:18 +0800 Subject: [PATCH 3/3] review --- benchmarks/README.md | 12 ++--- benchmarks/bench.sh | 15 +++--- benchmarks/src/bin/dfbench.rs | 4 +- benchmarks/src/lib.rs | 1 + .../{bin/sort_integration.rs => sort_tpch.rs} | 52 +++++++------------ 5 files changed, 36 insertions(+), 48 deletions(-) rename benchmarks/src/{bin/sort_integration.rs => sort_tpch.rs} (91%) diff --git a/benchmarks/README.md b/benchmarks/README.md index a5c37ca3f5fc..cccd7f44f504 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -330,28 +330,28 @@ steps. The tests sort the entire dataset using several different sort orders. -## Sort Integration +## Sort TPCH Test performance of end-to-end sort SQL queries. (While the `Sort` benchmark focuses on a single sort executor, this benchmark tests how sorting is executed across multiple CPU cores by benchmarking sorting the whole relational table.) Sort integration benchmark runs whole table sort queries on TPCH `lineitem` table, with different characteristics. For example, different number of sort keys, different sort key cardinality, different number of payload columns, etc. -See [`sort_integration.rs`](src/bin/sort_integration.rs) for more details. +See [`sort_tpch.rs`](src/sort_tpch.rs) for more details. -### Sort Integration Benchmark Example Runs +### Sort TPCH Benchmark Example Runs 1. Run all queries with default setting: ```bash - cargo run --release --bin sort_integration -- benchmark -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_integration.json' + cargo run --release --bin dfbench -- sort-tpch -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json' ``` 2. Run a specific query: ```bash - cargo run --release --bin sort_integration -- benchmark -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_integration.json' --query 2 + cargo run --release --bin dfbench -- sort-tpch -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json' --query 2 ``` 3. Run all queries with `bench.sh` script: ```bash -./bench.sh run sort_integration +./bench.sh run sort_tpch ``` ## IMDB diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 8a2fc1a08b7d..b02bfee2454e 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -75,6 +75,7 @@ tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory parquet: Benchmark of parquet reader's filtering speed sort: Benchmark of sorting speed +sort_tpch: Benchmark of sorting speed for end-to-end sort queries on TPCH dataset clickbench_1: ClickBench queries against a single parquet file clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific) @@ -175,7 +176,7 @@ main() { # same data as for tpch data_tpch "1" ;; - sort_integration) + sort_tpch) # same data as for tpch data_tpch "1" ;; @@ -256,8 +257,8 @@ main() { external_aggr) run_external_aggr ;; - sort_integration) - run_sort_integration + sort_tpch) + run_sort_tpch ;; *) echo "Error: unknown benchmark '$BENCHMARK' for run" @@ -557,13 +558,13 @@ run_external_aggr() { } # Runs the sort integration benchmark -run_sort_integration() { +run_sort_tpch() { TPCH_DIR="${DATA_DIR}/tpch_sf1" - RESULTS_FILE="${RESULTS_DIR}/sort_integration.json" + RESULTS_FILE="${RESULTS_DIR}/sort_tpch.json" echo "RESULTS_FILE: ${RESULTS_FILE}" - echo "Running sort integration benchmark..." + echo "Running sort tpch benchmark..." - $CARGO_COMMAND --bin sort_integration -- benchmark --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" + $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" } diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index f7b84116e793..81aa5437dd5f 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, tpch}; +use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, sort_tpch, tpch}; #[derive(Debug, StructOpt)] #[structopt(about = "benchmark command")] @@ -43,6 +43,7 @@ enum Options { Clickbench(clickbench::RunOpt), ParquetFilter(parquet_filter::RunOpt), Sort(sort::RunOpt), + SortTpch(sort_tpch::RunOpt), Imdb(imdb::RunOpt), } @@ -57,6 +58,7 @@ pub async fn main() -> Result<()> { Options::Clickbench(opt) => opt.run().await, Options::ParquetFilter(opt) => opt.run().await, Options::Sort(opt) => opt.run().await, + Options::SortTpch(opt) => opt.run().await, Options::Imdb(opt) => opt.run().await, } } diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index 02410e0cfa01..2d37d78764d7 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -20,5 +20,6 @@ pub mod clickbench; pub mod imdb; pub mod parquet_filter; pub mod sort; +pub mod sort_tpch; pub mod tpch; pub mod util; diff --git a/benchmarks/src/bin/sort_integration.rs b/benchmarks/src/sort_tpch.rs similarity index 91% rename from benchmarks/src/bin/sort_integration.rs rename to benchmarks/src/sort_tpch.rs index 540c1914958b..4b83b3b8889a 100644 --- a/benchmarks/src/bin/sort_integration.rs +++ b/benchmarks/src/sort_tpch.rs @@ -16,16 +16,16 @@ // under the License. //! This module provides integration benchmark for sort operation. -//! It will run different sort SQL queries on parquet dataset. +//! It will run different sort SQL queries on TPCH `lineitem` parquet dataset. //! //! Another `Sort` benchmark focus on single core execution. This benchmark //! runs end-to-end sort queries and test the performance on multiple CPU cores. +use futures::StreamExt; use std::path::PathBuf; use std::sync::Arc; use structopt::StructOpt; -use arrow::record_batch::RecordBatch; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::{ ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, @@ -34,23 +34,15 @@ use datafusion::datasource::{MemTable, TableProvider}; use datafusion::error::Result; use datafusion::execution::runtime_env::RuntimeConfig; use datafusion::physical_plan::display::DisplayableExecutionPlan; -use datafusion::physical_plan::{collect, displayable}; +use datafusion::physical_plan::{displayable, execute_stream}; use datafusion::prelude::*; -use datafusion_benchmarks::util::{BenchmarkRun, CommonOpt}; use datafusion_common::instant::Instant; use datafusion_common::DEFAULT_PARQUET_EXTENSION; -#[derive(Debug, StructOpt)] -#[structopt( - name = "datafusion-sort-integration", - about = "DataFusion sort integration benchmark" -)] -enum SortQueryOpt { - Benchmark(SortConfig), -} +use crate::util::{BenchmarkRun, CommonOpt}; #[derive(Debug, StructOpt)] -struct SortConfig { +pub struct RunOpt { /// Common options #[structopt(flatten)] common: CommonOpt, @@ -77,7 +69,7 @@ struct QueryResult { row_count: usize, } -impl SortConfig { +impl RunOpt { const SORT_TABLES: [&'static str; 1] = ["lineitem"]; /// Sort queries with different characteristics: @@ -211,13 +203,12 @@ impl SortConfig { let query_idx = query_id - 1; // 1-indexed -> 0-indexed let sql = Self::SORT_QUERIES[query_idx]; - let result = self.execute_query(&ctx, sql).await?; + let row_count = self.execute_query(&ctx, sql).await?; let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0; let ms = elapsed.as_secs_f64() * 1000.0; millis.push(ms); - let row_count = result.iter().map(|b| b.num_rows()).sum(); println!( "Q{query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows" ); @@ -253,11 +244,7 @@ impl SortConfig { Ok(()) } - async fn execute_query( - &self, - ctx: &SessionContext, - sql: &str, - ) -> Result> { + async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result { let debug = self.common.debug; let plan = ctx.sql(sql).await?; let (state, plan) = plan.into_parts(); @@ -277,7 +264,14 @@ impl SortConfig { displayable(physical_plan.as_ref()).indent(true) ); } - let result = collect(physical_plan.clone(), state.task_ctx()).await?; + + let mut row_count = 0; + + let mut stream = execute_stream(physical_plan.clone(), state.task_ctx())?; + while let Some(batch) = stream.next().await { + row_count += batch.unwrap().num_rows(); + } + if debug { println!( "=== Physical plan with metrics ===\n{}\n", @@ -285,7 +279,8 @@ impl SortConfig { .indent(true) ); } - Ok(result) + + Ok(row_count) } async fn get_table( @@ -323,14 +318,3 @@ impl SortConfig { self.common.partitions.unwrap_or(num_cpus::get()) } } - -#[tokio::main] -pub async fn main() -> Result<()> { - env_logger::init(); - - match SortQueryOpt::from_args() { - SortQueryOpt::Benchmark(opt) => opt.run().await?, - } - - Ok(()) -}