Return TableProviderFilterPushDown::Exact when Parquet Pushdown Enabled #12135
Conversation
Thank you @itsjunetime -- this is very nice
I left some ideas for how to potentially improve this PR
Note for other reviewers: I found https://github.com/apache/datafusion/pull/12135/files?w=1 to be easier to understand
Testing
In order to merge this PR I think we need an end to end test that shows:
- An `explain` query against a parquet file (e.g. `EXPLAIN SELECT ... FROM foo.parquet WHERE col = '5'`) showing that there is indeed a `FilterExec` above the parquet scan
- That when pushdown filters are enabled (e.g. `set datafusion.execution.parquet.pushdown_filters=true`) there is no more `FilterExec`
I would suggest adding to this slt file:
https://github.com/apache/datafusion/blob/e4be013064943786c9915bbc79c18ee82106340a/datafusion/sqllogictest/test_files/parquet.slt
Here are the instructions for sqllogictest: https://github.com/apache/datafusion/tree/main/datafusion/sqllogictest
It would also be really great if we could make a PR that added coverage as described in #12115, but I don't think that is strictly required
Avoid #[cfg(..)] if possible
I also think we should consider avoiding the #[cfg(...)] checks by extending the FileFormat trait (I commented inline, and there is a sketch of the idea below), but we could also do that as a follow on PR
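To illustrate the idea, here is a hypothetical sketch only (not the API this PR adds): the method name, the `FilePushdownSupport` enum, and the extension-trait framing are all assumptions. The point is that callers would ask the format whether it can evaluate the filters, instead of sprinkling `#[cfg(feature = "parquet")]` checks:

```rust
use datafusion::error::Result;
use datafusion::prelude::Expr;

/// Hypothetical answer a file format could give about a set of filters.
pub enum FilePushdownSupport {
    /// The format cannot evaluate the filters exactly during the scan
    NotSupported,
    /// The format will filter rows exactly, so no FilterExec is needed above it
    Supported,
}

/// Hypothetical extension trait standing in for a new default method on `FileFormat`.
pub trait FileFormatFilterSupport {
    /// Default keeps today's behavior: no exact pushdown, keep the FilterExec.
    fn supports_filters_pushdown(&self, _filters: &[&Expr]) -> Result<FilePushdownSupport> {
        Ok(FilePushdownSupport::NotSupported)
    }
}
```

A Parquet-backed format would then override this to return `Supported` when `pushdown_filters` is enabled and the filters can be turned into row filters.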
Suggestion: Split into 2 PRs
If you pull the cleanups (e.g. improving use of flags, macro->function, etc) into their own PR, we could get them reviewed and merged faster. I realize it takes more effort on the author's (your) part to create multiple PRs, but it optimizes for what is often more limited (reviewer bandwidth)
Discussion about expression pushdown
I'm worried about this a little bit because I was under the impression that there were more complex requirements which defined whether or not an expression could be pushed down (specifically with regard to more complex expressions - e.g. AND/OR exprs have more complex rules that single-column exprs don't have to worry about)
DataFusion already handles the rules for when a predicate can be pushed down between various types of plan nodes (e.g. with grouping, joins, multiple tables, etc.) in the `PushDownFilter` pass
By the time it gets to a `TableProvider`, the predicates have already been broken up
I double checked, and `ArrowPredicate`s take arbitrary expressions; internally they can evaluate complex expressions such as AND/OR (a sketch follows below)
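As a concrete (if simplified) illustration, here is a minimal sketch of building a parquet `RowFilter` from an `ArrowPredicateFn` that evaluates a compound condition like `a > 5 AND b < 10`. The column indexes, the `Int64` types, and the assumption that the projection mask selects exactly those two columns are made up for the example:

```rust
use arrow::array::{AsArray, BooleanArray};
use arrow::datatypes::Int64Type;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::ProjectionMask;

/// Builds a RowFilter whose single predicate evaluates `a > 5 AND b < 10`
/// over the columns selected by `mask`.
fn compound_row_filter(mask: ProjectionMask) -> RowFilter {
    let predicate = ArrowPredicateFn::new(
        mask,
        |batch: RecordBatch| -> Result<BooleanArray, ArrowError> {
            // Assume the mask projects two Int64 columns, `a` then `b`.
            let a = batch.column(0).as_primitive::<Int64Type>();
            let b = batch.column(1).as_primitive::<Int64Type>();
            // Row-by-row AND of two comparisons; a real predicate would
            // typically use vectorized compute kernels instead.
            Ok(a.iter()
                .zip(b.iter())
                .map(|(a, b)| match (a, b) {
                    (Some(a), Some(b)) => Some(a > 5 && b < 10),
                    _ => None,
                })
                .collect::<BooleanArray>())
        },
    );
    RowFilter::new(vec![Box::new(predicate)])
}
```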
cc @XiangpengHao, @Dandandan, @thinkharderdev and @tustvold who I think may be interested in this feature as well
Drive-by fixes have been extracted into their own PR, #12240. Still working on the other recommendations.
Rebased to make it mergeable
Another rebase to clean out the drive-by cleanups now that the other PR has merged
Thank you @itsjunetime -- this PR looks really nice, both in structure and comments.
I think it is important to write some end to end tests (e.g. `slt` tests) that show explain plans with these filters enabled / disabled correctly -- and I did so in #12362
It turns out that when I ran the changes from this PR against the tests in #12362, the test results don't change. I am taking a look to see if I can figure out why that is.
It turns out that when I ran the changes from this PR against the tests in #12362, the test results don't change. I am taking a look to see if I can figure out why that is.
I looked into this some more and found out it is because the setting (enable_pushdown) is inherited when the table is created -- so just setting it in the session isn't enough; it needs to be set when the table is created
I will adjust my tests accordingly. Thus I think this PR is ready to go from my perspective (we can add the tests as a follow on)
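For anyone following along, here is a minimal sketch of what that means in practice (the table name and file path are placeholders): the `pushdown_filters` option has to be on the session config before the table is registered, because the table captures the setting at creation time.

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

/// Registers a parquet-backed table with filter pushdown enabled.
async fn register_with_pushdown() -> Result<SessionContext> {
    // Enable pushdown *before* the table is created.
    let config = SessionConfig::new()
        .set_bool("datafusion.execution.parquet.pushdown_filters", true);
    let ctx = SessionContext::new_with_config(config);
    // The ListingTable created here inherits the pushdown setting; flipping
    // the option afterwards would not change an already-registered table.
    ctx.register_parquet("t_pushdown", "data/t.parquet", ParquetReadOptions::default())
        .await?;
    Ok(ctx)
}
```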
Thanks again @itsjunetime -- very nice work. I took the liberty of merging up from main to run all the latest tests and ensure CI is passing. Once the CI is passing I plan to merge this PR. Thanks again 🙏
🤔 sorry to flip flop on this, but I wrote some more tests in #12362 and when I run them on this branch I see an internal error. To reproduce, run the tests from #12362 on this branch.
This results in a query with an internal error like this (when the file didn't have a column):

# When filter pushdown *is* enabled, ParquetExec can filter exactly,
# not just metadata, so we expect to see no FilterExec
# once https://github.com/apache/datafusion/issues/4028 is fixed
-query T
+query error DataFusion error: Arrow error: External error: External: Compute error: Error evaluating filter predicate: Internal\("PhysicalExpr Column references column 'b' at index 0 \(zero\-based\) but input schema only has 0 columns: \[\]"\)
select a from t_pushdown where b > 2 ORDER BY a;
Sorry that I forgot to post this, but the tests have since been merged to main, so you should be able to pick them up by merging this branch up from main, if you haven't already done so
Marking as draft as I think this PR is no longer waiting on feedback and I am trying to make it clearer which PRs are waiting for feedback and which are not. Please mark it as ready for review when it is ready for another look
Sounds good, thanks - I'm checking out the failing tests now and will also handle rebasing once I'm done
I am just writing up some notes I have from a discussion I just had with @itsjunetime (mostly so I don't forget this)
We traced the issue to the fact that after returning TableProviderFilterPushDown::Exact there is now a difference between the columns used by the rest of the query (columns above the filter) and the columns needed for evaluating the filters
The SchemaAdapter is now removing columns necessary for evaluating the filter from the columns passed to the filter, which is causing the error
The code that is removing the columns is `map_partial_batch`, added by @jeffreyssmith2nd in #10716 (it also applies any necessary type casting)
As I understand it, @itsjunetime is going to look into how to modify / replace `map_partial_batch` so that it does not remove columns even when they aren't necessary for the rest of the query
I also plan to improve some of the documentation based on our debugging today
Alright, I've pushed changes to pipe through the information about the schema to the necessary places (specifically, the SchemaMapper) so that the tests added by @alamb don't error anymore. I also added a lot of documentation in the latest commit to explain what I now understand and how I came to understand it.
Thank you @itsjunetime -- this now looks really nice to me. Thank you for sticking with this
I have a few minor comment suggestions but I think we could do that as a follow on PR as well
I noticed there is a CI failure https://github.com/apache/datafusion/actions/runs/10856219950/job/30130449796?pr=12135
This is likely due to using some feature that is not available in Rust 1.77 (the current MSRV). You can probably fix it by using some less nice syntax. However, it might also just start passing once @comphead increases the minimum supported rust version to 1.78 in #12398
cc @Dandandan and @thinkharderdev as I think you are also users of this filter pushdown code. This should make the plans more efficient by avoiding redundant `FilterExec`s (and potentially also producing fewer columns)
04)------FilterExec: b@1 > 2, projection=[a@0]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=2
06)----------ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a, b], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]
03)----ParquetExec: file_groups={2 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/1.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet_filter_pushdown/parquet_table/2.parquet]]}, projection=[a], predicate=b@1 > 2, pruning_predicate=CASE WHEN b_null_count@1 = b_row_count@2 THEN false ELSE b_max@0 > 2 END, required_guarantees=[]
I plan to merge this tomorrow to give people time to comment / respond if desired. Thanks again
url.object_store()
} else {
    let Some(object_store_url) =
        self.table_paths.first().map(ListingTableUrl::object_store)
👍
- Remove expect(_) attr to satisfy msrv
- Update comments with more accurate details and explanations
🚀
Which issue does this PR close?
I think this should close #4028
Rationale for this change
As far as I can tell, this follows the recommendations stated in #4028. The only part that I'm confused about is specifically the last requirement for this API: "The predicate is fully pushed down by ParquetExec (not all predicates are supported)".
To fulfill this requirement, this PR pulls in code that used to reside in `FilterCandidateBuilder` that recurses through an expr and checks each column against two requirements. If any column in the given expression fulfills either of those two requirements, we assume the expression can't be pushed down, and return `Inexact`
for this API (as well as if we're not using parquet or if pushdown is not enabled, of course).

I'm worried about this a little bit because I was under the impression that there were more complex requirements which defined whether or not an expression could be pushed down (specifically with regard to more complex expressions - e.g. `AND`/`OR` exprs have more complex rules that single-column exprs don't have to worry about), but those rules don't seem to be reflected in the code that I could find, so I might be being a bit paranoid. I'm not exactly certain and would appreciate some feedback on that specifically :)
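For context, here is a minimal sketch (not the actual code in this PR) of the shape of that decision as seen through `TableProvider::supports_filters_pushdown`: each filter is answered with `Exact` only when parquet pushdown is enabled and the expression is judged fully evaluable by the scan, otherwise `Inexact`. The `expr_fully_supported_by_scan` predicate is a hypothetical stand-in for the column checks described above.

```rust
use datafusion::logical_expr::TableProviderFilterPushDown;
use datafusion::prelude::Expr;

/// Decide, per filter, whether the scan can evaluate it exactly.
fn pushdown_kinds(
    filters: &[&Expr],
    parquet_pushdown_enabled: bool,
    expr_fully_supported_by_scan: impl Fn(&Expr) -> bool,
) -> Vec<TableProviderFilterPushDown> {
    filters
        .iter()
        .map(|expr| {
            if parquet_pushdown_enabled && expr_fully_supported_by_scan(expr) {
                // The scan will filter rows exactly, so the optimizer can drop the FilterExec.
                TableProviderFilterPushDown::Exact
            } else {
                // The scan may only prune row groups/pages, so a FilterExec must remain.
                TableProviderFilterPushDown::Inexact
            }
        })
        .collect()
}
```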
What changes are included in this PR?
Code is mostly just moved around to facilitate the changes described above.
Are these changes tested?
Yes - I've added a few tests to make sure all the changes I'm aware of are now tested.
Are there any user-facing changes?
No API changes, but I think the affected ListingTable API is public-facing, so this will be a slight behavioral change.