diff --git a/benchmarks/README.md b/benchmarks/README.md index a9aa1afb97a1..cccd7f44f504 100644 --- a/benchmarks/README.md +++ b/benchmarks/README.md @@ -330,6 +330,30 @@ steps. The tests sort the entire dataset using several different sort orders. +## Sort TPCH + +Test performance of end-to-end sort SQL queries. (While the `Sort` benchmark focuses on a single sort executor, this benchmark tests how sorting is executed across multiple CPU cores by benchmarking sorting the whole relational table.) + +Sort integration benchmark runs whole table sort queries on TPCH `lineitem` table, with different characteristics. For example, different number of sort keys, different sort key cardinality, different number of payload columns, etc. + +See [`sort_tpch.rs`](src/sort_tpch.rs) for more details. + +### Sort TPCH Benchmark Example Runs +1. Run all queries with default setting: +```bash + cargo run --release --bin dfbench -- sort-tpch -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json' +``` + +2. Run a specific query: +```bash + cargo run --release --bin dfbench -- sort-tpch -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json' --query 2 +``` + +3. Run all queries with `bench.sh` script: +```bash +./bench.sh run sort_tpch +``` + ## IMDB Run Join Order Benchmark (JOB) on IMDB dataset. diff --git a/benchmarks/bench.sh b/benchmarks/bench.sh index 47c5d1261605..b02bfee2454e 100755 --- a/benchmarks/bench.sh +++ b/benchmarks/bench.sh @@ -75,6 +75,7 @@ tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory parquet: Benchmark of parquet reader's filtering speed sort: Benchmark of sorting speed +sort_tpch: Benchmark of sorting speed for end-to-end sort queries on TPCH dataset clickbench_1: ClickBench queries against a single parquet file clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific) @@ -175,6 +176,10 @@ main() { # same data as for tpch data_tpch "1" ;; + sort_tpch) + # same data as for tpch + data_tpch "1" + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for data generation" usage @@ -252,6 +257,9 @@ main() { external_aggr) run_external_aggr ;; + sort_tpch) + run_sort_tpch + ;; *) echo "Error: unknown benchmark '$BENCHMARK' for run" usage @@ -549,6 +557,16 @@ run_external_aggr() { $CARGO_COMMAND --bin external_aggr -- benchmark --partitions 4 --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" } +# Runs the sort integration benchmark +run_sort_tpch() { + TPCH_DIR="${DATA_DIR}/tpch_sf1" + RESULTS_FILE="${RESULTS_DIR}/sort_tpch.json" + echo "RESULTS_FILE: ${RESULTS_FILE}" + echo "Running sort tpch benchmark..." + + $CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}" +} + compare_benchmarks() { BASE_RESULTS_DIR="${SCRIPT_DIR}/results" diff --git a/benchmarks/src/bin/dfbench.rs b/benchmarks/src/bin/dfbench.rs index f7b84116e793..81aa5437dd5f 100644 --- a/benchmarks/src/bin/dfbench.rs +++ b/benchmarks/src/bin/dfbench.rs @@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc; #[global_allocator] static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; -use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, tpch}; +use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, sort_tpch, tpch}; #[derive(Debug, StructOpt)] #[structopt(about = "benchmark command")] @@ -43,6 +43,7 @@ enum Options { Clickbench(clickbench::RunOpt), ParquetFilter(parquet_filter::RunOpt), Sort(sort::RunOpt), + SortTpch(sort_tpch::RunOpt), Imdb(imdb::RunOpt), } @@ -57,6 +58,7 @@ pub async fn main() -> Result<()> { Options::Clickbench(opt) => opt.run().await, Options::ParquetFilter(opt) => opt.run().await, Options::Sort(opt) => opt.run().await, + Options::SortTpch(opt) => opt.run().await, Options::Imdb(opt) => opt.run().await, } } diff --git a/benchmarks/src/lib.rs b/benchmarks/src/lib.rs index 02410e0cfa01..2d37d78764d7 100644 --- a/benchmarks/src/lib.rs +++ b/benchmarks/src/lib.rs @@ -20,5 +20,6 @@ pub mod clickbench; pub mod imdb; pub mod parquet_filter; pub mod sort; +pub mod sort_tpch; pub mod tpch; pub mod util; diff --git a/benchmarks/src/sort_tpch.rs b/benchmarks/src/sort_tpch.rs new file mode 100644 index 000000000000..4b83b3b8889a --- /dev/null +++ b/benchmarks/src/sort_tpch.rs @@ -0,0 +1,320 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module provides integration benchmark for sort operation. +//! It will run different sort SQL queries on TPCH `lineitem` parquet dataset. +//! +//! Another `Sort` benchmark focus on single core execution. This benchmark +//! runs end-to-end sort queries and test the performance on multiple CPU cores. + +use futures::StreamExt; +use std::path::PathBuf; +use std::sync::Arc; +use structopt::StructOpt; + +use datafusion::datasource::file_format::parquet::ParquetFormat; +use datafusion::datasource::listing::{ + ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl, +}; +use datafusion::datasource::{MemTable, TableProvider}; +use datafusion::error::Result; +use datafusion::execution::runtime_env::RuntimeConfig; +use datafusion::physical_plan::display::DisplayableExecutionPlan; +use datafusion::physical_plan::{displayable, execute_stream}; +use datafusion::prelude::*; +use datafusion_common::instant::Instant; +use datafusion_common::DEFAULT_PARQUET_EXTENSION; + +use crate::util::{BenchmarkRun, CommonOpt}; + +#[derive(Debug, StructOpt)] +pub struct RunOpt { + /// Common options + #[structopt(flatten)] + common: CommonOpt, + + /// Sort query number. If not specified, runs all queries + #[structopt(short, long)] + query: Option, + + /// Path to data files (lineitem). Only parquet format is supported + #[structopt(parse(from_os_str), required = true, short = "p", long = "path")] + path: PathBuf, + + /// Path to JSON benchmark result to be compare using `compare.py` + #[structopt(parse(from_os_str), short = "o", long = "output")] + output_path: Option, + + /// Load the data into a MemTable before executing the query + #[structopt(short = "m", long = "mem-table")] + mem_table: bool, +} + +struct QueryResult { + elapsed: std::time::Duration, + row_count: usize, +} + +impl RunOpt { + const SORT_TABLES: [&'static str; 1] = ["lineitem"]; + + /// Sort queries with different characteristics: + /// - Sort key with fixed length or variable length (VARCHAR) + /// - Sort key with different cardinality + /// - Different number of sort keys + /// - Different number of payload columns (thin: 1 additional column other + /// than sort keys; wide: all columns except sort keys) + /// + /// DataSet is `lineitem` table in TPCH dataset (16 columns, 6M rows for + /// scale factor 1.0, cardinality is counted from SF1 dataset) + /// + /// Key Columns: + /// - Column `l_linenumber`, type: `INTEGER`, cardinality: 7 + /// - Column `l_suppkey`, type: `BIGINT`, cardinality: 10k + /// - Column `l_orderkey`, type: `BIGINT`, cardinality: 1.5M + /// - Column `l_comment`, type: `VARCHAR`, cardinality: 4.5M (len is ~26 chars) + /// + /// Payload Columns: + /// - Thin variant: `l_partkey` column with `BIGINT` type (1 column) + /// - Wide variant: all columns except for possible key columns (12 columns) + const SORT_QUERIES: [&'static str; 10] = [ + // Q1: 1 sort key (type: INTEGER, cardinality: 7) + 1 payload column + r#" + SELECT l_linenumber, l_partkey + FROM lineitem + ORDER BY l_linenumber + "#, + // Q2: 1 sort key (type: BIGINT, cardinality: 1.5M) + 1 payload column + r#" + SELECT l_orderkey, l_partkey + FROM lineitem + ORDER BY l_orderkey + "#, + // Q3: 1 sort key (type: VARCHAR, cardinality: 4.5M) + 1 payload column + r#" + SELECT l_comment, l_partkey + FROM lineitem + ORDER BY l_comment + "#, + // Q4: 2 sort keys {(BIGINT, 1.5M), (INTEGER, 7)} + 1 payload column + r#" + SELECT l_orderkey, l_linenumber, l_partkey + FROM lineitem + ORDER BY l_orderkey, l_linenumber + "#, + // Q5: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + no payload column + r#" + SELECT l_linenumber, l_suppkey, l_orderkey + FROM lineitem + ORDER BY l_linenumber, l_suppkey, l_orderkey + "#, + // Q6: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + 1 payload column + r#" + SELECT l_linenumber, l_suppkey, l_orderkey, l_partkey + FROM lineitem + ORDER BY l_linenumber, l_suppkey, l_orderkey + "#, + // Q7: 3 sort keys {(INTEGER, 7), (BIGINT, 10k), (BIGINT, 1.5M)} + 12 all other columns + r#" + SELECT l_linenumber, l_suppkey, l_orderkey, + l_partkey, l_quantity, l_extendedprice, l_discount, l_tax, + l_returnflag, l_linestatus, l_shipdate, l_commitdate, + l_receiptdate, l_shipinstruct, l_shipmode + FROM lineitem + ORDER BY l_linenumber, l_suppkey, l_orderkey + "#, + // Q8: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + no payload column + r#" + SELECT l_orderkey, l_suppkey, l_linenumber, l_comment + FROM lineitem + ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment + "#, + // Q9: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + 1 payload column + r#" + SELECT l_orderkey, l_suppkey, l_linenumber, l_comment, l_partkey + FROM lineitem + ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment + "#, + // Q10: 4 sort keys {(BIGINT, 1.5M), (BIGINT, 10k), (INTEGER, 7), (VARCHAR, 4.5M)} + 12 all other columns + r#" + SELECT l_orderkey, l_suppkey, l_linenumber, l_comment, + l_partkey, l_quantity, l_extendedprice, l_discount, l_tax, + l_returnflag, l_linestatus, l_shipdate, l_commitdate, + l_receiptdate, l_shipinstruct, l_shipmode + FROM lineitem + ORDER BY l_orderkey, l_suppkey, l_linenumber, l_comment + "#, + ]; + + /// If query is specified from command line, run only that query. + /// Otherwise, run all queries. + pub async fn run(&self) -> Result<()> { + let mut benchmark_run = BenchmarkRun::new(); + + let query_range = match self.query { + Some(query_id) => query_id..=query_id, + None => 1..=Self::SORT_QUERIES.len(), + }; + + for query_id in query_range { + benchmark_run.start_new_case(&format!("{query_id}")); + + let query_results = self.benchmark_query(query_id).await?; + for iter in query_results { + benchmark_run.write_iter(iter.elapsed, iter.row_count); + } + } + + benchmark_run.maybe_write_json(self.output_path.as_ref())?; + + Ok(()) + } + + /// Benchmark query `query_id` in `SORT_QUERIES` + async fn benchmark_query(&self, query_id: usize) -> Result> { + let config = self.common.config(); + + let runtime_config = RuntimeConfig::new().build_arc()?; + let ctx = SessionContext::new_with_config_rt(config, runtime_config); + + // register tables + self.register_tables(&ctx).await?; + + let mut millis = vec![]; + // run benchmark + let mut query_results = vec![]; + for i in 0..self.iterations() { + let start = Instant::now(); + + let query_idx = query_id - 1; // 1-indexed -> 0-indexed + let sql = Self::SORT_QUERIES[query_idx]; + + let row_count = self.execute_query(&ctx, sql).await?; + + let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0; + let ms = elapsed.as_secs_f64() * 1000.0; + millis.push(ms); + + println!( + "Q{query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows" + ); + query_results.push(QueryResult { elapsed, row_count }); + } + + let avg = millis.iter().sum::() / millis.len() as f64; + println!("Q{query_id} avg time: {avg:.2} ms"); + + Ok(query_results) + } + + async fn register_tables(&self, ctx: &SessionContext) -> Result<()> { + for table in Self::SORT_TABLES { + let table_provider = { self.get_table(ctx, table).await? }; + + if self.mem_table { + println!("Loading table '{table}' into memory"); + let start = Instant::now(); + let memtable = + MemTable::load(table_provider, Some(self.partitions()), &ctx.state()) + .await?; + println!( + "Loaded table '{}' into memory in {} ms", + table, + start.elapsed().as_millis() + ); + ctx.register_table(table, Arc::new(memtable))?; + } else { + ctx.register_table(table, table_provider)?; + } + } + Ok(()) + } + + async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result { + let debug = self.common.debug; + let plan = ctx.sql(sql).await?; + let (state, plan) = plan.into_parts(); + + if debug { + println!("=== Logical plan ===\n{plan}\n"); + } + + let plan = state.optimize(&plan)?; + if debug { + println!("=== Optimized logical plan ===\n{plan}\n"); + } + let physical_plan = state.create_physical_plan(&plan).await?; + if debug { + println!( + "=== Physical plan ===\n{}\n", + displayable(physical_plan.as_ref()).indent(true) + ); + } + + let mut row_count = 0; + + let mut stream = execute_stream(physical_plan.clone(), state.task_ctx())?; + while let Some(batch) = stream.next().await { + row_count += batch.unwrap().num_rows(); + } + + if debug { + println!( + "=== Physical plan with metrics ===\n{}\n", + DisplayableExecutionPlan::with_metrics(physical_plan.as_ref()) + .indent(true) + ); + } + + Ok(row_count) + } + + async fn get_table( + &self, + ctx: &SessionContext, + table: &str, + ) -> Result> { + let path = self.path.to_str().unwrap(); + + // Obtain a snapshot of the SessionState + let state = ctx.state(); + let path = format!("{path}/{table}"); + let format = Arc::new( + ParquetFormat::default() + .with_options(ctx.state().table_options().parquet.clone()), + ); + let extension = DEFAULT_PARQUET_EXTENSION; + + let options = ListingOptions::new(format) + .with_file_extension(extension) + .with_collect_stat(state.config().collect_statistics()); + + let table_path = ListingTableUrl::parse(path)?; + let config = ListingTableConfig::new(table_path).with_listing_options(options); + let config = config.infer_schema(&state).await?; + + Ok(Arc::new(ListingTable::try_new(config)?)) + } + + fn iterations(&self) -> usize { + self.common.iterations + } + + fn partitions(&self) -> usize { + self.common.partitions.unwrap_or(num_cpus::get()) + } +}