Skip to content

Commit

Permalink
review
Browse files Browse the repository at this point in the history
  • Loading branch information
2010YOUY01 committed Nov 15, 2024
1 parent 199bdae commit 349ba55
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 48 deletions.
12 changes: 6 additions & 6 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,28 +330,28 @@ steps.
The tests sort the entire dataset using several different sort
orders.

## Sort Integration
## Sort TPCH

Test performance of end-to-end sort SQL queries. (While the `Sort` benchmark focuses on a single sort executor, this benchmark tests how sorting is executed across multiple CPU cores by benchmarking sorting the whole relational table.)

This benchmark runs whole-table sort queries on the TPCH `lineitem` table with different characteristics: for example, different numbers of sort keys, different sort key cardinalities, different numbers of payload columns, etc.

See [`sort_integration.rs`](src/bin/sort_integration.rs) for more details.
See [`sort_tpch.rs`](src/sort_tpch.rs) for more details.

### Sort Integration Benchmark Example Runs
### Sort TPCH Benchmark Example Runs
1. Run all queries with default setting:
```bash
cargo run --release --bin sort_integration -- benchmark -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_integration.json'
cargo run --release --bin dfbench -- sort-tpch -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json'
```

2. Run a specific query:
```bash
cargo run --release --bin sort_integration -- benchmark -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_integration.json' --query 2
cargo run --release --bin dfbench -- sort-tpch -p '....../datafusion/benchmarks/data/tpch_sf1' -o '/tmp/sort_tpch.json' --query 2
```

3. Run all queries with `bench.sh` script:
```bash
./bench.sh run sort_integration
./bench.sh run sort_tpch
```

## IMDB
Expand Down
15 changes: 8 additions & 7 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ tpch10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB),
tpch_mem10: TPCH inspired benchmark on Scale Factor (SF) 10 (~10GB), query from memory
parquet: Benchmark of parquet reader's filtering speed
sort: Benchmark of sorting speed
sort_tpch: Benchmark of sorting speed for end-to-end sort queries on TPCH dataset
clickbench_1: ClickBench queries against a single parquet file
clickbench_partitioned: ClickBench queries against a partitioned (100 files) parquet
clickbench_extended: ClickBench \"inspired\" queries against a single parquet (DataFusion specific)
Expand Down Expand Up @@ -175,7 +176,7 @@ main() {
# same data as for tpch
data_tpch "1"
;;
sort_integration)
sort_tpch)
# same data as for tpch
data_tpch "1"
;;
Expand Down Expand Up @@ -256,8 +257,8 @@ main() {
external_aggr)
run_external_aggr
;;
sort_integration)
run_sort_integration
sort_tpch)
run_sort_tpch
;;
*)
echo "Error: unknown benchmark '$BENCHMARK' for run"
Expand Down Expand Up @@ -557,13 +558,13 @@ run_external_aggr() {
}

# Runs the sort TPCH benchmark (end-to-end sort queries on the TPCH dataset)
run_sort_integration() {
run_sort_tpch() {
TPCH_DIR="${DATA_DIR}/tpch_sf1"
RESULTS_FILE="${RESULTS_DIR}/sort_integration.json"
RESULTS_FILE="${RESULTS_DIR}/sort_tpch.json"
echo "RESULTS_FILE: ${RESULTS_FILE}"
echo "Running sort integration benchmark..."
echo "Running sort tpch benchmark..."

$CARGO_COMMAND --bin sort_integration -- benchmark --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}"
$CARGO_COMMAND --bin dfbench -- sort-tpch --iterations 5 --path "${TPCH_DIR}" -o "${RESULTS_FILE}"
}


Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/bin/dfbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
#[global_allocator]
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, tpch};
use datafusion_benchmarks::{clickbench, imdb, parquet_filter, sort, sort_tpch, tpch};

#[derive(Debug, StructOpt)]
#[structopt(about = "benchmark command")]
Expand All @@ -43,6 +43,7 @@ enum Options {
Clickbench(clickbench::RunOpt),
ParquetFilter(parquet_filter::RunOpt),
Sort(sort::RunOpt),
SortTpch(sort_tpch::RunOpt),
Imdb(imdb::RunOpt),
}

Expand All @@ -57,6 +58,7 @@ pub async fn main() -> Result<()> {
Options::Clickbench(opt) => opt.run().await,
Options::ParquetFilter(opt) => opt.run().await,
Options::Sort(opt) => opt.run().await,
Options::SortTpch(opt) => opt.run().await,
Options::Imdb(opt) => opt.run().await,
}
}
1 change: 1 addition & 0 deletions benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ pub mod clickbench;
pub mod imdb;
pub mod parquet_filter;
pub mod sort;
pub mod sort_tpch;
pub mod tpch;
pub mod util;
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@
// under the License.

//! This module provides an integration benchmark for the sort operation.
//! It will run different sort SQL queries on parquet dataset.
//! It will run different sort SQL queries on TPCH `lineitem` parquet dataset.
//!
//! The separate `Sort` benchmark focuses on single-core execution. This benchmark
//! runs end-to-end sort queries and tests the performance on multiple CPU cores.

use futures::StreamExt;
use std::path::PathBuf;
use std::sync::Arc;
use structopt::StructOpt;

use arrow::record_batch::RecordBatch;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
Expand All @@ -34,23 +34,15 @@ use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::execution::runtime_env::RuntimeConfig;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion::physical_plan::{displayable, execute_stream};
use datafusion::prelude::*;
use datafusion_benchmarks::util::{BenchmarkRun, CommonOpt};
use datafusion_common::instant::Instant;
use datafusion_common::DEFAULT_PARQUET_EXTENSION;

// Command-line options of the standalone sort integration benchmark binary.
// Plain `//` comments are used on purpose: structopt turns `///` doc comments
// into CLI help text, which would change the program's output.
#[derive(Debug, StructOpt)]
#[structopt(
name = "datafusion-sort-integration",
about = "DataFusion sort integration benchmark"
)]
enum SortQueryOpt {
// The only variant; it carries the benchmark configuration (`SortConfig`).
Benchmark(SortConfig),
}
use crate::util::{BenchmarkRun, CommonOpt};

#[derive(Debug, StructOpt)]
struct SortConfig {
pub struct RunOpt {
/// Common options
#[structopt(flatten)]
common: CommonOpt,
Expand All @@ -77,7 +69,7 @@ struct QueryResult {
row_count: usize,
}

impl SortConfig {
impl RunOpt {
const SORT_TABLES: [&'static str; 1] = ["lineitem"];

/// Sort queries with different characteristics:
Expand Down Expand Up @@ -211,13 +203,12 @@ impl SortConfig {
let query_idx = query_id - 1; // 1-indexed -> 0-indexed
let sql = Self::SORT_QUERIES[query_idx];

let result = self.execute_query(&ctx, sql).await?;
let row_count = self.execute_query(&ctx, sql).await?;

let elapsed = start.elapsed(); //.as_secs_f64() * 1000.0;
let ms = elapsed.as_secs_f64() * 1000.0;
millis.push(ms);

let row_count = result.iter().map(|b| b.num_rows()).sum();
println!(
"Q{query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
);
Expand Down Expand Up @@ -253,11 +244,7 @@ impl SortConfig {
Ok(())
}

async fn execute_query(
&self,
ctx: &SessionContext,
sql: &str,
) -> Result<Vec<RecordBatch>> {
async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result<usize> {
let debug = self.common.debug;
let plan = ctx.sql(sql).await?;
let (state, plan) = plan.into_parts();
Expand All @@ -277,15 +264,23 @@ impl SortConfig {
displayable(physical_plan.as_ref()).indent(true)
);
}
let result = collect(physical_plan.clone(), state.task_ctx()).await?;

// Drain the result stream and only tally row counts; each batch is dropped
// as soon as it has been counted instead of being kept in a `Vec`.
let mut row_count = 0;

let mut stream = execute_stream(physical_plan.clone(), state.task_ctx())?;
while let Some(batch) = stream.next().await {
    // Propagate execution errors to the caller instead of panicking:
    // the enclosing function already returns `Result<usize>`.
    row_count += batch?.num_rows();
}

if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
.indent(true)
);
}
Ok(result)

Ok(row_count)
}

async fn get_table(
Expand Down Expand Up @@ -323,14 +318,3 @@ impl SortConfig {
self.common.partitions.unwrap_or(num_cpus::get())
}
}

/// Binary entry point: initializes logging, parses the command line, and runs
/// the selected benchmark, returning any error it reports.
#[tokio::main]
pub async fn main() -> Result<()> {
    env_logger::init();

    // `Benchmark` is the enum's only variant, so this pattern is irrefutable.
    let SortQueryOpt::Benchmark(opt) = SortQueryOpt::from_args();
    opt.run().await?;

    Ok(())
}

0 comments on commit 349ba55

Please sign in to comment.