-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Add dynamic pruning filters from TopK state #15301
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a01e36a
7481311
3fc78e8
f048db9
6889f0c
c8a133e
c4a9568
c15b48f
b4bc34c
14b3005
ff1aaa5
b0cc41f
3a185cc
b664183
67ed488
48910b2
d178dc9
615283c
cf24b10
054b415
ecc89f9
e59aac5
a7ce3bc
5ebba12
d423866
b683507
fbf93a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -36,22 +36,19 @@ use datafusion_catalog::TableProvider; | |
use datafusion_common::{config_err, DataFusionError, Result}; | ||
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; | ||
use datafusion_expr::dml::InsertOp; | ||
use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown}; | ||
use datafusion_expr::{Expr, TableProviderFilterPushDown}; | ||
use datafusion_expr::{SortExpr, TableType}; | ||
use datafusion_physical_plan::empty::EmptyExec; | ||
use datafusion_physical_plan::{ExecutionPlan, Statistics}; | ||
|
||
use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef}; | ||
use datafusion_common::{ | ||
config_datafusion_err, internal_err, plan_err, project_schema, Constraints, | ||
SchemaExt, ToDFSchema, | ||
config_datafusion_err, internal_err, plan_err, project_schema, Constraints, SchemaExt, | ||
}; | ||
use datafusion_execution::cache::{ | ||
cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache, | ||
}; | ||
use datafusion_physical_expr::{ | ||
create_physical_expr, LexOrdering, PhysicalSortRequirement, | ||
}; | ||
use datafusion_physical_expr::{LexOrdering, PhysicalSortRequirement}; | ||
|
||
use async_trait::async_trait; | ||
use datafusion_catalog::Session; | ||
|
@@ -918,19 +915,6 @@ impl TableProvider for ListingTable { | |
None => {} // no ordering required | ||
}; | ||
|
||
let filters = match conjunction(filters.to_vec()) { | ||
Some(expr) => { | ||
let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?; | ||
let filters = create_physical_expr( | ||
&expr, | ||
&table_df_schema, | ||
state.execution_props(), | ||
)?; | ||
Some(filters) | ||
} | ||
None => None, | ||
}; | ||
|
||
let Some(object_store_url) = | ||
self.table_paths.first().map(ListingTableUrl::object_store) | ||
else { | ||
|
@@ -955,7 +939,7 @@ impl TableProvider for ListingTable { | |
.with_output_ordering(output_ordering) | ||
.with_table_partition_cols(table_partition_cols) | ||
.build(), | ||
filters.as_ref(), | ||
None, | ||
Comment on lines
-958
to
+942
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We kind of need to do this; otherwise you end up with duplicate filters for the case where There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Essentially, this PR has to introduce a generalized way to do filter pushdown instead of the very specific way that ListingTable does it, and we wouldn't want to do both at the same time. What we want is for ListingTable to tell us:
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I wonder if we can split out the part of this PR that changes how the filters are pushed down (i.e., a pruning predicate per file rather than one overall) into a separate PR, to keep the changes small and isolated? |
||
) | ||
.await | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -201,7 +201,7 @@ impl ExecutionPlan for ArrowExec { | |
|
||
/// Arrow configuration struct that is given to DataSourceExec | ||
/// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow | ||
#[derive(Clone, Default)] | ||
#[derive(Clone, Default, Debug)] | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was just an annoyance during debugging. Can revert. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Looks good to me -- we could also make a separate PR for it (not needed). |
||
pub struct ArrowSource { | ||
metrics: ExecutionPlanMetricsSet, | ||
projected_statistics: Option<Statistics>, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,3 +29,5 @@ mod pruning; | |
mod limit_fuzz; | ||
mod sort_preserving_repartition_fuzz; | ||
mod window_fuzz; | ||
|
||
mod topk_filter_pushdown; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a great example.