Skip to content

Commit d121405

Browse files
committed
Renaming API to be more consistent with struct value
1 parent 879987b commit d121405

File tree

8 files changed

+29
-41
lines changed

8 files changed

+29
-41
lines changed

datafusion/common/src/config.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -525,8 +525,9 @@ config_namespace! {
525525
/// will be collected into a single partition
526526
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
527527

528-
/// The default filter selectivity used by Filter statistics
529-
/// when an exact selectivity cannot be determined
528+
/// The default filter selectivity used by Filter Statistics
529+
/// when an exact selectivity cannot be determined. Valid values are
530+
/// between 0 (no selectivity) and 100 (all rows are selected).
530531
pub default_filter_selectivity: u8, default = 20
531532
}
532533
}

datafusion/core/src/physical_optimizer/projection_pushdown.rs

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -302,16 +302,12 @@ fn try_swapping_with_filter(
302302
return Ok(None);
303303
};
304304

305-
FilterExec::try_new(
306-
new_predicate,
307-
make_with_child(projection, filter.input())?,
308-
).and_then(
309-
|e| {
305+
FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?)
306+
.and_then(|e| {
310307
let selectivity = filter.default_selectivity();
311-
e.with_selectivity(selectivity)
312-
}
313-
)
314-
.map(|e| Some(Arc::new(e) as _))
308+
e.with_default_selectivity(selectivity)
309+
})
310+
.map(|e| Some(Arc::new(e) as _))
315311
}
316312

317313
/// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done,

datafusion/core/src/physical_planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -910,7 +910,7 @@ impl DefaultPhysicalPlanner {
910910
)?;
911911
let selectivity = session_state.config().options().optimizer.default_filter_selectivity;
912912
let filter = FilterExec::try_new(runtime_expr, physical_input)?;
913-
Ok(Arc::new(filter.with_selectivity(selectivity)?))
913+
Ok(Arc::new(filter.with_default_selectivity(selectivity)?))
914914
}
915915
LogicalPlan::Union(Union { inputs, schema }) => {
916916
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;

datafusion/core/src/test_util/parquet.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,8 +179,7 @@ impl TestParquetFile {
179179
None,
180180
));
181181

182-
let exec =
183-
Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
182+
let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
184183
Ok(exec)
185184
} else {
186185
Ok(Arc::new(ParquetExec::new(scan_config, None, None)))

datafusion/physical-plan/src/filter.rs

Lines changed: 12 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,10 @@ impl FilterExec {
8585
}
8686
}
8787

88-
pub fn with_selectivity(mut self, default_selectivity: u8) -> Result<Self, DataFusionError>{
88+
pub fn with_default_selectivity(
89+
mut self,
90+
default_selectivity: u8,
91+
) -> Result<Self, DataFusionError> {
8992
if default_selectivity > 100 {
9093
return plan_err!("Default flter selectivity needs to be less than 100");
9194
}
@@ -182,16 +185,12 @@ impl ExecutionPlan for FilterExec {
182185
self: Arc<Self>,
183186
mut children: Vec<Arc<dyn ExecutionPlan>>,
184187
) -> Result<Arc<dyn ExecutionPlan>> {
185-
FilterExec::try_new(
186-
self.predicate.clone(),
187-
children.swap_remove(0),
188-
).and_then(
189-
|e| {
188+
FilterExec::try_new(self.predicate.clone(), children.swap_remove(0))
189+
.and_then(|e| {
190190
let selectivity = e.default_selectivity();
191-
e.with_selectivity(selectivity)
192-
}
193-
)
194-
.map(|e| Arc::new(e) as _)
191+
e.with_default_selectivity(selectivity)
192+
})
193+
.map(|e| Arc::new(e) as _)
195194
}
196195

197196
fn execute(
@@ -221,15 +220,7 @@ impl ExecutionPlan for FilterExec {
221220
let input_stats = self.input.statistics()?;
222221
let schema = self.schema();
223222
if !check_support(predicate, &schema) {
224-
let selectivity = self.default_selectivity as f32 / 100.0;
225-
let mut stats = input_stats.clone().into_inexact();
226-
if let Precision::Inexact(n) = stats.num_rows {
227-
stats.num_rows = Precision::Inexact((selectivity * n as f32) as usize);
228-
}
229-
if let Precision::Inexact(n) = stats.total_byte_size {
230-
stats.total_byte_size =
231-
Precision::Inexact((selectivity * n as f32) as usize);
232-
}
223+
let selectivity = self.default_selectivity as f64 / 100.0;
233224
let mut stats = input_stats.into_inexact();
234225
stats.num_rows = stats.num_rows.with_estimated_selectivity(selectivity);
235226
stats.total_byte_size = stats
@@ -1025,7 +1016,7 @@ mod tests {
10251016
}
10261017

10271018
#[tokio::test]
1028-
async fn test_validation_filter_selectivity() -> Result<()>{
1019+
async fn test_validation_filter_selectivity() -> Result<()> {
10291020
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
10301021
let input = Arc::new(StatisticsExec::new(
10311022
Statistics::new_unknown(&schema),
@@ -1038,7 +1029,7 @@ mod tests {
10381029
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
10391030
));
10401031
let filter = FilterExec::try_new(predicate, input)?;
1041-
assert!(filter.with_selectivity(120).is_err());
1032+
assert!(filter.with_default_selectivity(120).is_err());
10421033
Ok(())
10431034
}
10441035
}

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,13 @@ impl AsExecutionPlan for PhysicalPlanNode {
162162
let filter_selectivity = filter.default_filter_selectivity.try_into();
163163
let filter = FilterExec::try_new(predicate, input)?;
164164
match filter_selectivity {
165-
Ok(filter_selectivity) => {
166-
Ok(Arc::new(filter.with_selectivity(filter_selectivity)?))
167-
}
165+
Ok(filter_selectivity) => Ok(Arc::new(
166+
filter.with_default_selectivity(filter_selectivity)?,
167+
)),
168168
Err(_) => Err(DataFusionError::Internal(
169169
"filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
170170
)),
171171
}
172-
173172
}
174173
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
175174
parse_protobuf_file_scan_config(

datafusion/sqllogictest/test_files/information_schema.slt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ datafusion.explain.logical_plan_only false
188188
datafusion.explain.physical_plan_only false
189189
datafusion.explain.show_statistics false
190190
datafusion.optimizer.allow_symmetric_joins_without_pruning true
191+
datafusion.optimizer.default_filter_selectivity 20
191192
datafusion.optimizer.enable_distinct_aggregation_soft_limit true
192193
datafusion.optimizer.enable_round_robin_repartition true
193194
datafusion.optimizer.enable_topk_aggregation true
@@ -261,6 +262,7 @@ datafusion.explain.logical_plan_only false When set to true, the explain stateme
261262
datafusion.explain.physical_plan_only false When set to true, the explain statement will only print physical plans
262263
datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans
263264
datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors.
265+
datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected).
264266
datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read.
265267
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
266268
datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible

docs/source/user-guide/configs.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus
9898
| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan |
9999
| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys |
100100
| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory |
101-
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
102-
| datafusion.optimizer.default_filter_selectivity | 20 | The assumed filter selectivity in from 0 (no rows) to 100 (all rows) used when it is not possibl to determine exactly the number of rows returned by a filter
101+
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
102+
| datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). |
103103
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
104104
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
105105
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |

0 commit comments

Comments
 (0)