Skip to content

Commit 04e3e82

Browse files
committed
Turn filter selectivity into a configurable parameter
1 parent 4fb4b21 commit 04e3e82

File tree

9 files changed

+62
-7
lines changed

9 files changed

+62
-7
lines changed

datafusion/common/src/config.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,10 @@ config_namespace! {
524524
/// The maximum estimated size in bytes for one input side of a HashJoin
525525
/// will be collected into a single partition
526526
pub hash_join_single_partition_threshold: usize, default = 1024 * 1024
527+
528+
/// The default filter selectivity used by Filter statistics
529+
/// when an exact selectivity cannot be determined. Valid values range from 0 (no rows selected) to 100 (all rows selected)
530+
pub default_filter_selectivity: u8, default = 20
527531
}
528532
}
529533

@@ -877,6 +881,7 @@ config_field!(String);
877881
config_field!(bool);
878882
config_field!(usize);
879883
config_field!(f64);
884+
config_field!(u8);
880885
config_field!(u64);
881886

882887
/// An implementation trait used to recursively walk configuration

datafusion/core/src/physical_optimizer/projection_pushdown.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,8 +302,12 @@ fn try_swapping_with_filter(
302302
return Ok(None);
303303
};
304304

305-
FilterExec::try_new(new_predicate, make_with_child(projection, filter.input())?)
306-
.map(|e| Some(Arc::new(e) as _))
305+
FilterExec::try_new(
306+
new_predicate,
307+
make_with_child(projection, filter.input())?,
308+
filter.default_selectivity(),
309+
)
310+
.map(|e| Some(Arc::new(e) as _))
307311
}
308312

309313
/// Tries to swap the projection with its input [`RepartitionExec`]. If it can be done,

datafusion/core/src/physical_planner.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -913,7 +913,8 @@ impl DefaultPhysicalPlanner {
913913
&input_schema,
914914
session_state,
915915
)?;
916-
Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?))
916+
let selectivity = session_state.config().options().optimizer.default_filter_selectivity;
917+
Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input, selectivity)?))
917918
}
918919
LogicalPlan::Union(Union { inputs, schema }) => {
919920
let physical_plans = self.create_initial_plan_multi(inputs.iter().map(|lp| lp.as_ref()), session_state).await?;

datafusion/core/src/test_util/parquet.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ impl TestParquetFile {
178178
None,
179179
));
180180

181-
let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
181+
let exec =
182+
Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec, 20)?);
182183
Ok(exec)
183184
} else {
184185
Ok(Arc::new(ParquetExec::new(scan_config, None, None)))

datafusion/physical-plan/src/filter.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,26 @@ pub struct FilterExec {
6262
input: Arc<dyn ExecutionPlan>,
6363
/// Execution metrics
6464
metrics: ExecutionPlanMetricsSet,
65+
/// Default selectivity for statistics, as a percentage: 0 = no rows, 100 = all rows
66+
default_selectivity: u8,
6567
}
6668

6769
impl FilterExec {
6870
/// Create a FilterExec on an input
6971
pub fn try_new(
7072
predicate: Arc<dyn PhysicalExpr>,
7173
input: Arc<dyn ExecutionPlan>,
74+
default_selectivity: u8,
7275
) -> Result<Self> {
76+
if default_selectivity > 100 {
77+
return plan_err!("Default flter selectivity needs to be less than 100");
78+
}
7379
match predicate.data_type(input.schema().as_ref())? {
7480
DataType::Boolean => Ok(Self {
7581
predicate,
7682
input: input.clone(),
7783
metrics: ExecutionPlanMetricsSet::new(),
84+
default_selectivity,
7885
}),
7986
other => {
8087
plan_err!("Filter predicate must return boolean values, not {other:?}")
@@ -91,6 +98,11 @@ impl FilterExec {
9198
pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
9299
&self.input
93100
}
101+
102+
/// The default selectivity, as a percentage from 0 (no rows) to 100 (all rows)
103+
pub fn default_selectivity(&self) -> u8 {
104+
self.default_selectivity
105+
}
94106
}
95107

96108
impl DisplayAs for FilterExec {
@@ -166,8 +178,12 @@ impl ExecutionPlan for FilterExec {
166178
self: Arc<Self>,
167179
mut children: Vec<Arc<dyn ExecutionPlan>>,
168180
) -> Result<Arc<dyn ExecutionPlan>> {
169-
FilterExec::try_new(self.predicate.clone(), children.swap_remove(0))
170-
.map(|e| Arc::new(e) as _)
181+
FilterExec::try_new(
182+
self.predicate.clone(),
183+
children.swap_remove(0),
184+
self.default_selectivity,
185+
)
186+
.map(|e| Arc::new(e) as _)
171187
}
172188

173189
fn execute(

datafusion/proto/proto/datafusion.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1360,6 +1360,7 @@ message PhysicalNegativeNode {
13601360
message FilterExecNode {
13611361
PhysicalPlanNode input = 1;
13621362
PhysicalExprNode expr = 2;
1363+
uint32 default_filter_selectivity = 3;
13631364
}
13641365

13651366
message FileGroup {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/physical_plan/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,11 @@ impl AsExecutionPlan for PhysicalPlanNode {
157157
.to_owned(),
158158
)
159159
})?;
160-
Ok(Arc::new(FilterExec::try_new(predicate, input)?))
160+
Ok(Arc::new(FilterExec::try_new(
161+
predicate,
162+
input,
163+
filter.default_filter_selectivity as u8,
164+
)?))
161165
}
162166
PhysicalPlanType::CsvScan(scan) => Ok(Arc::new(CsvExec::new(
163167
parse_protobuf_file_scan_config(
@@ -898,6 +902,7 @@ impl AsExecutionPlan for PhysicalPlanNode {
898902
protobuf::FilterExecNode {
899903
input: Some(Box::new(input)),
900904
expr: Some(exec.predicate().clone().try_into()?),
905+
default_filter_selectivity: exec.default_selectivity() as u32,
901906
},
902907
))),
903908
});

0 commit comments

Comments
 (0)