Skip to content

Commit d54c9ca

Browse files
committed
Merge branch 'main' into format-params
2 parents 1454ade + 1fe856b commit d54c9ca

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1375
-322
lines changed

Cargo.lock

Lines changed: 13 additions & 12 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

benchmarks/README.md

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,25 @@ To run for specific query, for example Q21
8383
./bench.sh run tpch10 21
8484
```
8585

86-
## Select join algorithm
86+
## Benchmark with modified configurations
87+
### Select join algorithm
8788
The benchmark runs with `prefer_hash_join == true` by default, which enforces the HASH join algorithm.
8889
To run TPCH benchmarks with a join algorithm other than HASH:
8990
```shell
9091
PREFER_HASH_JOIN=false ./bench.sh run tpch
9192
```
9293

94+
### Configure with environment variables
95+
Any [datafusion options](https://datafusion.apache.org/user-guide/configs.html) that are provided as environment variables are
96+
also considered by the benchmarks.
97+
The following configuration runs the TPCH benchmark with datafusion configured to *not* repartition join keys.
98+
```shell
99+
DATAFUSION_OPTIMIZER_REPARTITION_JOINS=false ./bench.sh run tpch
100+
```
101+
You might want to adjust the results location to avoid overwriting previous results.
102+
Environment configuration that was picked up by datafusion is logged at `info` level.
103+
To verify that datafusion picked up your configuration, run the benchmarks with `RUST_LOG=info` or a more verbose level (`debug`, `trace`).
104+
93105
## Comparing performance of main and a branch
94106

95107
```shell

benchmarks/bench.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
9898
RESULTS_NAME folder where the benchmark files are stored
9999
PREFER_HASH_JOIN Prefer hash join algorithm (default true)
100100
VENV_PATH Python venv to use for compare and venv commands (default ./venv, override by <your-venv>/bin/activate)
101+
DATAFUSION_* Set the given datafusion configuration
101102
"
102103
exit 1
103104
}

benchmarks/src/bin/external_aggr.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ impl ExternalAggrConfig {
189189
) -> Result<Vec<QueryResult>> {
190190
let query_name =
191191
format!("Q{query_id}({})", human_readable_size(mem_limit as usize));
192-
let config = self.common.config();
192+
let config = self.common.config()?;
193193
let memory_pool: Arc<dyn MemoryPool> = match mem_pool_type {
194194
"fair" => Arc::new(FairSpillPool::new(mem_limit as usize)),
195195
"greedy" => Arc::new(GreedyMemoryPool::new(mem_limit as usize)),

benchmarks/src/clickbench.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ impl RunOpt {
116116
};
117117

118118
// configure parquet options
119-
let mut config = self.common.config();
119+
let mut config = self.common.config()?;
120120
{
121121
let parquet_options = &mut config.options_mut().execution.parquet;
122122
// The hits_partitioned dataset specifies string columns

benchmarks/src/h2o.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ impl RunOpt {
7777
None => queries.min_query_id()..=queries.max_query_id(),
7878
};
7979

80-
let config = self.common.config();
80+
let config = self.common.config()?;
8181
let rt_builder = self.common.runtime_env_builder()?;
8282
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);
8383

benchmarks/src/imdb/run.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ impl RunOpt {
303303
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
304304
let mut config = self
305305
.common
306-
.config()
306+
.config()?
307307
.with_collect_statistics(!self.disable_statistics);
308308
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
309309
let rt_builder = self.common.runtime_env_builder()?;
@@ -514,7 +514,7 @@ mod tests {
514514
let common = CommonOpt {
515515
iterations: 1,
516516
partitions: Some(2),
517-
batch_size: 8192,
517+
batch_size: Some(8192),
518518
mem_pool_type: "fair".to_string(),
519519
memory_limit: None,
520520
sort_spill_reservation_bytes: None,
@@ -550,7 +550,7 @@ mod tests {
550550
let common = CommonOpt {
551551
iterations: 1,
552552
partitions: Some(2),
553-
batch_size: 8192,
553+
batch_size: Some(8192),
554554
mem_pool_type: "fair".to_string(),
555555
memory_limit: None,
556556
sort_spill_reservation_bytes: None,

benchmarks/src/sort_tpch.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ impl RunOpt {
202202

203203
/// Benchmark query `query_id` in `SORT_QUERIES`
204204
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
205-
let config = self.common.config();
205+
let config = self.common.config()?;
206206
let rt_builder = self.common.runtime_env_builder()?;
207207
let state = SessionStateBuilder::new()
208208
.with_config(config)

benchmarks/src/tpch/run.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl RunOpt {
123123
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
124124
let mut config = self
125125
.common
126-
.config()
126+
.config()?
127127
.with_collect_statistics(!self.disable_statistics);
128128
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
129129
let rt_builder = self.common.runtime_env_builder()?;
@@ -355,7 +355,7 @@ mod tests {
355355
let common = CommonOpt {
356356
iterations: 1,
357357
partitions: Some(2),
358-
batch_size: 8192,
358+
batch_size: Some(8192),
359359
mem_pool_type: "fair".to_string(),
360360
memory_limit: None,
361361
sort_spill_reservation_bytes: None,
@@ -392,7 +392,7 @@ mod tests {
392392
let common = CommonOpt {
393393
iterations: 1,
394394
partitions: Some(2),
395-
batch_size: 8192,
395+
batch_size: Some(8192),
396396
mem_pool_type: "fair".to_string(),
397397
memory_limit: None,
398398
sort_spill_reservation_bytes: None,

benchmarks/src/util/options.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use datafusion::{
2525
},
2626
prelude::SessionConfig,
2727
};
28-
use datafusion_common::{utils::get_available_parallelism, DataFusionError, Result};
28+
use datafusion_common::{DataFusionError, Result};
2929
use structopt::StructOpt;
3030

3131
// Common benchmark options (don't use doc comments otherwise this doc
@@ -41,8 +41,8 @@ pub struct CommonOpt {
4141
pub partitions: Option<usize>,
4242

4343
/// Batch size when reading CSV or Parquet files
44-
#[structopt(short = "s", long = "batch-size", default_value = "8192")]
45-
pub batch_size: usize,
44+
#[structopt(short = "s", long = "batch-size")]
45+
pub batch_size: Option<usize>,
4646

4747
/// The memory pool type to use, should be one of "fair" or "greedy"
4848
#[structopt(long = "mem-pool-type", default_value = "fair")]
@@ -65,21 +65,25 @@ pub struct CommonOpt {
6565

6666
impl CommonOpt {
6767
/// Return an appropriately configured `SessionConfig`
68-
pub fn config(&self) -> SessionConfig {
69-
self.update_config(SessionConfig::new())
68+
pub fn config(&self) -> Result<SessionConfig> {
69+
SessionConfig::from_env().map(|config| self.update_config(config))
7070
}
7171

7272
/// Modify the existing config appropriately
73-
pub fn update_config(&self, config: SessionConfig) -> SessionConfig {
74-
let mut config = config
75-
.with_target_partitions(
76-
self.partitions.unwrap_or(get_available_parallelism()),
77-
)
78-
.with_batch_size(self.batch_size);
73+
pub fn update_config(&self, mut config: SessionConfig) -> SessionConfig {
74+
if let Some(batch_size) = self.batch_size {
75+
config = config.with_batch_size(batch_size)
76+
}
77+
78+
if let Some(partitions) = self.partitions {
79+
config = config.with_target_partitions(partitions)
80+
}
81+
7982
if let Some(sort_spill_reservation_bytes) = self.sort_spill_reservation_bytes {
8083
config =
8184
config.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes);
8285
}
86+
8387
config
8488
}
8589

datafusion/common/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ log = { workspace = true }
6363
object_store = { workspace = true, optional = true }
6464
parquet = { workspace = true, optional = true, default-features = true }
6565
paste = "1.0.15"
66-
pyo3 = { version = "0.24.0", optional = true }
66+
pyo3 = { version = "0.24.2", optional = true }
6767
recursive = { workspace = true, optional = true }
6868
sqlparser = { workspace = true }
6969
tokio = { workspace = true }

datafusion/common/src/config.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -921,7 +921,9 @@ impl ConfigOptions {
921921
for key in keys.0 {
922922
let env = key.to_uppercase().replace('.', "_");
923923
if let Some(var) = std::env::var_os(env) {
924-
ret.set(&key, var.to_string_lossy().as_ref())?;
924+
let value = var.to_string_lossy();
925+
log::info!("Set {key} to {value} from the environment variable");
926+
ret.set(&key, value.as_ref())?;
925927
}
926928
}
927929

0 commit comments

Comments
 (0)