
Commit 9d16d52

Merge branch 'main' into smj-skip-validation

2 parents ec0fb31 + a39c07a
File tree: 16 files changed (+886, -430 lines)

datafusion/core/Cargo.toml (+4)
@@ -179,6 +179,10 @@ name = "csv_load"
 harness = false
 name = "distinct_query_sql"
 
+[[bench]]
+harness = false
+name = "push_down_filter"
+
 [[bench]]
 harness = false
 name = "sort_limit_query_sql"
New file (+124)

@@ -0,0 +1,124 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use arrow::array::RecordBatch;
use arrow::datatypes::{DataType, Field, Schema};
use bytes::{BufMut, BytesMut};
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use datafusion::config::ConfigOptions;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;
use object_store::memory::InMemory;
use object_store::path::Path;
use object_store::ObjectStore;
use parquet::arrow::ArrowWriter;
use std::sync::Arc;

async fn create_plan() -> Arc<dyn ExecutionPlan> {
    let ctx = SessionContext::new();
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, true),
        Field::new("name", DataType::Utf8, true),
        Field::new("age", DataType::UInt16, true),
        Field::new("salary", DataType::Float64, true),
    ]));
    let batch = RecordBatch::new_empty(schema);

    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
    let mut out = BytesMut::new().writer();
    {
        let mut writer = ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }
    let data = out.into_inner().freeze();
    store
        .put(&Path::from("test.parquet"), data.into())
        .await
        .unwrap();
    ctx.register_object_store(
        ObjectStoreUrl::parse("memory://").unwrap().as_ref(),
        store,
    );

    ctx.register_parquet("t", "memory:///", ParquetReadOptions::default())
        .await
        .unwrap();

    let df = ctx
        .sql(
            r"
            WITH brackets AS (
                SELECT age % 10 AS age_bracket
                FROM t
                GROUP BY age % 10
                HAVING COUNT(*) > 10
            )
            SELECT id, name, age, salary
            FROM t
            JOIN brackets ON t.age % 10 = brackets.age_bracket
            WHERE age > 20 AND t.salary > 1000
            ORDER BY t.salary DESC
            LIMIT 100
            ",
        )
        .await
        .unwrap();

    df.create_physical_plan().await.unwrap()
}

#[derive(Clone)]
struct BenchmarkPlan {
    plan: Arc<dyn ExecutionPlan>,
    config: ConfigOptions,
}

impl std::fmt::Display for BenchmarkPlan {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "BenchmarkPlan")
    }
}

fn bench_push_down_filter(c: &mut Criterion) {
    // Create a relatively complex plan
    let plan = tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(create_plan());
    let mut config = ConfigOptions::default();
    config.execution.parquet.pushdown_filters = true;
    let plan = BenchmarkPlan { plan, config };

    c.bench_with_input(
        BenchmarkId::new("push_down_filter", plan.clone()),
        &plan,
        |b, plan| {
            b.iter(|| {
                let optimizer = FilterPushdown::new();
                optimizer
                    .optimize(Arc::clone(&plan.plan), &plan.config)
                    .unwrap();
            });
        },
    );
}

criterion_group!(benches, bench_push_down_filter);
criterion_main!(benches);
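Note on the benchmark's structure: create_plan() builds a moderately complex physical plan (join, aggregation, sort, limit) over an empty in-memory Parquet file, outside the timed loop; with execution.parquet.pushdown_filters enabled, each iteration then runs only a fresh FilterPushdown pass over that plan, so the timing isolates the optimizer rule from planning and execution.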

datafusion/core/tests/physical_optimizer/push_down_filter.rs (+32, -35)
@@ -44,11 +44,10 @@ use datafusion_physical_expr::{
     aggregate::AggregateExprBuilder, conjunction, Partitioning,
 };
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
-use datafusion_physical_optimizer::push_down_filter::PushdownFilter;
+use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::filter_pushdown::{
-    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
-    FilterPushdownSupport,
+    FilterPushdownPropagation, PredicateSupports,
 };
 use datafusion_physical_plan::{
     aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
@@ -154,29 +153,24 @@ impl FileSource for TestSource {
 
     fn try_pushdown_filters(
         &self,
-        mut fd: FilterDescription,
+        mut filters: Vec<Arc<dyn PhysicalExpr>>,
         config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
+    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
         if self.support && config.execution.parquet.pushdown_filters {
             if let Some(internal) = self.predicate.as_ref() {
-                fd.filters.push(Arc::clone(internal));
+                filters.push(Arc::clone(internal));
             }
-            let all_filters = fd.take_description();
-
-            Ok(FilterPushdownResult {
-                support: FilterPushdownSupport::Supported {
-                    child_descriptions: vec![],
-                    op: Arc::new(TestSource {
-                        support: true,
-                        predicate: Some(conjunction(all_filters)),
-                        statistics: self.statistics.clone(), // should be updated in reality
-                    }),
-                    revisit: false,
-                },
-                remaining_description: FilterDescription::empty(),
+            let new_node = Arc::new(TestSource {
+                support: true,
+                predicate: Some(conjunction(filters.clone())),
+                statistics: self.statistics.clone(), // should be updated in reality
+            });
+            Ok(FilterPushdownPropagation {
+                filters: PredicateSupports::all_supported(filters),
+                updated_node: Some(new_node),
             })
         } else {
-            Ok(filter_pushdown_not_supported(fd))
+            Ok(FilterPushdownPropagation::unsupported(filters))
         }
     }
 }
@@ -201,7 +195,7 @@ fn test_pushdown_into_scan() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{}, true),
+        OptimizationTest::new(plan, FilterPushdown{}, true),
         @r"
         OptimizationTest:
           input:
@@ -225,7 +219,7 @@ fn test_pushdown_into_scan_with_config_options() {
     insta::assert_snapshot!(
         OptimizationTest::new(
             Arc::clone(&plan),
-            PushdownFilter {},
+            FilterPushdown {},
             false
         ),
         @r"
@@ -244,7 +238,7 @@ fn test_pushdown_into_scan_with_config_options() {
     insta::assert_snapshot!(
         OptimizationTest::new(
             plan,
-            PushdownFilter {},
+            FilterPushdown {},
             true
         ),
         @r"
@@ -269,7 +263,7 @@ fn test_filter_collapse() {
     let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
 
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{}, true),
+        OptimizationTest::new(plan, FilterPushdown{}, true),
         @r"
         OptimizationTest:
           input:
@@ -278,7 +272,7 @@ fn test_filter_collapse() {
             - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
           output:
             Ok:
-              - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
+              - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
         "
     );
 }
@@ -288,25 +282,28 @@ fn test_filter_with_projection() {
     let scan = test_scan(true);
     let projection = vec![1, 0];
     let predicate = col_lit_predicate("a", "foo", schema());
-    let plan = Arc::new(
-        FilterExec::try_new(predicate, Arc::clone(&scan))
+    let filter = Arc::new(
+        FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&scan))
             .unwrap()
             .with_projection(Some(projection))
             .unwrap(),
     );
+    let predicate = col_lit_predicate("b", "bar", &filter.schema());
+    let plan = Arc::new(FilterExec::try_new(predicate, filter).unwrap());
 
     // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{}, true),
+        OptimizationTest::new(plan, FilterPushdown{}, true),
         @r"
         OptimizationTest:
           input:
-            - FilterExec: a@0 = foo, projection=[b@1, a@0]
-              - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
+            - FilterExec: b@0 = bar
+              - FilterExec: a@0 = foo, projection=[b@1, a@0]
+                - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true
           output:
             Ok:
              - ProjectionExec: expr=[b@1 as b, a@0 as a]
-              - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo
+              - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
         ",
     );
 
@@ -320,7 +317,7 @@ fn test_filter_with_projection() {
             .unwrap(),
     );
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{},true),
+        OptimizationTest::new(plan, FilterPushdown{},true),
         @r"
         OptimizationTest:
           input:
@@ -349,7 +346,7 @@ fn test_push_down_through_transparent_nodes() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{},true),
+        OptimizationTest::new(plan, FilterPushdown{},true),
         @r"
         OptimizationTest:
           input:
@@ -362,7 +359,7 @@ fn test_push_down_through_transparent_nodes() {
           Ok:
             - RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=0
               - CoalesceBatchesExec: target_batch_size=1
-                - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=b@1 = bar AND a@0 = foo
+                - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = foo AND b@1 = bar
         "
     );
 }
@@ -413,7 +410,7 @@ fn test_no_pushdown_through_aggregates() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, PushdownFilter{}, true),
+        OptimizationTest::new(plan, FilterPushdown{}, true),
         @r"
         OptimizationTest:
          input:
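Beyond the rename from PushdownFilter to FilterPushdown, the updated snapshots record two visible changes: pushed-down predicates now print as a@0 = foo AND b@1 = bar rather than b@1 = bar AND a@0 = foo, and test_filter_with_projection now stacks a second FilterExec (b@0 = bar) above the projected filter, with both predicates ending up in the scan.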

datafusion/datasource/src/file.rs (+7, -9)
@@ -28,10 +28,8 @@ use crate::file_stream::FileOpener;
 use arrow::datatypes::SchemaRef;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{Result, Statistics};
-use datafusion_physical_expr::LexOrdering;
-use datafusion_physical_plan::filter_pushdown::{
-    filter_pushdown_not_supported, FilterDescription, FilterPushdownResult,
-};
+use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
+use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion_physical_plan::DisplayFormatType;
 
@@ -108,14 +106,14 @@ pub trait FileSource: Send + Sync {
     }
 
     /// Try to push down filters into this FileSource.
-    /// See [`ExecutionPlan::try_pushdown_filters`] for more details.
+    /// See [`ExecutionPlan::handle_child_pushdown_result`] for more details.
     ///
-    /// [`ExecutionPlan::try_pushdown_filters`]: datafusion_physical_plan::ExecutionPlan::try_pushdown_filters
+    /// [`ExecutionPlan::handle_child_pushdown_result`]: datafusion_physical_plan::ExecutionPlan::handle_child_pushdown_result
     fn try_pushdown_filters(
         &self,
-        fd: FilterDescription,
+        filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
-    ) -> Result<FilterPushdownResult<Arc<dyn FileSource>>> {
-        Ok(filter_pushdown_not_supported(fd))
+    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
+        Ok(FilterPushdownPropagation::unsupported(filters))
     }
 }
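The default implementation above simply declines every filter. A FileSource that accepts pushdown would follow the same shape as the TestSource change earlier in this commit; a minimal sketch, where MySource and its predicate field are hypothetical and conjunction comes from datafusion_physical_expr:

fn try_pushdown_filters(
    &self,
    filters: Vec<Arc<dyn PhysicalExpr>>,
    _config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
    // Report every filter as supported and return an updated source
    // that will evaluate their conjunction at scan time.
    let new_node = Arc::new(MySource {
        predicate: Some(conjunction(filters.clone())),
    });
    Ok(FilterPushdownPropagation {
        filters: PredicateSupports::all_supported(filters),
        updated_node: Some(new_node),
    })
}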
