Skip to content

Commit 77ad090

Browse files
committed
Turning filter selectivity as a configurable parameter
1 parent 4fb4b21 commit 77ad090

File tree

10 files changed

+102
-12
lines changed

10 files changed

+102
-12
lines changed

datafusion/common/src/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,10 @@ config_namespace! {
524524
/// The maximum estimated size in bytes for one input side of a HashJoin
525525
/// will be collected into a single partition
526526
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
527+
528+
/// The default filter selectivity used by Filter statistics
529+
/// when an exact selectivity cannot be determined
530+
pub default_filter_selectivity: u8, default = 20
527531
}
528532
}
529533

@@ -877,6 +881,7 @@ config_field!(String);
877881
config_field!(bool);
878882
config_field!(usize);
879883
config_field!(f64);
884+
config_field!(u8);
880885
config_field!(u64);
881886

882887
/// An implementation trait used to recursively walk configuration

datafusion/core/src/physical_optimizer/projection_pushdown.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,16 @@ fn try_swapping_with_filter(
302302
return Ok(None);
303303
};
304304

305-
FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?)
306-
.map(|e| Some(Arc::new(e) as _))
305+
FilterExec::try_new(
306+
new_predicate,
307+
make_with_child(projection, filter.input())?,
308+
).and_then(
309+
|e| {
310+
let selectivity = filter.default_selectivity();
311+
e.with_selectivity(selectivity)
312+
}
313+
)
314+
.map(|e| Some(Arc::new(e) as _))
307315
}
308316

309317
/// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done,

datafusion/core/src/physical_planner.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -913,7 +913,9 @@ impl DefaultPhysicalPlanner {
913913
&input_schema,
914914
session_state,
915915
)?;
916-
Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?))
916+
let selectivity = session_state.config().options().optimizer.default_filter_selectivity;
917+
let filter = FilterExec::try_new(runtime_expr, physical_input)?;
918+
Ok(Arc::new(filter.with_selectivity(selectivity)?))
917919
}
918920
LogicalPlan::Union(Union { inputs, schema }) => {
919921
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;

datafusion/core/src/test_util/parquet.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ impl TestParquetFile {
178178
None,
179179
));
180180

181-
let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
181+
let exec =
182+
Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
182183
Ok(exec)
183184
} else {
184185
Ok(Arc::new(ParquetExec::new(scan_config, None, None)))

datafusion/physical-plan/src/filter.rs

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ pub struct FilterExec {
6262
input: Arc<dyn ExecutionPlan>,
6363
/// Execution metrics
6464
metrics: ExecutionPlanMetricsSet,
65+
/// Selectivity for statistics. 0 = no rows, 100 all rows
66+
default_selectivity: u8,
6567
}
6668

6769
impl FilterExec {
@@ -75,13 +77,22 @@ impl FilterExec {
7577
predicate,
7678
input: input.clone(),
7779
metrics: ExecutionPlanMetricsSet::new(),
80+
default_selectivity: 20,
7881
}),
7982
other => {
8083
plan_err!("Filter predicate must return boolean values, not {other:?}")
8184
}
8285
}
8386
}
8487

88+
pub fn with_selectivity(mut self, default_selectivity: u8) -> Result<Self, DataFusionError>{
89+
if default_selectivity > 100 {
90+
return plan_err!("Default flter selectivity needs to be less than 100");
91+
}
92+
self.default_selectivity = default_selectivity;
93+
Ok(self)
94+
}
95+
8596
/// The expression to filter on. This expression must evaluate to a boolean value.
8697
pub fn predicate(&self) -> &Arc<dyn PhysicalExpr> {
8798
&self.predicate
@@ -91,6 +102,11 @@ impl FilterExec {
91102
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
92103
&self.input
93104
}
105+
106+
/// The default selectivity
107+
pub fn default_selectivity(&self) -> u8 {
108+
self.default_selectivity
109+
}
94110
}
95111

96112
impl DisplayAs for FilterExec {
@@ -166,8 +182,16 @@ impl ExecutionPlan for FilterExec {
166182
self: Arc<Self>,
167183
mut children: Vec<Arc<dyn ExecutionPlan>>,
168184
) -> Result<Arc<dyn ExecutionPlan>> {
169-
FilterExec::try_new(self.predicate.clone(), children.swap_remove(0))
170-
.map(|e| Arc::new(e) as _)
185+
FilterExec::try_new(
186+
self.predicate.clone(),
187+
children.swap_remove(0),
188+
).and_then(
189+
|e| {
190+
let selectivity = e.default_selectivity();
191+
e.with_selectivity(selectivity)
192+
}
193+
)
194+
.map(|e| Arc::new(e) as _)
171195
}
172196

173197
fn execute(
@@ -197,10 +221,7 @@ impl ExecutionPlan for FilterExec {
197221
let input_stats = self.input.statistics()?;
198222
let schema = self.schema();
199223
if !check_support(predicate, &schema) {
200-
// assume filter selects 20% of rows if we cannot do anything smarter
201-
// tracking issue for making this configurable:
202-
// https://github.com/apache/arrow-datafusion/issues/8133
203-
let selectivity = 0.2_f32;
224+
let selectivity = self.default_selectivity as f32 / 100.0;
204225
let mut stats = input_stats.clone().into_inexact();
205226
if let Precision::Inexact(n) = stats.num_rows {
206227
stats.num_rows = Precision::Inexact((selectivity * n as f32) as usize);
@@ -1009,4 +1030,22 @@ mod tests {
10091030

10101031
Ok(())
10111032
}
1033+
1034+
#[tokio::test]
1035+
async fn test_validation_filter_selectivity() -> Result<()>{
1036+
let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1037+
let input = Arc::new(StatisticsExec::new(
1038+
Statistics::new_unknown(&schema),
1039+
schema,
1040+
));
1041+
// WHERE a = 10
1042+
let predicate = Arc::new(BinaryExpr::new(
1043+
Arc::new(Column::new("a", 0)),
1044+
Operator::Eq,
1045+
Arc::new(Literal::new(ScalarValue::Int32(Some(10)))),
1046+
));
1047+
let filter = FilterExec::try_new(predicate, input)?;
1048+
assert!(filter.with_selectivity(120).is_err());
1049+
Ok(())
1050+
}
10121051
}

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1360,6 +1360,7 @@ message PhysicalNegativeNode {
13601360
message FilterExecNode {
13611361
PhysicalPlanNode input = 1;
13621362
PhysicalExprNode expr = 2;
1363+
uint32 default_filter_selectivity = 3;
13631364
}
13641365

13651366
message FileGroup {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,17 @@ impl AsExecutionPlan for PhysicalPlanNode {
157157
.to_owned(),
158158
)
159159
})?;
160-
Ok(Arc::new(FilterExec::try_new(predicate, input)?))
160+
let filter_selectivity = filter.default_filter_selectivity.try_into();
161+
let filter = FilterExec::try_new(predicate, input)?;
162+
match filter_selectivity {
163+
Ok(filter_selectivity) => {
164+
Ok(Arc::new(filter.with_selectivity(filter_selectivity)?))
165+
}
166+
Err(_) => Err(DataFusionError::Internal(
167+
"filter_selectivity in PhysicalPlanNode is invalid ".to_owned(),
168+
)),
169+
}
170+
161171
}
162172
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
163173
parse_protobuf_file_scan_config(
@@ -898,6 +908,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
898908
protobuf::FilterExecNode {
899909
input: Some(Box::new(input)),
900910
expr: Some(exec.predicate().clone().try_into()?),
911+
default_filter_selectivity: exec.default_selectivity() as u32,
901912
},
902913
))),
903914
});

docs/source/user-guide/configs.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ Environment variables are read during `SessionConfig` initialisation so they mus
9898
| datafusion.optimizer.max_passes | 3 | Number of times that the optimizer will attempt to optimize the plan |
9999
| datafusion.optimizer.top_down_join_key_reordering | true | When set to true, the physical plan optimizer will run a top down process to reorder the join keys |
100100
| datafusion.optimizer.prefer_hash_join | true | When set to true, the physical plan optimizer will prefer HashJoin over SortMergeJoin. HashJoin can work more efficiently than SortMergeJoin but consumes more memory |
101-
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition |
101+
| datafusion.optimizer.hash_join_single_partition_threshold | 1048576 | The maximum estimated size in bytes for one input side of a HashJoin will be collected into a single partition
102+
| datafusion.optimizer.default_filter_selectivity | 20 | The assumed filter selectivity in from 0 (no rows) to 100 (all rows) used when it is not possibl to determine exactly the number of rows returned by a filter
102103
| datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans |
103104
| datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans |
104105
| datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |

0 commit comments

Comments
 (0)