
Add dynamic pruning filters from TopK state #15301


Closed · wants to merge 27 commits
7 changes: 7 additions & 0 deletions datafusion/common/src/config.rs
@@ -590,6 +590,13 @@ config_namespace! {
/// during aggregations, if possible
pub enable_topk_aggregation: bool, default = true

+/// When set to true, attempts to push down dynamic filters generated by operators into the file scan phase.
+/// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
+/// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
+/// This means that if we already have 10 timestamps from the year 2025,
+/// any files that contain only timestamps from the year 2024 can be skipped / pruned at various stages in the scan.
Contributor: this is a great example

+pub enable_dynamic_filter_pushdown: bool, default = true

/// When set to true, the optimizer will insert filters before a join between
/// a nullable and non-nullable column to filter out nulls on the nullable side. This
/// filter can add additional overhead when the file format does not fully support
24 changes: 4 additions & 20 deletions datafusion/core/src/datasource/listing/table.rs
@@ -36,22 +36,19 @@ use datafusion_catalog::TableProvider;
use datafusion_common::{config_err, DataFusionError, Result};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_expr::dml::InsertOp;
-use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown};
+use datafusion_expr::{Expr, TableProviderFilterPushDown};
use datafusion_expr::{SortExpr, TableType};
use datafusion_physical_plan::empty::EmptyExec;
use datafusion_physical_plan::{ExecutionPlan, Statistics};

use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef};
use datafusion_common::{
-    config_datafusion_err, internal_err, plan_err, project_schema, Constraints,
-    SchemaExt, ToDFSchema,
+    config_datafusion_err, internal_err, plan_err, project_schema, Constraints, SchemaExt,
};
use datafusion_execution::cache::{
cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache,
};
-use datafusion_physical_expr::{
-    create_physical_expr, LexOrdering, PhysicalSortRequirement,
-};
+use datafusion_physical_expr::{LexOrdering, PhysicalSortRequirement};

use async_trait::async_trait;
use datafusion_catalog::Session;
@@ -918,19 +915,6 @@ impl TableProvider for ListingTable {
None => {} // no ordering required
};

-        let filters = match conjunction(filters.to_vec()) {
-            Some(expr) => {
-                let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?;
-                let filters = create_physical_expr(
-                    &expr,
-                    &table_df_schema,
-                    state.execution_props(),
-                )?;
-                Some(filters)
-            }
-            None => None,
-        };

let Some(object_store_url) =
self.table_paths.first().map(ListingTableUrl::object_store)
else {
@@ -955,7 +939,7 @@
.with_output_ordering(output_ordering)
.with_table_partition_cols(table_partition_cols)
.build(),
-            filters.as_ref(),
+            None,
Comment on lines -958 to +942
Contributor Author: We kind of need to do this, otherwise you end up with duplicate filters in the case where ListingTable said Inexact -> a FilterExec gets created -> we then push down from the FilterExec into the DataSourceExec that already had the filter -> duplicate filter.

adriangb (Contributor Author) · Apr 2, 2025:

Because essentially this PR has to introduce a generalized way to do filter pushdown instead of the very specific way that ListingTable does it, and we wouldn't want to do both at the same time. What we want is for ListingTable to tell us:

  • Which filters it can apply just from partitioning (Exact)
  • Any other filter becomes Inexact

(A sketch of this classification follows the list.)

Contributor: I wonder if we can split out the part of this PR that changes how the filters are pushed down (i.e. a pruning predicate per file rather than one overall) as a separate PR, to isolate the changes into smaller PRs?

)
.await
}
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -201,7 +201,7 @@ impl ExecutionPlan for ArrowExec {

/// Arrow configuration struct that is given to DataSourceExec
/// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow
-#[derive(Clone, Default)]
+#[derive(Clone, Default, Debug)]
Contributor Author: This was just an annoyance during debugging. Can revert.

Contributor: Looks good to me -- we can also make a separate PR for it too (not needed)

pub struct ArrowSource {
metrics: ExecutionPlanMetricsSet,
projected_statistics: Option<Statistics>,
6 changes: 5 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -68,7 +68,6 @@ mod tests {
use chrono::{TimeZone, Utc};
use datafusion_datasource::file_groups::FileGroup;
use futures::StreamExt;
-use insta;
use insta::assert_snapshot;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
@@ -1455,6 +1454,7 @@
.await;

// should have a pruning predicate
+#[allow(deprecated)]
let pruning_predicate = rt.parquet_source.pruning_predicate();
assert!(pruning_predicate.is_some());

@@ -1496,6 +1496,7 @@
.round_trip(vec![batches.clone()])
.await;

+#[allow(deprecated)]
let pruning_predicate = rt0.parquet_source.pruning_predicate();
assert!(pruning_predicate.is_some());

@@ -1538,6 +1539,7 @@
.await;

// should have a pruning predicate
+#[allow(deprecated)]
let pruning_predicate = rt1.parquet_source.pruning_predicate();
assert!(pruning_predicate.is_some());
let pruning_predicate = rt2.parquet_source.predicate();
@@ -1581,6 +1583,7 @@
.await;

// Should not contain a pruning predicate (since nothing can be pruned)
+#[allow(deprecated)]
let pruning_predicate = rt.parquet_source.pruning_predicate();
assert!(
pruning_predicate.is_none(),
@@ -1616,6 +1619,7 @@
.await;

// Should have a pruning predicate
+#[allow(deprecated)]
let pruning_predicate = rt.parquet_source.pruning_predicate();
assert!(pruning_predicate.is_some());
}
2 changes: 2 additions & 0 deletions datafusion/core/tests/fuzz_cases/mod.rs
@@ -29,3 +29,5 @@ mod pruning;
mod limit_fuzz;
mod sort_preserving_repartition_fuzz;
mod window_fuzz;

+mod topk_filter_pushdown;