diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index b0f17630c910..6c0a329f6bf3 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -590,6 +590,13 @@ config_namespace! { /// during aggregations, if possible pub enable_topk_aggregation: bool, default = true + /// When set to true attempts to push down dynamic filters generated by operators into the file scan phase. + /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer + /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. + /// This means that if we already have 10 timestamps in the year 2025 + /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. + pub enable_dynamic_filter_pushdown: bool, default = true + /// When set to true, the optimizer will insert filters before a join between /// a nullable and non-nullable column to filter out nulls on the nullable side. This /// filter can add additional overhead when the file format does not fully support diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 61eeb419a480..5595931c1006 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -36,22 +36,19 @@ use datafusion_catalog::TableProvider; use datafusion_common::{config_err, DataFusionError, Result}; use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder}; use datafusion_expr::dml::InsertOp; -use datafusion_expr::{utils::conjunction, Expr, TableProviderFilterPushDown}; +use datafusion_expr::{Expr, TableProviderFilterPushDown}; use datafusion_expr::{SortExpr, TableType}; use datafusion_physical_plan::empty::EmptyExec; use datafusion_physical_plan::{ExecutionPlan, Statistics}; use arrow::datatypes::{DataType, Field, Schema, SchemaBuilder, SchemaRef}; use datafusion_common::{ - config_datafusion_err, internal_err, plan_err, project_schema, Constraints, - SchemaExt, ToDFSchema, + config_datafusion_err, internal_err, plan_err, project_schema, Constraints, SchemaExt, }; use datafusion_execution::cache::{ cache_manager::FileStatisticsCache, cache_unit::DefaultFileStatisticsCache, }; -use datafusion_physical_expr::{ - create_physical_expr, LexOrdering, PhysicalSortRequirement, -}; +use datafusion_physical_expr::{LexOrdering, PhysicalSortRequirement}; use async_trait::async_trait; use datafusion_catalog::Session; @@ -918,19 +915,6 @@ impl TableProvider for ListingTable { None => {} // no ordering required }; - let filters = match conjunction(filters.to_vec()) { - Some(expr) => { - let table_df_schema = self.table_schema.as_ref().clone().to_dfschema()?; - let filters = create_physical_expr( - &expr, - &table_df_schema, - state.execution_props(), - )?; - Some(filters) - } - None => None, - }; - let Some(object_store_url) = self.table_paths.first().map(ListingTableUrl::object_store) else { @@ -955,7 +939,7 @@ impl TableProvider for ListingTable { .with_output_ordering(output_ordering) .with_table_partition_cols(table_partition_cols) .build(), - filters.as_ref(), + None, ) .await } diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs index 5dcf4df73f57..692fc1df28bb 100644 --- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs +++ 
b/datafusion/core/src/datasource/physical_plan/arrow_file.rs @@ -201,7 +201,7 @@ impl ExecutionPlan for ArrowExec { /// Arrow configuration struct that is given to DataSourceExec /// Does not hold anything special, since [`FileScanConfig`] is sufficient for arrow -#[derive(Clone, Default)] +#[derive(Clone, Default, Debug)] pub struct ArrowSource { metrics: ExecutionPlanMetricsSet, projected_statistics: Option, diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs b/datafusion/core/src/datasource/physical_plan/parquet.rs index 9e1b2822e854..74cbf164720e 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet.rs @@ -68,7 +68,6 @@ mod tests { use chrono::{TimeZone, Utc}; use datafusion_datasource::file_groups::FileGroup; use futures::StreamExt; - use insta; use insta::assert_snapshot; use object_store::local::LocalFileSystem; use object_store::path::Path; @@ -1455,6 +1454,7 @@ mod tests { .await; // should have a pruning predicate + #[allow(deprecated)] let pruning_predicate = rt.parquet_source.pruning_predicate(); assert!(pruning_predicate.is_some()); @@ -1496,6 +1496,7 @@ mod tests { .round_trip(vec![batches.clone()]) .await; + #[allow(deprecated)] let pruning_predicate = rt0.parquet_source.pruning_predicate(); assert!(pruning_predicate.is_some()); @@ -1538,6 +1539,7 @@ mod tests { .await; // should have a pruning predicate + #[allow(deprecated)] let pruning_predicate = rt1.parquet_source.pruning_predicate(); assert!(pruning_predicate.is_some()); let pruning_predicate = rt2.parquet_source.predicate(); @@ -1581,6 +1583,7 @@ mod tests { .await; // Should not contain a pruning predicate (since nothing can be pruned) + #[allow(deprecated)] let pruning_predicate = rt.parquet_source.pruning_predicate(); assert!( pruning_predicate.is_none(), @@ -1616,6 +1619,7 @@ mod tests { .await; // Should have a pruning predicate + #[allow(deprecated)] let pruning_predicate = rt.parquet_source.pruning_predicate(); assert!(pruning_predicate.is_some()); } diff --git a/datafusion/core/tests/fuzz_cases/mod.rs b/datafusion/core/tests/fuzz_cases/mod.rs index d5511e2970f4..11bf29431e90 100644 --- a/datafusion/core/tests/fuzz_cases/mod.rs +++ b/datafusion/core/tests/fuzz_cases/mod.rs @@ -29,3 +29,5 @@ mod pruning; mod limit_fuzz; mod sort_preserving_repartition_fuzz; mod window_fuzz; + +mod topk_filter_pushdown; diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs new file mode 100644 index 000000000000..2b1b905d9390 --- /dev/null +++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs @@ -0,0 +1,354 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
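The `enable_dynamic_filter_pushdown` option added to `config.rs` above is an ordinary session setting. A minimal sketch of toggling it per session, mirroring the `SessionConfig::set_bool` calls used by the fuzz test below (nothing here is specific to this patch beyond the option key):

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // The option defaults to `true`; set it to `false` to opt out of dynamic
    // filter pushdown (e.g. when comparing results with and without it).
    let cfg = SessionConfig::new()
        .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", false);
    let _ctx = SessionContext::new_with_config(cfg);
}
```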
+ +use std::collections::HashMap; +use std::sync::{Arc, LazyLock}; + +use arrow::array::{Int32Array, StringArray, StringDictionaryBuilder}; +use arrow::datatypes::Int32Type; +use arrow::record_batch::RecordBatch; +use arrow::util::pretty::pretty_format_batches; +use arrow_schema::{DataType, Field, Schema}; +use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig}; +use datafusion::prelude::{SessionConfig, SessionContext}; +use datafusion_datasource::ListingTableUrl; +use datafusion_datasource_parquet::ParquetFormat; +use datafusion_execution::object_store::ObjectStoreUrl; +use itertools::Itertools; +use object_store::memory::InMemory; +use object_store::path::Path; +use object_store::{ObjectStore, PutPayload}; +use parquet::arrow::ArrowWriter; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use tokio::sync::Mutex; +use tokio::task::JoinSet; + +#[derive(Clone)] +struct TestDataSet { + store: Arc, + schema: Arc, +} + +/// List of in memory parquet files with UTF8 data +// Use a mutex rather than LazyLock to allow for async initialization +static TESTFILES: LazyLock>> = + LazyLock::new(|| Mutex::new(vec![])); + +async fn test_files() -> Vec { + let files_mutex = &TESTFILES; + let mut files = files_mutex.lock().await; + if !files.is_empty() { + return (*files).clone(); + } + + let mut rng = StdRng::seed_from_u64(0); + + for nulls_in_ids in [false, true] { + for nulls_in_names in [false, true] { + for nulls_in_departments in [false, true] { + let store = Arc::new(InMemory::new()); + + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, nulls_in_ids), + Field::new("name", DataType::Utf8, nulls_in_names), + Field::new( + "department", + DataType::Dictionary( + Box::new(DataType::Int32), + Box::new(DataType::Utf8), + ), + nulls_in_departments, + ), + ])); + + let name_choices = if nulls_in_names { + [Some("Alice"), Some("Bob"), None, Some("David"), None] + } else { + [ + Some("Alice"), + Some("Bob"), + Some("Charlie"), + Some("David"), + Some("Eve"), + ] + }; + + let department_choices = if nulls_in_departments { + [ + Some("Theater"), + Some("Engineering"), + None, + Some("Arts"), + None, + ] + } else { + [ + Some("Theater"), + Some("Engineering"), + Some("Healthcare"), + Some("Arts"), + Some("Music"), + ] + }; + + // Generate 5 files, some with overlapping or repeated ids some without + for i in 0..5 { + let num_batches = rng.gen_range(1..3); + let mut batches = Vec::with_capacity(num_batches); + for _ in 0..num_batches { + let num_rows = 25; + let ids = Int32Array::from_iter((0..num_rows).map(|file| { + if nulls_in_ids { + if rng.gen_bool(1.0 / 10.0) { + None + } else { + Some(rng.gen_range(file..file + 5)) + } + } else { + Some(rng.gen_range(file..file + 5)) + } + })); + let names = StringArray::from_iter((0..num_rows).map(|_| { + // randomly select a name + let idx = rng.gen_range(0..name_choices.len()); + name_choices[idx].map(|s| s.to_string()) + })); + let mut departments = StringDictionaryBuilder::::new(); + for _ in 0..num_rows { + // randomly select a department + let idx = rng.gen_range(0..department_choices.len()); + departments.append_option(department_choices[idx].as_ref()); + } + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(ids), + Arc::new(names), + Arc::new(departments.finish()), + ], + ) + .unwrap(); + batches.push(batch); + } + let mut buf = vec![]; + { + let mut writer = + ArrowWriter::try_new(&mut buf, schema.clone(), None).unwrap(); + for batch in batches { + 
writer.write(&batch).unwrap(); + writer.flush().unwrap(); + } + writer.flush().unwrap(); + writer.finish().unwrap(); + } + let payload = PutPayload::from(buf); + let path = Path::from(format!("file_{i}.parquet")); + store.put(&path, payload).await.unwrap(); + } + files.push(TestDataSet { store, schema }); + } + } + } + (*files).clone() +} + +async fn run_query_with_config( + query: &str, + config: SessionConfig, + dataset: TestDataSet, +) -> Vec { + let store = dataset.store; + let schema = dataset.schema; + let ctx = SessionContext::new_with_config(config); + let url = ObjectStoreUrl::parse("memory://").unwrap(); + ctx.register_object_store(url.as_ref(), store.clone()); + + let format = Arc::new( + ParquetFormat::default() + .with_options(ctx.state().table_options().parquet.clone()), + ); + let options = ListingOptions::new(format); + let table_path = ListingTableUrl::parse("memory:///").unwrap(); + let config = ListingTableConfig::new(table_path) + .with_listing_options(options) + .with_schema(schema); + let table = Arc::new(ListingTable::try_new(config).unwrap()); + + ctx.register_table("test_table", table).unwrap(); + + ctx.sql(query).await.unwrap().collect().await.unwrap() +} + +#[derive(Debug)] +struct RunQueryResult { + query: String, + result: Vec, + expected: Vec, +} + +impl RunQueryResult { + fn expected_formated(&self) -> String { + format!("{}", pretty_format_batches(&self.expected).unwrap()) + } + + fn result_formated(&self) -> String { + format!("{}", pretty_format_batches(&self.result).unwrap()) + } + + fn is_ok(&self) -> bool { + self.expected_formated() == self.result_formated() + } +} + +async fn run_query( + query: String, + cfg: SessionConfig, + dataset: TestDataSet, +) -> RunQueryResult { + let cfg_with_dynamic_filters = cfg + .clone() + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true); + let cfg_without_dynamic_filters = cfg + .clone() + .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", false); + + let expected_result = + run_query_with_config(&query, cfg_without_dynamic_filters, dataset.clone()).await; + let result = + run_query_with_config(&query, cfg_with_dynamic_filters, dataset.clone()).await; + + RunQueryResult { + query: query.to_string(), + result, + expected: expected_result, + } +} + +struct TestCase { + query: String, + cfg: SessionConfig, + dataset: TestDataSet, +} + +#[tokio::test(flavor = "multi_thread")] +async fn test_fuzz_topk_filter_pushdown() { + let order_columns = ["id", "name", "department"]; + let order_directions = ["ASC", "DESC"]; + let null_orders = ["NULLS FIRST", "NULLS LAST"]; + + let start = datafusion_common::instant::Instant::now(); + let mut orders: HashMap> = HashMap::new(); + for order_column in &order_columns { + for order_direction in &order_directions { + for null_order in &null_orders { + // if there is a vec for this column insert the order, otherwise create a new vec + let ordering = + format!("{} {} {}", order_column, order_direction, null_order); + match orders.get_mut(*order_column) { + Some(order_vec) => { + order_vec.push(ordering); + } + None => { + orders.insert(order_column.to_string(), vec![ordering]); + } + } + } + } + } + + let mut queries = vec![]; + + for limit in [1, 10] { + for num_order_by_columns in [1, 2, 3] { + for order_columns in ["id", "name", "department"] + .iter() + .combinations(num_order_by_columns) + { + for orderings in order_columns + .iter() + .map(|col| orders.get(**col).unwrap()) + .multi_cartesian_product() + { + let query = format!( + "SELECT * FROM 
test_table ORDER BY {} LIMIT {}", + orderings.into_iter().join(", "), + limit + ); + queries.push(query); + } + } + } + } + + queries.sort_unstable(); + println!( + "Generated {} queries in {:?}", + queries.len(), + start.elapsed() + ); + + let start = datafusion_common::instant::Instant::now(); + let datasets = test_files().await; + println!("Generated test files in {:?}", start.elapsed()); + + let mut test_cases = vec![]; + for enable_filter_pushdown in [true, false] { + for query in &queries { + for dataset in &datasets { + let mut cfg = SessionConfig::new(); + cfg = cfg.set_bool( + "datafusion.optimizer.enable_dynamic_filter_pushdown", + enable_filter_pushdown, + ); + test_cases.push(TestCase { + query: query.to_string(), + cfg, + dataset: dataset.clone(), + }); + } + } + } + + let start = datafusion_common::instant::Instant::now(); + let mut join_set = JoinSet::new(); + for tc in test_cases { + join_set.spawn(run_query(tc.query, tc.cfg, tc.dataset)); + } + let mut results = join_set.join_all().await; + results.sort_unstable_by(|a, b| a.query.cmp(&b.query)); + println!("Ran {} test cases in {:?}", results.len(), start.elapsed()); + + let failures = results + .iter() + .filter(|result| !result.is_ok()) + .collect::>(); + + for failure in &failures { + println!("Failure:"); + println!("Query:\n{}", failure.query); + println!("\nExpected:\n{}", failure.expected_formated()); + println!("\nResult:\n{}", failure.result_formated()); + println!("\n\n"); + } + + if !failures.is_empty() { + panic!("Some test cases failed"); + } else { + println!("All test cases passed"); + } +} diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index 7e98ebed6c9a..58a9148029bf 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -28,6 +28,7 @@ use datafusion::execution::context::SessionState; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::prelude::SessionContext; use datafusion_common::stats::Precision; +use datafusion_common::DFSchema; use datafusion_execution::cache::cache_manager::CacheManagerConfig; use datafusion_execution::cache::cache_unit::{ DefaultFileStatisticsCache, DefaultListFilesCache, @@ -37,6 +38,10 @@ use datafusion_execution::runtime_env::RuntimeEnvBuilder; use datafusion_expr::{col, lit, Expr}; use datafusion::datasource::physical_plan::FileScanConfig; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::ExecutionPlan; use tempfile::tempdir; #[tokio::test] @@ -48,15 +53,38 @@ async fn check_stats_precision_with_filter_pushdown() { let opt = ListingOptions::new(Arc::new(ParquetFormat::default())); let table = get_listing_table(&table_path, None, &opt).await; let (_, _, state) = get_cache_runtime_state(); + + let filter = Expr::gt(col("id"), lit(1)); + // Scan without filter, stats are exact let exec = table.scan(&state, None, &[], None).await.unwrap(); assert_eq!(exec.statistics().unwrap().num_rows, Precision::Exact(8)); - // Scan with filter pushdown, stats are inexact - let filter = Expr::gt(col("id"), lit(1)); - - let exec = table.scan(&state, None, &[filter], None).await.unwrap(); - assert_eq!(exec.statistics().unwrap().num_rows, Precision::Inexact(8)); + // Apply filter pushdown, this should make the estimate inexact because we don't know + // how many rows will be filtered out 
by the predicate. + let df_schema = DFSchema::try_from(table.schema()).unwrap(); + let exec = FilterExec::try_new( + state + .create_physical_expr(filter.clone(), &df_schema) + .unwrap(), + exec, + ) + .unwrap(); + let exec = PushdownFilter::new() + .optimize(Arc::new(exec), state.config().options()) + .unwrap(); + let data_source_exec = exec + .as_any() + .downcast_ref::() + .unwrap() + .input() + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!( + data_source_exec.statistics().unwrap().num_rows, + Precision::Inexact(8) + ); } #[tokio::test] diff --git a/datafusion/core/tests/parquet/filter_pushdown.rs b/datafusion/core/tests/parquet/filter_pushdown.rs index 02fb59740493..5108548fcae1 100644 --- a/datafusion/core/tests/parquet/filter_pushdown.rs +++ b/datafusion/core/tests/parquet/filter_pushdown.rs @@ -26,18 +26,31 @@ //! select * from data limit 10; //! ``` +use std::fs::{self, File}; use std::path::Path; +use std::sync::Arc; +use arrow::array::{Int64Array, StringArray, StructArray}; use arrow::compute::concat_batches; use arrow::record_batch::RecordBatch; +use arrow::util::pretty::pretty_format_batches; +use arrow_schema::{DataType, Field, Fields, Schema}; +use datafusion::assert_batches_eq; +use datafusion::config::TableParquetOptions; +use datafusion::datasource::listing::ListingOptions; use datafusion::physical_plan::collect; use datafusion::physical_plan::metrics::MetricsSet; -use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext}; +use datafusion::prelude::{ + col, lit, lit_timestamp_nano, Expr, SessionConfig, SessionContext, +}; use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile}; use datafusion_common::instant::Instant; +use datafusion_common::{assert_contains, Result}; +use datafusion_datasource_parquet::ParquetFormat; use datafusion_expr::utils::{conjunction, disjunction, split_conjunction}; use itertools::Itertools; +use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; use tempfile::TempDir; use test_utils::AccessLogGenerator; @@ -597,3 +610,364 @@ fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize { } } } + +struct DynamicFilterTestCase { + query: String, + path: String, +} + +impl DynamicFilterTestCase { + fn new(query: String, path: String) -> Self { + Self { query, path } + } + + async fn run_query(&self, query: &str) -> Vec { + // Force 1 partition and 1 rg per partition because if we widen the plan + // and read all batches at once we won't get any dynamic pushdown. 
+ let mut cfg = SessionConfig::new(); + cfg = cfg.set_u64("datafusion.execution.parquet.max_row_group_size", 1); + let ctx = SessionContext::new_with_config(cfg); + + let mut pq_options = TableParquetOptions::default(); + pq_options.global.max_row_group_size = 1; + pq_options.global.pushdown_filters = true; + let fmt = ParquetFormat::default().with_options(pq_options); + let opt = ListingOptions::new(Arc::new(fmt)).with_target_partitions(1); + ctx.register_listing_table("base_table", &self.path, opt, None, None) + .await + .unwrap(); + + ctx.sql(query).await.unwrap().collect().await.unwrap() + } + + async fn results(&self) -> Vec { + self.run_query(&self.query).await + } + + async fn explain_plan(&self) -> String { + let query = format!("EXPLAIN ANALYZE {}", self.query); + let batches = self.run_query(&query).await; + + pretty_format_batches(&batches) + .map(|s| format!("{}", s)) + .unwrap_or_else(|_| "No explain plan generated".to_string()) + } +} + +fn write_file_with_non_null_ids(file: &String, value: i64) { + let schema = Schema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("name", DataType::Utf8, false), + ]); + let id_array = Int64Array::from(vec![Some(value)]); + let name_array = StringArray::from(vec![Some("test")]); + let schema = Arc::new(schema); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(id_array), Arc::new(name_array)], + ) + .unwrap(); + write_record_batch(file, batch).unwrap(); +} + +fn write_file_with_null_ids(file: &String) { + let schema = Schema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("name", DataType::Utf8, false), + ]); + let id_array = Int64Array::from(vec![None]); + let name_array = StringArray::from(vec![Some(format!("test{:02}", "null"))]); + let schema = Arc::new(schema); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(id_array), Arc::new(name_array)], + ) + .unwrap(); + write_record_batch(file, batch).unwrap(); +} + +fn write_record_batch(file: &String, batch: RecordBatch) -> Result<()> { + let file = File::create(file)?; + let w_opt = WriterProperties::builder() + .set_max_row_group_size(1) + .build(); + let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(w_opt))?; + writer.write(&batch)?; + writer.flush()?; + writer.close()?; + Ok(()) +} + +fn write_file(file: &String) { + let struct_fields = Fields::from(vec![ + Field::new("id", DataType::Int64, false), + Field::new("name", DataType::Utf8, false), + ]); + let schema = Schema::new(vec![ + Field::new("struct", DataType::Struct(struct_fields.clone()), false), + Field::new("id", DataType::Int64, true), + Field::new("name", DataType::Utf8, false), + ]); + let id_array = Int64Array::from(vec![Some(2), Some(1)]); + let columns = vec![ + Arc::new(Int64Array::from(vec![3, 4])) as _, + Arc::new(StringArray::from(vec!["zzz", "aaa"])) as _, + ]; + let struct_array = StructArray::new(struct_fields, columns, None); + + let name_array = StringArray::from(vec![Some("test02"), Some("test01")]); + let schema = Arc::new(schema); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(struct_array), + Arc::new(id_array), + Arc::new(name_array), + ], + ) + .unwrap(); + write_record_batch(file, batch).unwrap(); +} + +#[tokio::test] +async fn test_topk_predicate_pushdown() { + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string(); + + for file in 0..5 { + // write 2 files so that one is processed before the other + let name = format!("test{:02}.parquet", 
file); + write_file(&format!("{path}/{name}")); + } + + let query = "select name from base_table order by id desc limit 3"; + + let test_case = DynamicFilterTestCase::new(query.to_string(), path); + + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+--------+", + "| name |", + "+--------+", + "| test02 |", + "| test02 |", + "| test02 |", + "+--------+", + ]; + assert_batches_eq!(expected, &batches); + + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "row_groups_pruned_statistics=2"); +} + +#[tokio::test] +async fn test_topk_predicate_pushdown_nulls_first() { + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string(); + + for file in 0..5 { + // write multiple files to ensure we get pushdown of dynamic filters from one file to another + let name = format!("test{:02}.parquet", file); + write_file(&format!("{path}/{name}")); + } + + let name = format!("test{:02}.parquet", 100); + write_file_with_null_ids(&format!("{path}/{name}")); + + // nulls first by default + let query = "select name from base_table order by id desc limit 3"; + let test_case = DynamicFilterTestCase::new(query.to_string(), path); + + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+----------+", + "| name |", + "+----------+", + "| testnull |", + "| test02 |", + "| test02 |", + "+----------+", + ]; + assert_batches_eq!(expected, &batches); + + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "row_groups_pruned_statistics=2"); +} + +#[tokio::test] +async fn test_topk_predicate_pushdown_multi_key() { + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string(); + for file in 0..5 { + // write multiple files to ensure we get pushdown of dynamic filters from one file to another + // Ensure files are read in order + let name = format!("test{:02}.parquet", file); + write_file_with_non_null_ids(&format!("{path}/{name}"), file); + } + + let query = "select id from base_table order by name desc, id limit 3"; + let test_case = DynamicFilterTestCase::new(query.to_string(), path.clone()); + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+----+", + "| id |", + "+----+", + "| 0 |", + "| 1 |", + "| 2 |", + "+----+", + ]; + assert_batches_eq!(expected, &batches); + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "row_groups_pruned_statistics=1"); + + let query1 = "select id from base_table order by name desc, id desc limit 3"; + let test_case = DynamicFilterTestCase::new(query1.to_string(), path.clone()); + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+----+", + "| id |", + "+----+", + "| 4 |", + "| 3 |", + "| 2 |", + "+----+", + ]; + assert_batches_eq!(expected, &batches); + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "row_groups_pruned_statistics=0"); + + let query1 = "select id from base_table order by name asc, id desc limit 3"; + let test_case = DynamicFilterTestCase::new(query1.to_string(), path); + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+----+", + "| id |", + "+----+", + "| 4 |", + "| 3 |", + "| 2 |", + "+----+", + ]; + assert_batches_eq!(expected, &batches); + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "row_groups_pruned_statistics=0"); +} + +#[tokio::test] +async fn test_topk_predicate_pushdown_nulls_last() { + let tmp_dir = TempDir::new().unwrap(); + let path = 
tmp_dir.path().to_str().unwrap().to_string(); + + for file in 0..5 { + let name = format!("test{:02}.parquet", file); + write_file(&format!("{path}/{name}")); + } + let name = format!("test{:02}.parquet", 100); + write_file_with_null_ids(&format!("{path}/{name}")); + + let query = "select name from base_table order by id desc nulls last limit 3"; + let test_case = DynamicFilterTestCase::new(query.to_string(), path); + + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+--------+", + "| name |", + "+--------+", + "| test02 |", + "| test02 |", + "| test02 |", + "+--------+", + ]; + assert_batches_eq!(expected, &batches); + + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "row_groups_pruned_statistics=3"); +} + +#[tokio::test] +async fn test_topk_predicate_pushdown_single_file() { + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string(); + + write_file(&format!("{path}/test.parquet")); + + let query = "select name from base_table order by id desc nulls last limit 1"; + let test_case = DynamicFilterTestCase::new(query.to_string(), path); + + let batches = test_case.results().await; + #[rustfmt::skip] + let expected = [ + "+--------+", + "| name |", + "+--------+", + "| test02 |", + "+--------+", + ]; + assert_batches_eq!(expected, &batches); + + let plan = test_case.explain_plan().await; + assert_contains!(&plan, "pushdown_rows_pruned=1"); +} + +#[tokio::test] +async fn test_topk_predicate_pushdown_ignores_partition_columns() { + // The TopK operator will try to push down predicates on `file_id`. + // But since `file_id` is a partition column and not part of the file itself + // we cannot actually do any filtering on it at the file level. + // Thus it has to be ignored by `ParquetSource`. + // This test only shows that this does not result in any errors or panics, + // it is expected that "nothing exciting" happens here. + // I do think in the future it would be interesting to re-design how partition columns + // get handled, in particular by pushing them into SchemaAdapter so that the table schema == file schema + // and we can do predicate pushdown on them as well without relying on each TableProvider to + // do special handling of partition columns. 
+ + let ctx = SessionContext::new(); + let opt = ListingOptions::new(Arc::new(ParquetFormat::default())) + .with_table_partition_cols(vec![("file_id".to_string(), DataType::UInt32)]) + // We need to force 1 partition because TopK predicate pushdown happens on a per-partition basis + // If we had 1 file per partition (as an example) no pushdown would happen + .with_target_partitions(1); + + let tmp_dir = TempDir::new().unwrap(); + let path = tmp_dir.path().to_str().unwrap().to_string(); + for file in 0..5 { + // crete a directory for the partition + fs::create_dir_all(format!("{path}/file_id={file}")).unwrap(); + let name = format!("file_id={file}/test.parquet"); + write_file(&format!("{path}/{name}")); + } + ctx.register_listing_table("base_table", path, opt, None, None) + .await + .unwrap(); + + let query = "select file_id from base_table order by file_id asc limit 3"; + + let batches = ctx.sql(query).await.unwrap().collect().await.unwrap(); + #[rustfmt::skip] + let expected = [ + "+---------+", + "| file_id |", + "+---------+", + "| 0 |", + "| 0 |", + "| 1 |", + "+---------+", + ]; + assert_batches_eq!(expected, &batches); + + let sql = format!("explain analyze {query}"); + let batches = ctx.sql(&sql).await.unwrap().collect().await.unwrap(); + let explain_plan = format!("{}", pretty_format_batches(&batches).unwrap()); + assert_contains!(explain_plan, "row_groups_pruned_statistics=0"); // just documenting current behavior +} diff --git a/datafusion/core/tests/parquet/mod.rs b/datafusion/core/tests/parquet/mod.rs index f45eacce18df..1085c58317b2 100644 --- a/datafusion/core/tests/parquet/mod.rs +++ b/datafusion/core/tests/parquet/mod.rs @@ -39,7 +39,7 @@ use datafusion::{ use datafusion_expr::{Expr, LogicalPlan, LogicalPlanBuilder}; use parquet::arrow::ArrowWriter; use parquet::file::properties::{EnabledStatistics, WriterProperties}; -use std::sync::Arc; +use std::{fs::File, sync::Arc}; use tempfile::NamedTempFile; mod custom_reader; diff --git a/datafusion/core/tests/parquet/schema.rs b/datafusion/core/tests/parquet/schema.rs index 29afd3970432..baf0b77e808f 100644 --- a/datafusion/core/tests/parquet/schema.rs +++ b/datafusion/core/tests/parquet/schema.rs @@ -201,7 +201,7 @@ fn write_files(table_path: &Path, schemas: Vec) { let schema = Arc::new(schema); let filename = format!("part-{i}.parquet"); let path = table_path.join(filename); - let file = fs::File::create(path).unwrap(); + let file = File::create(path).unwrap(); let mut writer = ArrowWriter::try_new(file, schema.clone(), None).unwrap(); // create mock record batch diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs new file mode 100644 index 000000000000..6c6cb0f20af3 --- /dev/null +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow_schema::SortOptions; +use datafusion::{ + datasource::object_store::ObjectStoreUrl, + logical_expr::Operator, + physical_plan::{ + expressions::{BinaryExpr, Column, Literal}, + PhysicalExpr, + }, + scalar::ScalarValue, +}; +use datafusion_common::config::{ConfigOptions, TableParquetOptions}; +use datafusion_common::internal_err; +use datafusion_datasource::file::FileSource; +use datafusion_datasource::file_scan_config::FileScanConfigBuilder; +use datafusion_datasource::source::DataSourceExec; +use datafusion_datasource_parquet::source::ParquetSource; +use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_optimizer::filter_pushdown::PushdownFilter; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use datafusion_physical_plan::{displayable, ExecutionPlan}; +use datafusion_physical_plan::{filter::FilterExec, sorts::sort::SortExec}; +use std::fmt::{Display, Formatter}; +use std::sync::{Arc, OnceLock}; + +#[test] +fn test_pushdown_into_scan() { + let scan = parquet_scan(); + let predicate = col_lit_predicate("a", "foo", schema()); + let plan = Arc::new(FilterExec::try_new(predicate, scan).unwrap()); + + // expect the predicate to be pushed down into the DataSource + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=parquet + output: + Ok: + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=parquet, predicate=a@0 = foo + " + ); +} + +#[test] +fn test_parquet_pushdown() { + // filter should be pushed down into the parquet scan with two filters + let scan = parquet_scan(); + let predicate1 = col_lit_predicate("a", "foo", schema()); + let filter1 = Arc::new(FilterExec::try_new(predicate1, scan).unwrap()); + let predicate2 = col_lit_predicate("b", "bar", schema()); + let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: + input: + - FilterExec: b@1 = bar + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=parquet + output: + Ok: + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=parquet, predicate=a@0 = foo AND b@1 = bar + " + ); +} + +#[test] +fn test_topk_pushdown() { + // filter should be pushed down into the parquet scan with two filters + let scan = parquet_scan(); + let predicate = col_lit_predicate("a", "foo", schema()); + let filter = + Arc::new(FilterExec::try_new(Arc::clone(&predicate), Arc::clone(&scan)).unwrap()); + let plan = Arc::new(SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new_with_schema("a", schema()).unwrap()), + SortOptions::default(), + )]), + filter, + )); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: + input: + - SortExec: expr=[a@0 ASC], 
preserve_partitioning=[false] + - FilterExec: a@0 = foo + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=parquet, predicate=a@0 = foo AND DynamicFilterPhysicalExpr [ SortDynamicFilterSource[ ] ] + " + ); + + let sort = Arc::new(SortExec::new( + LexOrdering::new(vec![PhysicalSortExpr::new( + Arc::new(Column::new_with_schema("a", schema()).unwrap()), + SortOptions::default(), + )]), + Arc::clone(&scan), + )); + let plan = Arc::new(FilterExec::try_new(predicate, sort).unwrap()); + + insta::assert_snapshot!( + OptimizationTest::new(plan, PushdownFilter{}), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = foo + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=parquet + output: + Ok: + - SortExec: expr=[a@0 ASC], preserve_partitioning=[false] + - DataSourceExec: file_groups={0 groups: []}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ SortDynamicFilterSource[ ] ] AND a@0 = foo + " + ); +} + +/// Schema: +/// a: String +/// b: String +/// c: f64 +static TEST_SCHEMA: OnceLock = OnceLock::new(); + +fn schema() -> &'static SchemaRef { + TEST_SCHEMA.get_or_init(|| { + let fields = vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ]; + Arc::new(Schema::new(fields)) + }) +} + +/// Return a execution plan that reads from a parquet file +fn parquet_scan() -> Arc { + let schema = schema(); + let mut options = TableParquetOptions::default(); + options.global.pushdown_filters = true; + let source = ParquetSource::new(options).with_schema(Arc::clone(schema)); + let base_config = FileScanConfigBuilder::new( + ObjectStoreUrl::parse("test://").unwrap(), + Arc::clone(schema), + source, + ) + .build(); + DataSourceExec::from_data_source(base_config) +} + +/// Returns a predicate that is a binary expression col = lit +fn col_lit_predicate( + column_name: &str, + scalar_value: impl Into, + schema: &Schema, +) -> Arc { + let scalar_value = scalar_value.into(); + Arc::new(BinaryExpr::new( + Arc::new(Column::new_with_schema(column_name, schema).unwrap()), + Operator::Eq, + Arc::new(Literal::new(scalar_value)), + )) +} + +/// A harness for testing physical optimizers. 
+/// +/// You can use this to test the output of a physical optimizer rule using insta snapshots +#[derive(Debug)] +pub struct OptimizationTest { + input: Vec, + output: Result, String>, +} + +impl OptimizationTest { + pub fn new(input_plan: Arc, opt: O) -> Self + where + O: PhysicalOptimizerRule, + { + Self::new_with_config(input_plan, opt, &ConfigOptions::default()) + } + + pub fn new_with_config( + input_plan: Arc, + opt: O, + config: &ConfigOptions, + ) -> Self + where + O: PhysicalOptimizerRule, + { + let input = format_execution_plan(&input_plan); + + let input_schema = input_plan.schema(); + + let output_result = opt.optimize(input_plan, config); + let output = output_result + .and_then(|plan| { + if opt.schema_check() && (plan.schema() != input_schema) { + internal_err!( + "Schema mismatch:\n\nBefore:\n{:?}\n\nAfter:\n{:?}", + input_schema, + plan.schema() + ) + } else { + Ok(plan) + } + }) + .map(|plan| format_execution_plan(&plan)) + .map_err(|e| e.to_string()); + + Self { input, output } + } +} + +impl Display for OptimizationTest { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + writeln!(f, "OptimizationTest:")?; + writeln!(f, " input:")?; + for line in &self.input { + writeln!(f, " - {line}")?; + } + writeln!(f, " output:")?; + match &self.output { + Ok(output) => { + writeln!(f, " Ok:")?; + for line in output { + writeln!(f, " - {line}")?; + } + } + Err(err) => { + writeln!(f, " Err: {err}")?; + } + } + Ok(()) + } +} + +pub fn format_execution_plan(plan: &Arc) -> Vec { + format_lines(&displayable(plan.as_ref()).indent(false).to_string()) +} + +fn format_lines(s: &str) -> Vec { + s.trim().split('\n').map(|s| s.to_string()).collect() +} diff --git a/datafusion/core/tests/physical_optimizer/mod.rs b/datafusion/core/tests/physical_optimizer/mod.rs index 7d5d07715eeb..fe7b9decfebf 100644 --- a/datafusion/core/tests/physical_optimizer/mod.rs +++ b/datafusion/core/tests/physical_optimizer/mod.rs @@ -21,6 +21,7 @@ mod aggregate_statistics; mod combine_partial_final_agg; mod enforce_distribution; mod enforce_sorting; +mod filter_pushdown; mod join_selection; mod limit_pushdown; mod limited_distinct_aggregation; diff --git a/datafusion/core/tests/sql/path_partition.rs b/datafusion/core/tests/sql/path_partition.rs index bf8466d849f2..773e30914e9c 100644 --- a/datafusion/core/tests/sql/path_partition.rs +++ b/datafusion/core/tests/sql/path_partition.rs @@ -42,8 +42,7 @@ use datafusion_common::stats::Precision; use datafusion_common::test_util::batches_to_sort_string; use datafusion_common::ScalarValue; use datafusion_execution::config::SessionConfig; -use datafusion_expr::{col, lit, Expr, Operator}; -use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; +use datafusion_expr::{col, lit, Expr}; use async_trait::async_trait; use bytes::Bytes; @@ -90,18 +89,9 @@ async fn parquet_partition_pruning_filter() -> Result<()> { if let Some((_, parquet_config)) = data_source_exec.downcast_to_file_source::() { - let pred = parquet_config.predicate().unwrap(); - // Only the last filter should be pushdown to TableScan - let expected = Arc::new(BinaryExpr::new( - Arc::new(Column::new_with_schema("id", &exec.schema()).unwrap()), - Operator::Gt, - Arc::new(Literal::new(ScalarValue::Int32(Some(1)))), - )); - - assert!(pred.as_any().is::()); - let pred = pred.as_any().downcast_ref::().unwrap(); - - assert_eq!(pred, expected.as_ref()); + assert!(parquet_config.predicate().is_none()); + } else { + panic!("Expected parquet source"); } Ok(()) } diff --git 
a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index 732fef47d5a7..8717d5301421 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -34,7 +34,7 @@ use arrow::error::ArrowError; use datafusion_common::{exec_err, Result}; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::pruning::PruningPredicate; -use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder}; use futures::{StreamExt, TryStreamExt}; use log::debug; @@ -54,10 +54,6 @@ pub(super) struct ParquetOpener { pub limit: Option, /// Optional predicate to apply during the scan pub predicate: Option>, - /// Optional pruning predicate applied to row group statistics - pub pruning_predicate: Option>, - /// Optional pruning predicate applied to data page statistics - pub page_pruning_predicate: Option>, /// Schema of the output table pub table_schema: SchemaRef, /// Optional hint for how large the initial request to read parquet metadata @@ -80,6 +76,8 @@ pub(super) struct ParquetOpener { pub enable_bloom_filter: bool, /// Schema adapter factory pub schema_adapter_factory: Arc, + /// Should row group pruning be applied + pub enable_stats_pruning: bool, } impl FileOpener for ParquetOpener { @@ -109,18 +107,33 @@ impl FileOpener for ParquetOpener { .schema_adapter_factory .create(projected_schema, Arc::clone(&self.table_schema)); let predicate = self.predicate.clone(); - let pruning_predicate = self.pruning_predicate.clone(); - let page_pruning_predicate = self.page_pruning_predicate.clone(); let table_schema = Arc::clone(&self.table_schema); let reorder_predicates = self.reorder_filters; let pushdown_filters = self.pushdown_filters; - let enable_page_index = should_enable_page_index( - self.enable_page_index, - &self.page_pruning_predicate, - ); let enable_bloom_filter = self.enable_bloom_filter; + let enable_stats_pruning = self.enable_stats_pruning; let limit = self.limit; + let predicate_creation_errors = MetricBuilder::new(&self.metrics) + .global_counter("num_predicate_creation_errors"); + + let (pruning_predicate, page_pruning_predicate) = + if let Some(predicate) = &predicate { + let pruning_predicate = build_pruning_predicate( + Arc::clone(predicate), + &table_schema, + &predicate_creation_errors, + ); + let page_pruning_predicate = + build_page_pruning_predicate(predicate, &table_schema); + (pruning_predicate, Some(page_pruning_predicate)) + } else { + (None, None) + }; + + let enable_page_index = + should_enable_page_index(self.enable_page_index, &page_pruning_predicate); + Ok(Box::pin(async move { let options = ArrowReaderOptions::new().with_page_index(enable_page_index); @@ -197,13 +210,15 @@ impl FileOpener for ParquetOpener { } // If there is a predicate that can be evaluated against the metadata if let Some(predicate) = predicate.as_ref() { - row_groups.prune_by_statistics( - &file_schema, - builder.parquet_schema(), - rg_metadata, - predicate, - &file_metrics, - ); + if enable_stats_pruning { + row_groups.prune_by_statistics( + &file_schema, + builder.parquet_schema(), + rg_metadata, + predicate, + &file_metrics, + ); + } if enable_bloom_filter && !row_groups.is_empty() { row_groups @@ -295,3 +310,40 @@ fn create_initial_plan( // default to scanning all row groups Ok(ParquetAccessPlan::new_all(row_group_count)) } + +/// Build a pruning predicate from an optional predicate expression. 
+/// If the predicate is None or the predicate cannot be converted to a pruning +/// predicate, return None. +/// If there is an error creating the pruning predicate it is recorded by incrementing +/// the `predicate_creation_errors` counter. +pub(crate) fn build_pruning_predicate( + predicate: Arc, + file_schema: &SchemaRef, + predicate_creation_errors: &Count, +) -> Option> { + match PruningPredicate::try_new(predicate, Arc::clone(file_schema)) { + Ok(pruning_predicate) => { + if !pruning_predicate.always_true() { + return Some(Arc::new(pruning_predicate)); + } + } + Err(e) => { + debug!("Could not create pruning predicate for: {e}"); + predicate_creation_errors.add(1); + } + } + None +} + +/// Build a page pruning predicate from an optional predicate expression. +/// If the predicate is None or the predicate cannot be converted to a page pruning +/// predicate, return None. +pub(crate) fn build_page_pruning_predicate( + predicate: &Arc, + file_schema: &SchemaRef, +) -> Arc { + Arc::new(PagePruningAccessPlanFilter::new( + predicate, + Arc::clone(file_schema), + )) +} diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index 66d4d313d5a6..66695c3e4864 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -17,13 +17,17 @@ //! ParquetSource implementation for reading parquet files use std::any::Any; +use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; +use crate::opener::build_page_pruning_predicate; +use crate::opener::build_pruning_predicate; use crate::opener::ParquetOpener; use crate::page_filter::PagePruningAccessPlanFilter; use crate::DefaultParquetFileReaderFactory; use crate::ParquetFileReaderFactory; +use datafusion_datasource::file::FileSourceFilterPushdownResult; use datafusion_datasource::file_stream::FileOpener; use datafusion_datasource::schema_adapter::{ DefaultSchemaAdapterFactory, SchemaAdapterFactory, @@ -34,14 +38,15 @@ use datafusion_common::config::TableParquetOptions; use datafusion_common::Statistics; use datafusion_datasource::file::FileSource; use datafusion_datasource::file_scan_config::FileScanConfig; +use datafusion_physical_expr::{conjunction, expressions::lit}; use datafusion_physical_expr_common::physical_expr::fmt_sql; use datafusion_physical_expr_common::physical_expr::PhysicalExpr; use datafusion_physical_optimizer::pruning::PruningPredicate; +use datafusion_physical_plan::execution_plan::FilterSupport; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder}; use datafusion_physical_plan::DisplayFormatType; use itertools::Itertools; -use log::debug; use object_store::ObjectStore; /// Execution plan for reading one or more Parquet files. 
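`build_pruning_predicate` above is a thin wrapper around `PruningPredicate::try_new` that drops predicates which cannot prune anything. A standalone sketch of that underlying call, using a made-up one-column schema and an `id > 1` predicate (not taken from this patch):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::ScalarValue;
use datafusion_expr::Operator;
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_optimizer::pruning::PruningPredicate;

fn main() -> datafusion_common::Result<()> {
    // Hypothetical file schema with a single nullable `id` column.
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));

    // Physical predicate `id > 1`.
    let predicate = Arc::new(BinaryExpr::new(
        Arc::new(Column::new_with_schema("id", &schema)?),
        Operator::Gt,
        Arc::new(Literal::new(ScalarValue::Int64(Some(1)))),
    ));

    // `build_pruning_predicate` keeps the result only when it is not always true.
    let pruning = PruningPredicate::try_new(predicate, Arc::clone(&schema))?;
    assert!(!pruning.always_true());
    Ok(())
}
```

Because the opener now builds these from `self.predicate` at `open()` time rather than once in `ParquetSource`, a dynamic predicate (such as the TopK filter) is re-evaluated with its latest state for every file that is opened.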
@@ -316,24 +321,10 @@ impl ParquetSource { conf = conf.with_metrics(metrics); conf.predicate = Some(Arc::clone(&predicate)); - match PruningPredicate::try_new(Arc::clone(&predicate), Arc::clone(&file_schema)) - { - Ok(pruning_predicate) => { - if !pruning_predicate.always_true() { - conf.pruning_predicate = Some(Arc::new(pruning_predicate)); - } - } - Err(e) => { - debug!("Could not create pruning predicate for: {e}"); - predicate_creation_errors.add(1); - } - }; - - let page_pruning_predicate = Arc::new(PagePruningAccessPlanFilter::new( - &predicate, - Arc::clone(&file_schema), - )); - conf.page_pruning_predicate = Some(page_pruning_predicate); + conf.page_pruning_predicate = + Some(build_page_pruning_predicate(&predicate, &file_schema)); + conf.pruning_predicate = + build_pruning_predicate(predicate, &file_schema, &predicate_creation_errors); conf } @@ -349,11 +340,13 @@ impl ParquetSource { } /// Optional reference to this parquet scan's pruning predicate + #[deprecated(note = "ParquetSource no longer constructs a PruningPredicate.")] pub fn pruning_predicate(&self) -> Option<&Arc> { self.pruning_predicate.as_ref() } /// Optional reference to this parquet scan's page pruning predicate + #[deprecated(note = "ParquetSource no longer constructs a PruningPredicate.")] pub fn page_pruning_predicate(&self) -> Option<&Arc> { self.page_pruning_predicate.as_ref() } @@ -488,8 +481,6 @@ impl FileSource for ParquetSource { .expect("Batch size must set before creating ParquetOpener"), limit: base_config.limit, predicate: self.predicate.clone(), - pruning_predicate: self.pruning_predicate.clone(), - page_pruning_predicate: self.page_pruning_predicate.clone(), table_schema: Arc::clone(&base_config.file_schema), metadata_size_hint: self.metadata_size_hint, metrics: self.metrics().clone(), @@ -498,6 +489,7 @@ impl FileSource for ParquetSource { reorder_filters: self.reorder_filters(), enable_page_index: self.enable_page_index(), enable_bloom_filter: self.bloom_filter_on_read(), + enable_stats_pruning: self.table_parquet_options.global.pruning, schema_adapter_factory, }) } @@ -537,11 +529,10 @@ impl FileSource for ParquetSource { .expect("projected_statistics must be set"); // When filters are pushed down, we have no way of knowing the exact statistics. // Note that pruning predicate is also a kind of filter pushdown. - // (bloom filters use `pruning_predicate` too) - if self.pruning_predicate().is_some() - || self.page_pruning_predicate().is_some() - || (self.predicate().is_some() && self.pushdown_filters()) - { + // (bloom filters use `pruning_predicate` too). + // Because filter pushdown may happen dynamically as long as there is a predicate + // if we have *any* predicate applied, we can't guarantee the statistics are exact. 
+ if self.predicate().is_some() { Ok(statistics.to_inexact()) } else { Ok(statistics) @@ -559,6 +550,7 @@ impl FileSource for ParquetSource { .predicate() .map(|p| format!(", predicate={p}")) .unwrap_or_default(); + #[expect(deprecated)] let pruning_predicate_string = self .pruning_predicate() .map(|pre| { @@ -586,4 +578,39 @@ impl FileSource for ParquetSource { } } } + + fn push_down_filters( + &self, + filters: &[Arc], + ) -> datafusion_common::Result> { + let mut conf = self.clone(); + let predicate = match self.predicate.as_ref() { + Some(existing_predicate) => { + // Combine existing predicate with new filters + Some(conjunction( + std::iter::once(Arc::clone(existing_predicate)) + .chain(filters.iter().map(Arc::clone)), + )) + } + None => Some(conjunction(filters.iter().map(Arc::clone))), + }; + match predicate { + Some(new_predicate) if !new_predicate.eq(&lit(true)) => { + conf.predicate = Some(new_predicate); + // Respect the current pushdown filters setting, + // otherwise we would mark filters as exact but then not filter at the row level + // because the setting gets checked again inside the ParquetOpener! + let support = if self.table_parquet_options.global.pushdown_filters { + vec![FilterSupport::HandledExact; filters.len()] + } else { + vec![FilterSupport::Unhandled; filters.len()] + }; + Ok(Some(FileSourceFilterPushdownResult::new( + Arc::new(conf), + support, + ))) + } + _no_op_predicate => Ok(None), + } + } } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 0066f39801a1..6ddcfff840c8 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -27,7 +27,8 @@ use crate::file_scan_config::FileScanConfig; use crate::file_stream::FileOpener; use arrow::datatypes::SchemaRef; use datafusion_common::Statistics; -use datafusion_physical_expr::LexOrdering; +use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; +use datafusion_physical_plan::execution_plan::FilterPushdownResult; use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet; use datafusion_physical_plan::DisplayFormatType; @@ -93,4 +94,13 @@ pub trait FileSource: Send + Sync { } Ok(None) } + + fn push_down_filters( + &self, + _filters: &[Arc], + ) -> datafusion_common::Result> { + Ok(None) + } } + +pub type FileSourceFilterPushdownResult = FilterPushdownResult>; diff --git a/datafusion/datasource/src/file_scan_config.rs b/datafusion/datasource/src/file_scan_config.rs index 5172dafb1f91..0c264a1f1fec 100644 --- a/datafusion/datasource/src/file_scan_config.rs +++ b/datafusion/datasource/src/file_scan_config.rs @@ -37,7 +37,7 @@ use datafusion_execution::{ object_store::ObjectStoreUrl, SendableRecordBatchStream, TaskContext, }; use datafusion_physical_expr::{ - expressions::Column, EquivalenceProperties, LexOrdering, Partitioning, + expressions::Column, EquivalenceProperties, LexOrdering, Partitioning, PhysicalExpr, PhysicalSortExpr, }; use datafusion_physical_plan::{ @@ -48,7 +48,6 @@ use datafusion_physical_plan::{ }; use log::{debug, warn}; -use crate::file_groups::FileGroup; use crate::{ display::FileGroupsDisplay, file::FileSource, @@ -58,6 +57,7 @@ use crate::{ statistics::MinMaxStatistics, PartitionedFile, }; +use crate::{file_groups::FileGroup, source::DataSourceFilterPushdownResult}; /// The base configurations for a [`DataSourceExec`], the a physical plan for /// any given file format. 
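One practical note on the `ParquetSource::push_down_filters` implementation above: filters are only reported as `HandledExact` when `pushdown_filters` is enabled in the parquet options, since that is what turns on row-level filtering inside the opener; otherwise they are reported as `Unhandled` and the parent filter is expected to stay in place. A small sketch of constructing a source with that setting enabled, mirroring the optimizer tests earlier in this patch (the source is not wired into a plan here):

```rust
use datafusion_common::config::TableParquetOptions;
use datafusion_datasource_parquet::source::ParquetSource;

fn main() {
    let mut options = TableParquetOptions::default();
    // Without this, pushed-down filters are reported as `Unhandled`, so the
    // parent FilterExec is kept and rows are still filtered correctly.
    options.global.pushdown_filters = true;
    let _source = ParquetSource::new(options);
}
```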
@@ -584,6 +584,22 @@ impl DataSource for FileScanConfig { ) as _ })) } + + fn push_down_filters( + &self, + filters: &[Arc], + ) -> Result> { + if let Some(file_source_result) = self.file_source.push_down_filters(filters)? { + let mut new_self = self.clone(); + new_self.file_source = file_source_result.inner; + Ok(Some(DataSourceFilterPushdownResult { + inner: Arc::new(new_self) as Arc, + support: file_source_result.support, + })) + } else { + Ok(None) + } + } } impl FileScanConfig { diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 6c9122ce1ac1..b5fe5d2d80dd 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -22,7 +22,10 @@ use std::fmt; use std::fmt::{Debug, Formatter}; use std::sync::Arc; -use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType}; +use datafusion_physical_plan::execution_plan::{ + Boundedness, EmissionType, ExecutionPlanFilterPushdownResult, FilterPushdownResult, + FilterSupport, +}; use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use datafusion_physical_plan::projection::ProjectionExec; use datafusion_physical_plan::{ @@ -33,7 +36,7 @@ use crate::file_scan_config::FileScanConfig; use datafusion_common::config::ConfigOptions; use datafusion_common::{Constraints, Statistics}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; -use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexOrdering; /// Common behaviors in Data Sources for both from Files and Memory. @@ -79,8 +82,20 @@ pub trait DataSource: Send + Sync + Debug { &self, _projection: &ProjectionExec, ) -> datafusion_common::Result>>; + /// Push down filters from parent execution plans to this data source. + /// This is expected to return Ok(None) if the filters cannot be pushed down. + /// If they can be pushed down it should return a [`FilterPushdownResult`] containing the new + /// data source and the support level for each filter (exact or inexact). + fn push_down_filters( + &self, + _filters: &[Arc], + ) -> datafusion_common::Result> { + Ok(None) + } } +pub type DataSourceFilterPushdownResult = FilterPushdownResult>; + /// [`ExecutionPlan`] handles different file formats like JSON, CSV, AVRO, ARROW, PARQUET /// /// `DataSourceExec` implements common functionality such as applying projections, @@ -192,6 +207,28 @@ impl ExecutionPlan for DataSourceExec { ) -> datafusion_common::Result>> { self.data_source.try_swapping_with_projection(projection) } + + fn with_filter_pushdown_result( + self: Arc, + own_filters_result: &[FilterSupport], + parent_filters_remaining: &[Arc], + ) -> datafusion_common::Result> { + // We didn't give out any filters, this should be empty! + assert!(own_filters_result.is_empty()); + // Forward filter pushdown to our data source. + if let Some(pushdown_result) = self + .data_source + .push_down_filters(parent_filters_remaining)? 
+ { + let new_self = Arc::new(DataSourceExec::new(pushdown_result.inner)); + Ok(Some(ExecutionPlanFilterPushdownResult::new( + new_self, + pushdown_result.support, + ))) + } else { + Ok(None) + } + } } impl DataSourceExec { @@ -254,3 +291,13 @@ impl DataSourceExec { }) } } + +/// Create a new `DataSourceExec` from a `DataSource` +impl From for DataSourceExec +where + S: DataSource + 'static, +{ + fn from(source: S) -> Self { + Self::new(Arc::new(source)) + } +} diff --git a/datafusion/physical-expr-common/src/physical_expr.rs b/datafusion/physical-expr-common/src/physical_expr.rs index 43f214607f9f..2f5719a061dd 100644 --- a/datafusion/physical-expr-common/src/physical_expr.rs +++ b/datafusion/physical-expr-common/src/physical_expr.rs @@ -27,6 +27,7 @@ use arrow::array::BooleanArray; use arrow::compute::filter_record_batch; use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{internal_err, not_impl_err, Result, ScalarValue}; use datafusion_expr_common::columnar_value::ColumnarValue; use datafusion_expr_common::interval_arithmetic::Interval; @@ -283,6 +284,47 @@ pub trait PhysicalExpr: Send + Sync + Display + Debug + DynEq + DynHash { /// See the [`fmt_sql`] function for an example of printing `PhysicalExpr`s as SQL. /// fn fmt_sql(&self, f: &mut Formatter<'_>) -> fmt::Result; + + /// Take a snapshot of this `PhysicalExpr` if it is dynamic. + /// This is used to capture the current state of `PhysicalExpr`s that may contain + /// dynamic references to other operators in order to serialize it over the wire + /// or treat it via downcast matching. + /// + /// You should not call this method directly as it does not handle recursion. + /// Instead use [`snapshot_physical_expr`] to handle recursion and capture the + /// full state of the `PhysicalExpr`. + /// + /// This is expected to return "simple" expressions that do not have mutable state + /// and are composed of DataFusion's built-in `PhysicalExpr` implementations. + /// Callers however should *not* assume anything about the returned expressions + /// since callers and implementers may not agree on what "simple" or "built-in" + /// means. + /// In other words, if you need to searlize a `PhysicalExpr` across the wire + /// you should call this method and then try to serialize the result, + /// but you should handle unknown or unexpected `PhysicalExpr` implementations gracefully + /// just as if you had not called this method at all. + /// + /// In particular, consider: + /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::TopK` + /// that is involved in a query with `SELECT * FROM t1 ORDER BY a LIMIT 10`. + /// This function may return something like `a >= 12`. + /// * A `PhysicalExpr` that references the current state of a `datafusion::physical_plan::joins::HashJoinExec` + /// from a query such as `SELECT * FROM t1 JOIN t2 ON t1.a = t2.b`. + /// This function may return something like `t2.b IN (1, 5, 7)`. + /// + /// A system or function that can only deal with a hardcoded set of `PhysicalExpr` implementations + /// or needs to serialize this state to bytes may not be able to handle these dynamic references. + /// In such cases, we should return a simplified version of the `PhysicalExpr` that does not + /// contain these dynamic references. + /// + /// Note for implementers: this method should *not* handle recursion. + /// Recursion is handled in [`snapshot_physical_expr`]. 
+ fn snapshot(&self) -> Result>> { + // By default, we return None to indicate that this PhysicalExpr does not + // have any dynamic references or state. + // This is a safe default behavior. + Ok(None) + } } /// [`PhysicalExpr`] can't be constrained by [`Eq`] directly because it must remain object @@ -446,3 +488,30 @@ pub fn fmt_sql(expr: &dyn PhysicalExpr) -> impl Display + '_ { Wrapper { expr } } + +/// Take a snapshot of the given `PhysicalExpr` if it is dynamic. +/// +/// Take a snapshot of this `PhysicalExpr` if it is dynamic. +/// This is used to capture the current state of `PhysicalExpr`s that may contain +/// dynamic references to other operators in order to serialize it over the wire +/// or treat it via downcast matching. +/// +/// See the documentation of [`PhysicalExpr::snapshot`] for more details. +/// +/// # Returns +/// +/// Returns an `Option>` which is the snapshot of the +/// `PhysicalExpr` if it is dynamic. If the `PhysicalExpr` does not have +/// any dynamic references or state, it returns `None`. +pub fn snapshot_physical_expr( + expr: Arc, +) -> Result> { + expr.transform_up(|e| { + if let Some(snapshot) = e.snapshot()? { + Ok(Transformed::yes(snapshot)) + } else { + Ok(Transformed::no(Arc::clone(&e))) + } + }) + .data() +} diff --git a/datafusion/physical-expr/src/lib.rs b/datafusion/physical-expr/src/lib.rs index 93ced2eb628d..34e20a690522 100644 --- a/datafusion/physical-expr/src/lib.rs +++ b/datafusion/physical-expr/src/lib.rs @@ -59,7 +59,9 @@ pub use physical_expr::{ PhysicalExprRef, }; -pub use datafusion_physical_expr_common::physical_expr::PhysicalExpr; +pub use datafusion_physical_expr_common::physical_expr::{ + snapshot_physical_expr, PhysicalExpr, +}; pub use datafusion_physical_expr_common::sort_expr::{ LexOrdering, LexRequirement, PhysicalSortExpr, PhysicalSortRequirement, }; @@ -68,7 +70,7 @@ pub use planner::{create_physical_expr, create_physical_exprs}; pub use scalar_function::ScalarFunctionExpr; pub use datafusion_physical_expr_common::utils::reverse_order_bys; -pub use utils::split_conjunction; +pub use utils::{conjunction, split_conjunction}; // For backwards compatibility pub mod tree_node { diff --git a/datafusion/physical-expr/src/utils/mod.rs b/datafusion/physical-expr/src/utils/mod.rs index 7e4c7f0e10ba..21496c5edef5 100644 --- a/datafusion/physical-expr/src/utils/mod.rs +++ b/datafusion/physical-expr/src/utils/mod.rs @@ -47,6 +47,22 @@ pub fn split_conjunction( split_impl(Operator::And, predicate, vec![]) } +/// Create a conjunction of the given predicates. +/// If the input is empty, return a literal true. +/// If the input contains a single predicate, return the predicate. +/// Otherwise, return a conjunction of the predicates (e.g. `a AND b AND c`). +pub fn conjunction( + predicates: impl IntoIterator>, +) -> Arc { + predicates + .into_iter() + .fold(None, |acc, predicate| match acc { + None => Some(predicate), + Some(acc) => Some(Arc::new(BinaryExpr::new(acc, Operator::And, predicate))), + }) + .unwrap_or_else(|| crate::expressions::lit(true)) +} + /// Assume the predicate is in the form of DNF, split the predicate to a Vec of PhysicalExprs. 
/// /// For example, split "a1 = a2 OR b1 <= b2 OR c1 != c2" into ["a1 = a2", "b1 <= b2", "c1 != c2"] diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs b/datafusion/physical-optimizer/src/filter_pushdown.rs new file mode 100644 index 000000000000..21610564306c --- /dev/null +++ b/datafusion/physical-optimizer/src/filter_pushdown.rs @@ -0,0 +1,501 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use datafusion_common::{config::ConfigOptions, Result}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_plan::{ + execution_plan::{ + ExecutionPlanFilterPushdownResult, FilterPushdownAllowed, FilterSupport, + }, + with_new_children_if_necessary, ExecutionPlan, +}; + +use crate::PhysicalOptimizerRule; + +/// The state of filter pushdown support for a given filter. +#[derive(Clone, Copy, Debug)] +enum ChildPushdownState { + /// A child said it can handle the filter exactly. + ChildExact, + /// A child exists and took a look at the filter. + /// It may partially handle it or not handle it at all. + /// The parent still needs to re-apply the filter. + ChildInexact, + /// No child exists, there is no one to handle the filter. + /// This is the default / initial state. + NoChild, +} + +impl ChildPushdownState { + /// Combine the current state with another state. + /// This is used to combine the results of multiple children. + fn combine_with_other(&self, other: &FilterSupport) -> ChildPushdownState { + match (other, self) { + (FilterSupport::HandledExact, ChildPushdownState::NoChild) => { + ChildPushdownState::ChildExact + } + (FilterSupport::HandledExact, ChildPushdownState::ChildInexact) => { + ChildPushdownState::ChildInexact + } + (FilterSupport::Unhandled, ChildPushdownState::NoChild) => { + ChildPushdownState::ChildInexact + } + (FilterSupport::Unhandled, ChildPushdownState::ChildExact) => { + ChildPushdownState::ChildInexact + } + (FilterSupport::Unhandled, ChildPushdownState::ChildInexact) => { + ChildPushdownState::ChildInexact + } + (FilterSupport::HandledExact, ChildPushdownState::ChildExact) => { + // If both are exact, keep it as exact + ChildPushdownState::ChildExact + } + } + } +} + +/// See [`pushdown_filters`] for more details. +fn push_down_into_children( + node: &Arc, + filters: &[Arc], +) -> Result { + let children = node.children(); + let mut new_children = Vec::with_capacity(children.len()); + let mut filter_pushdown_result = vec![ChildPushdownState::NoChild; filters.len()]; + for child in children { + if let Some(result) = pushdown_filters(child, filters)? 
{
+            new_children.push(result.inner);
+            for (idx, support) in result.support.iter().enumerate() {
+                filter_pushdown_result[idx] =
+                    filter_pushdown_result[idx].combine_with_other(support)
+            }
+        } else {
+            new_children.push(Arc::clone(child));
+        }
+    }
+    let support = filter_pushdown_result
+        .iter()
+        .map(|s| match s {
+            ChildPushdownState::ChildExact => FilterSupport::HandledExact,
+            ChildPushdownState::ChildInexact => FilterSupport::Unhandled,
+            ChildPushdownState::NoChild => FilterSupport::Unhandled,
+        })
+        .collect::<Vec<_>>();
+    let node = with_new_children_if_necessary(Arc::clone(node), new_children)?;
+    Ok(ExecutionPlanFilterPushdownResult::new(node, support))
+}
+
+/// Recursively push a collection of filters down through the execution plan tree in a depth-first manner.
+///
+/// For each filter we try to push it down to children as far down as possible, keeping track of whether
+/// the children can handle the filter or not.
+///
+/// If a child can handle the filter, we mark it as handled exact and parent nodes (including the source of the filter)
+/// can decide to discard it / not re-apply it themselves.
+/// If a child cannot handle the filter or may return false positives (aka "inexact" handling) we mark it as handled inexact.
+/// If a child does not allow filter pushdown at all (e.g. an aggregation node) we keep recursing but clear the current set of filters
+/// we are pushing down.
+///
+/// As we recurse back up the tree we combine the results of the children to determine if the overall result is exact or inexact:
+/// - For nodes with a single child we just take the child's result.
+/// - For nodes with multiple children we combine the results of the children to determine if the overall result is exact or inexact.
+///   We do this by checking if all children are exact (we return exact up) or if any child is inexact (we return inexact).
+/// - If a node has no children this is equivalent to inexact handling (there is no child to handle the filter).
+///
+/// See [`PushdownFilter`] for more details on how this works in practice.
+fn pushdown_filters(
+    node: &Arc<dyn ExecutionPlan>,
+    parent_filters: &[Arc<dyn PhysicalExpr>],
+) -> Result<Option<ExecutionPlanFilterPushdownResult>> {
+    // Gather the filters from the current node.
+    // These are the filters the current node "owns" or "produces" and wants to push down.
+    let node_filters = node.filters_for_pushdown()?;
+    // Check which of the parent filters this node is okay with us trying to push down to its children.
+    let parent_pushdown_request_result = node.filter_pushdown_request(parent_filters)?;
+    // Do some index masking so that we only ever call nodes with the filters relevant to them / that they're allowed to touch.
+    // But we still need to reconstruct the full result for our caller.
+    let parent_filter_for_pushdown_indices = parent_pushdown_request_result
+        .iter()
+        .enumerate()
+        .filter_map(|(i, s)| {
+            if matches!(s, FilterPushdownAllowed::Allowed(_)) {
+                Some(i)
+            } else {
+                None
+            }
+        })
+        .collect::<Vec<_>>();
+    let parent_filters_to_push_down = parent_filter_for_pushdown_indices
+        .iter()
+        .map(|&i| Arc::clone(&parent_filters[i]))
+        .collect::<Vec<_>>();
+    let all_filters_to_push_down = node_filters
+        .iter()
+        .chain(parent_filters_to_push_down.iter())
+        .map(Arc::clone)
+        .collect::<Vec<_>>();
+    // Push down into children
+    let child_pushdown_result = push_down_into_children(node, &all_filters_to_push_down)?;
+    let mut node = child_pushdown_result.inner;
+    // A bit more index masking to construct the final result for our caller.
+    let node_filters_pushdown_result =
+        child_pushdown_result.support[..node_filters.len()].to_vec();
+    let mut parent_filter_pushdown_result =
+        vec![FilterSupport::Unhandled; parent_filters.len()];
+    for (parent_filter_idx, support) in parent_filter_for_pushdown_indices
+        .iter()
+        .zip(child_pushdown_result.support[node_filters.len()..].iter())
+    {
+        parent_filter_pushdown_result[*parent_filter_idx] = *support;
+    }
+    // Collect the remaining unhandled parent filters
+    let unhandled_parent_filter_indices = (0..parent_filters.len())
+        .filter(|&i| matches!(parent_filter_pushdown_result[i], FilterSupport::Unhandled))
+        .collect::<Vec<_>>();
+    let unhandled_parent_filters = unhandled_parent_filter_indices
+        .iter()
+        .map(|&i| Arc::clone(&parent_filters[i]))
+        .collect::<Vec<_>>();
+    // Check if the node can handle the filters
+    if let Some(result) = Arc::clone(&node).with_filter_pushdown_result(
+        &node_filters_pushdown_result,
+        &unhandled_parent_filters,
+    )? {
+        node = result.inner;
+        for (parent_filter_index, support) in
+            unhandled_parent_filter_indices.iter().zip(result.support)
+        {
+            parent_filter_pushdown_result[*parent_filter_index] = support;
+        }
+    }
+    Ok(Some(ExecutionPlanFilterPushdownResult::new(
+        node,
+        parent_filter_pushdown_result,
+    )))
+}
+
+/// A physical optimizer rule that pushes down filters in the execution plan.
+/// For example, consider the following plan:
+///
+/// ```text
+/// ┌──────────────────────┐
+/// │ CoalesceBatchesExec  │
+/// └──────────────────────┘
+///            │
+///            ▼
+/// ┌──────────────────────┐
+/// │      FilterExec      │
+/// │  filters = [ id=1]   │
+/// └──────────────────────┘
+///            │
+///            ▼
+/// ┌──────────────────────┐
+/// │    DataSourceExec    │
+/// │    projection = *    │
+/// └──────────────────────┘
+/// ```
+///
+/// Our goal is to move the `id = 1` filter from the `FilterExec` node to the `DataSourceExec` node.
+/// If this filter is selective it can avoid massive amounts of data being read from the source (the projection is `*` so all matching columns are read).
+/// In this simple case we:
+/// 1. Enter the recursion with no filters.
+/// 2. We find the `FilterExec` node and it tells us that it has a filter (see [`ExecutionPlan::filters_for_pushdown`] and `datafusion::physical_plan::filter::FilterExec`).
+/// 3. We recurse down into its children (the `DataSourceExec` node) now carrying the filters `[id = 1]`.
+/// 4. The `DataSourceExec` node tells us that it can handle the filter and we mark it as handled exact (see [`ExecutionPlan::with_filter_pushdown_result`]).
+/// 5. Since the `DataSourceExec` node has no children we recurse back up the tree.
+/// 6. We now tell the `FilterExec` node that it has a child that can handle the filter and we mark it as handled exact (see [`ExecutionPlan::with_filter_pushdown_result`]).
+///    The `FilterExec` node can now return a new execution plan, either a copy of itself without that filter or, if it has no work left to do, it can even return the child node directly.
+/// 7. We recurse back up to `CoalesceBatchesExec` and do nothing there since it had no filters to push down.
+/// +/// The new plan looks like: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ CoalesceBatchesExec │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +// │ projection = * │ +// │ filters = [ id=1] │ +/// └──────────────────────┘ +/// ``` +/// +/// Let's consider a more complex example involving a `ProjectionExec` node in betweeen the `FilterExec` and `DataSourceExec` nodes that creates a new column that the filter depends on. +/// +/// ```text +// ┌──────────────────────┐ +// │ CoalesceBatchesExec │ +// └──────────────────────┘ +// │ +// ▼ +// ┌──────────────────────┐ +// │ FilterExec │ +// │ filters = │ +// │ [cost>50,id=1] │ +// └──────────────────────┘ +// │ +// ▼ +// ┌──────────────────────┐ +// │ ProjectionExec │ +// │ cost = price * 1.2 │ +// └──────────────────────┘ +// │ +// ▼ +// ┌──────────────────────┐ +// │ DataSourceExec │ +// │ projection = * │ +// └──────────────────────┘ +/// ``` +/// +/// We want to push down the filters [id=1] to the `DataSourceExec` node, but can't push down `cost>50` because it requires the `ProjectionExec` node to be executed first: +/// +/// ```text +// ┌──────────────────────┐ +// │ CoalesceBatchesExec │ +// └──────────────────────┘ +// │ +// ▼ +// ┌──────────────────────┐ +// │ FilterExec │ +// │ filters = │ +// │ [cost>50] │ +// └──────────────────────┘ +// │ +// ▼ +// ┌──────────────────────┐ +// │ ProjectionExec │ +// │ cost = price * 1.2 │ +// └──────────────────────┘ +// │ +// ▼ +// ┌──────────────────────┐ +// │ DataSourceExec │ +// │ projection = * │ +// │ filters = [ id=1] │ +// └──────────────────────┘ +/// ``` +/// +/// There are also cases where we may be able to push down filters within a subtree but not the entire tree. +/// A good exmaple of this is aggreagation nodes: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ ProjectionExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = [sum > 10] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌───────────────────────┐ +/// │ AggregateExec │ +/// │ group by = [id] │ +/// │ aggregate = │ +/// │ [sum(price)] │ +/// └───────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = [id=1] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// ``` +/// +/// The transformation here is to push down the `[id=1]` filter to the `DataSourceExec` node: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ ProjectionExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ FilterExec │ +/// │ filters = [sum > 10] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌───────────────────────┐ +/// │ AggregateExec │ +/// │ group by = [id] │ +/// │ aggregate = │ +/// │ [sum(price)] │ +/// └───────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// │ filters = [id=1] │ +/// └──────────────────────┘ +/// ``` +/// +/// The point here is that: +/// 1. We cannot push down `sum > 10` through the `AggregateExec` node into the `DataSourceExec` node. +/// Any filters above the `AggregateExec` node are not pushed down. +/// This is determined by calling [`ExecutionPlan::filter_pushdown_request`] on the `AggregateExec` node. +/// 2. 
We need to keep recursing into the tree so that we can discover the other `FilterExec` node and push down the [id=1] filter. +/// +/// It is also possible to push down filters through joins and from joins. +/// For example, a hash join where we build a hash table of the left side and probe the right side +/// (ignoring why we would choose this order, typically it depends on the size of each table, etc.). +/// +/// ```text +/// ┌─────────────────────┐ +/// │ FilterExec │ +/// │ filters = │ +/// │ [d.size > 100] │ +/// └─────────────────────┘ +/// │ +/// │ +/// ┌──────────▼──────────┐ +/// │ │ +/// │ HashJoinExec │ +/// │ [u.dept@hash(d.id)] │ +/// │ │ +/// └─────────────────────┘ +/// │ +/// ┌────────────┴────────────┐ +/// ┌──────────▼──────────┐ ┌──────────▼──────────┐ +/// │ DataSourceExec │ │ DataSourceExec │ +/// │ alias [users as u] │ │ alias [dept as d] │ +/// │ │ │ │ +/// └─────────────────────┘ └─────────────────────┘ +/// ``` +/// +/// There are two pushdowns we can do here: +/// 1. Push down the `d.size > 100` filter through the `HashJoinExec` node to the `DataSourceExec` node for the `departments` table. +/// 2. Push down the hash table state from the `HashJoinExec` node to the `DataSourceExec` node to avoid reading +/// rows from teh `users` table that will be eliminated by the join. +/// This can be done via a bloom filter or similar. +/// +/// ```text +/// ┌─────────────────────┐ +/// │ │ +/// │ HashJoinExec │ +/// │ [u.dept@hash(d.id)] │ +/// │ │ +/// └─────────────────────┘ +/// │ +/// ┌────────────┴────────────┐ +/// ┌──────────▼──────────┐ ┌──────────▼──────────┐ +/// │ DataSourceExec │ │ DataSourceExec │ +/// │ alias [users as u] │ │ alias [dept as d] │ +/// │ filters = │ │ filters = │ +/// │ [depg@hash(d.id)] │ │ [ d.size > 100] │ +/// └─────────────────────┘ └─────────────────────┘ +/// ``` +/// +/// You may notice in this case that the filter is *dynamic*: the hash table is built +/// _after_ the `departments` table is read and at runtime. +/// We don't have a concrete `InList` filter or similar to push down at optimization time. +/// These sorts of dynamic filters are handled by building a specialized [`PhysicalExpr`] that +/// internally maintains a reference to the hash table or other state. +/// To make working with these sorts of dynamic filters more tractable we have the method [`PhysicalExpr::snapshot`] +/// which attempts to simplify a dynamic filter into a "basic" non-dynamic filter. +/// For a join this could mean converting it to an `InList` filter or a min/max filter for example. +/// See `datafusion/physical-plan/src/dynamic_filters.rs` for more details. 
+/// +/// Another form of dyanmic filter is pushing down the state of a `TopK` operator for queries like +/// `SELECT * FROM t ORDER BY id LIMIT 10`: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ TopK │ +/// │ limit = 10 │ +/// │ order by = [id] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// └──────────────────────┘ +/// ``` +/// +/// We can avoid large amounts of data processing by transforming this into: +/// +/// ```text +/// ┌──────────────────────┐ +/// │ TopK │ +/// │ limit = 10 │ +/// │ order by = [id] │ +/// └──────────────────────┘ +/// │ +/// ▼ +/// ┌──────────────────────┐ +/// │ DataSourceExec │ +/// │ projection = * │ +/// │ filters = │ +/// │ [id < @ TopKHeap] │ +/// └──────────────────────┘ +/// ``` +/// +/// Now as we fill our `TopK` heap we can push down the state of the heap to the `DataSourceExec` node +/// to avoid reading files / row groups / pages / rows that could not possibly be in the top 10. +/// This is implemented in datafusion/physical-plan/src/sorts/sort_filters.rs. +#[derive(Debug)] +pub struct PushdownFilter {} + +impl Default for PushdownFilter { + fn default() -> Self { + Self::new() + } +} + +impl PushdownFilter { + pub fn new() -> Self { + Self {} + } +} + +impl PhysicalOptimizerRule for PushdownFilter { + fn optimize( + &self, + plan: Arc, + _config: &ConfigOptions, + ) -> Result> { + if let Some(result) = pushdown_filters(&plan, &[])? { + Ok(result.inner) + } else { + Ok(plan) + } + } + + fn name(&self) -> &str { + "FilterPushdown" + } + + fn schema_check(&self) -> bool { + true // Filter pushdown does not change the schema of the plan + } +} diff --git a/datafusion/physical-optimizer/src/lib.rs b/datafusion/physical-optimizer/src/lib.rs index 35503f3b0b5f..5a43d7118d63 100644 --- a/datafusion/physical-optimizer/src/lib.rs +++ b/datafusion/physical-optimizer/src/lib.rs @@ -29,6 +29,7 @@ pub mod coalesce_batches; pub mod combine_partial_final_agg; pub mod enforce_distribution; pub mod enforce_sorting; +pub mod filter_pushdown; pub mod join_selection; pub mod limit_pushdown; pub mod limited_distinct_aggregation; diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs index bab31150e250..8bd22cbf1bda 100644 --- a/datafusion/physical-optimizer/src/optimizer.rs +++ b/datafusion/physical-optimizer/src/optimizer.rs @@ -25,6 +25,7 @@ use crate::coalesce_batches::CoalesceBatches; use crate::combine_partial_final_agg::CombinePartialFinalAggregate; use crate::enforce_distribution::EnforceDistribution; use crate::enforce_sorting::EnforceSorting; +use crate::filter_pushdown::PushdownFilter; use crate::join_selection::JoinSelection; use crate::limit_pushdown::LimitPushdown; use crate::limited_distinct_aggregation::LimitedDistinctAggregation; @@ -121,6 +122,10 @@ impl PhysicalOptimizer { // into an `order by max(x) limit y`. In this case it will copy the limit value down // to the aggregation, allowing it to use only y number of accumulators. Arc::new(TopKAggregation::new()), + // The FilterPushdown rule tries to push down filters as far as it can. + // For example, it will push down filtering from a `FilterExec` to + // a `DataSourceExec`, or from a `TopK`'s current state to a `DataSourceExec`. + Arc::new(PushdownFilter::new()), // The LimitPushdown rule tries to push limits down as far as possible, // replacing operators with fetching variants, or adding limits // past operators that support limit pushdown. 
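Since `PushdownFilter` is now part of the default physical optimizer, the end-to-end behavior can be exercised or switched off through the `datafusion.optimizer.enable_dynamic_filter_pushdown` option that `SortExec` consults later in this diff. The sketch below is a hedged usage illustration only: the file path, table name, and `id` column are hypothetical, and the exact plan output is not shown.

```rust
// Hypothetical end-to-end sketch: run a TopK-style query with the new
// dynamic filter pushdown enabled. The data file, table name, and `id`
// column are invented for illustration.
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let config = SessionConfig::new()
        // This is the option consulted by `SortExec` later in this diff.
        .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true);
    let ctx = SessionContext::new_with_config(config);
    ctx.register_parquet("t", "data/t.parquet", ParquetReadOptions::default())
        .await?;
    // With the option enabled the TopK heap state can be pushed into the
    // scan, which shows up as fewer rows read in `EXPLAIN ANALYZE`.
    ctx.sql("SELECT * FROM t ORDER BY id LIMIT 10")
        .await?
        .show()
        .await?;
    Ok(())
}
```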
diff --git a/datafusion/physical-optimizer/src/pruning.rs b/datafusion/physical-optimizer/src/pruning.rs index b5287f3d33f3..8a26f2c892b2 100644 --- a/datafusion/physical-optimizer/src/pruning.rs +++ b/datafusion/physical-optimizer/src/pruning.rs @@ -40,7 +40,9 @@ use datafusion_common::{ use datafusion_common::{Column, DFSchema}; use datafusion_expr_common::operator::Operator; use datafusion_physical_expr::utils::{collect_columns, Guarantee, LiteralGuarantee}; -use datafusion_physical_expr::{expressions as phys_expr, PhysicalExprRef}; +use datafusion_physical_expr::{ + expressions as phys_expr, snapshot_physical_expr, PhysicalExprRef, +}; use datafusion_physical_plan::{ColumnarValue, PhysicalExpr}; /// A source of runtime statistical information to [`PruningPredicate`]s. @@ -527,6 +529,9 @@ impl PruningPredicate { /// See the struct level documentation on [`PruningPredicate`] for more /// details. pub fn try_new(expr: Arc, schema: SchemaRef) -> Result { + // Get a (simpler) snapshot of the physical expr here to use with `PruningPredicate` + // which does not handle dynamic exprs in general + let expr = snapshot_physical_expr(expr)?; let unhandled_hook = Arc::new(ConstantUnhandledPredicateHook::default()) as _; // build predicate expression once diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 5244038b9ae2..10d0ebacf41d 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -32,9 +32,10 @@ use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; use datafusion_common::Result; use datafusion_execution::TaskContext; +use datafusion_physical_expr::PhysicalExpr; use crate::coalesce::{BatchCoalescer, CoalescerState}; -use crate::execution_plan::CardinalityEffect; +use crate::execution_plan::{CardinalityEffect, FilterPushdownAllowed}; use futures::ready; use futures::stream::{Stream, StreamExt}; @@ -212,6 +213,16 @@ impl ExecutionPlan for CoalesceBatchesExec { fn cardinality_effect(&self) -> CardinalityEffect { CardinalityEffect::Equal } + + fn filter_pushdown_request( + &self, + filters: &[Arc], + ) -> Result> { + Ok(filters + .iter() + .map(|f| FilterPushdownAllowed::Allowed(Arc::clone(f))) + .collect()) + } } /// Stream for [`CoalesceBatchesExec`]. See [`CoalesceBatchesExec`] for more details. diff --git a/datafusion/physical-plan/src/dynamic_filters.rs b/datafusion/physical-plan/src/dynamic_filters.rs new file mode 100644 index 000000000000..e76502779416 --- /dev/null +++ b/datafusion/physical-plan/src/dynamic_filters.rs @@ -0,0 +1,370 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
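The `pruning.rs` change above snapshots expressions before a `PruningPredicate` is built, because pruning matches on concrete expression types and cannot reason about dynamic filters directly. A minimal sketch of that idea follows; the helper name `prepare_for_pruning` is invented, while `snapshot_physical_expr` is the function added earlier in this diff.

```rust
// Minimal sketch of the snapshotting step used by `PruningPredicate::try_new`
// above. `prepare_for_pruning` is an invented wrapper name.
use std::sync::Arc;

use datafusion_common::Result;
use datafusion_physical_expr::{snapshot_physical_expr, PhysicalExpr};

fn prepare_for_pruning(
    expr: Arc<dyn PhysicalExpr>,
) -> Result<Arc<dyn PhysicalExpr>> {
    // For ordinary expressions this is a no-op; for a dynamic expression such
    // as the `DynamicFilterPhysicalExpr` defined below it returns the current
    // concrete filter (for example `id < 30` for a TopK heap).
    snapshot_physical_expr(expr)
}
```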
+ +use std::{ + any::Any, + fmt::Display, + hash::Hash, + sync::{Arc, RwLock}, +}; + +use datafusion_common::{ + tree_node::{Transformed, TransformedResult, TreeNode}, + Result, +}; +use datafusion_expr::ColumnarValue; +use datafusion_physical_expr::{utils::conjunction, PhysicalExpr}; +use datafusion_physical_expr_common::physical_expr::{DynEq, DynHash}; + +/// A source of dynamic runtime filters. +/// +/// During query execution, operators implementing this trait can provide +/// filter expressions that other operators can use to dynamically prune data. +/// +/// See `TopKDynamicFilterSource` in datafusion/physical-plan/src/topk/mod.rs for examples. +pub trait DynamicFilterSource: + Send + Sync + std::fmt::Debug + DynEq + DynHash + Display + 'static +{ + /// Take a snapshot of the current state of filtering, returning a non-dynamic PhysicalExpr. + /// This is used to e.g. serialize dynamic filters across the wire or to pass them into systems + /// that won't use the `PhysicalExpr` API (e.g. matching on the concrete types of the expressions like `PruningPredicate` does). + /// For example, it is expected that this returns a relatively simple expression such as `col1 > 5` for a TopK operator or + /// `col2 IN (1, 2, ... N)` for a HashJoin operator. + fn snapshot_current_filters(&self) -> Result>>; + + fn as_any(&self) -> &dyn Any; +} + +impl PartialEq for dyn DynamicFilterSource { + fn eq(&self, other: &Self) -> bool { + self.dyn_eq(other.as_any()) + } +} + +impl Eq for dyn DynamicFilterSource {} + +/// A wrapper around a [`DynamicFilterSource`] that allows it to be used as a physical expression. +/// This will call [`DynamicFilterSource::snapshot_current_filters`] to get the current filters for each call to +/// [`PhysicalExpr::evaluate`], [`PhysicalExpr::data_type`], and [`PhysicalExpr::nullable`]. +/// It also implements [`PhysicalExpr::snapshot`] by forwarding the call to [`DynamicFilterSource::snapshot_current_filters`]. +#[derive(Debug)] +pub struct DynamicFilterPhysicalExpr { + /// The original children of this PhysicalExpr, if any. + /// This is necessary because the dynamic filter may be initialized with a placeholder (e.g. `lit(true)`) + /// and later remapped to the actual expressions that are being filtered. + /// But we need to know the children (e.g. columns referenced in the expression) ahead of time to evaluate the expression correctly. + children: Vec>, + /// If any of the children were remapped / modified (e.g. to adjust for projections) we need to keep track of the new children + /// so that when we update `current()` in subsequent iterations we can re-apply the replacements. + remapped_children: Option>>, + /// The source of dynamic filters. + inner: Arc, + /// For testing purposes track the data type and nullability to make sure they don't change. + /// If they do, there's a bug in the implementation. + /// But this can have overhead in production, so it's only included in our tests. 
+ data_type: Arc>>, + nullable: Arc>>, +} + +impl Hash for DynamicFilterPhysicalExpr { + fn hash(&self, state: &mut H) { + self.inner.dyn_hash(state); + self.children.dyn_hash(state); + self.remapped_children.dyn_hash(state); + } +} + +impl PartialEq for DynamicFilterPhysicalExpr { + fn eq(&self, other: &Self) -> bool { + self.inner.dyn_eq(other.inner.as_any()) + && self.children == other.children + && self.remapped_children == other.remapped_children + } +} + +impl Eq for DynamicFilterPhysicalExpr {} + +impl Display for DynamicFilterPhysicalExpr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DynamicFilterPhysicalExpr [ {} ]", self.inner) + } +} + +impl DynamicFilterPhysicalExpr { + pub fn new( + children: Vec>, + inner: Arc, + ) -> Self { + Self { + children, + remapped_children: None, // Initially no remapped children + inner, + data_type: Arc::new(RwLock::new(None)), + nullable: Arc::new(RwLock::new(None)), + } + } + + fn current(&self) -> Result> { + let current = conjunction(self.inner.snapshot_current_filters()?); + if let Some(remapped_children) = &self.remapped_children { + // Remap children to the current children + // of the expression. + current + .transform_up(|expr| { + // Check if this is any of our original children + if let Some(pos) = self + .children + .iter() + .position(|c| c.as_ref() == expr.as_ref()) + { + // If so, remap it to the current children + // of the expression. + let new_child = Arc::clone(&remapped_children[pos]); + Ok(Transformed::yes(new_child)) + } else { + // Otherwise, just return the expression + Ok(Transformed::no(expr)) + } + }) + .data() + } else { + Ok(current) + } + } +} + +impl PhysicalExpr for DynamicFilterPhysicalExpr { + fn as_any(&self) -> &dyn Any { + self + } + + fn children(&self) -> Vec<&Arc> { + self.remapped_children + .as_ref() + .unwrap_or(&self.children) + .iter() + .collect() + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(Self { + children: self.children.clone(), + remapped_children: Some(children), + inner: Arc::clone(&self.inner), + data_type: Arc::clone(&self.data_type), + nullable: Arc::clone(&self.nullable), + })) + } + + fn data_type( + &self, + input_schema: &arrow::datatypes::Schema, + ) -> Result { + let res = self.current()?.data_type(input_schema)?; + #[cfg(test)] + { + use datafusion_common::internal_err; + // Check if the data type has changed. + let mut data_type_lock = self + .data_type + .write() + .expect("Failed to acquire write lock for data_type"); + if let Some(existing) = &*data_type_lock { + if existing != &res { + // If the data type has changed, we have a bug. + return internal_err!( + "DynamicFilterPhysicalExpr data type has changed unexpectedly. \ + Expected: {existing:?}, Actual: {res:?}" + ); + } + } else { + *data_type_lock = Some(res.clone()); + } + } + Ok(res) + } + + fn nullable(&self, input_schema: &arrow::datatypes::Schema) -> Result { + let res = self.current()?.nullable(input_schema)?; + #[cfg(test)] + { + use datafusion_common::internal_err; + // Check if the nullability has changed. + let mut nullable_lock = self + .nullable + .write() + .expect("Failed to acquire write lock for nullable"); + if let Some(existing) = *nullable_lock { + if existing != res { + // If the nullability has changed, we have a bug. + return internal_err!( + "DynamicFilterPhysicalExpr nullability has changed unexpectedly. 
\ + Expected: {existing}, Actual: {res}" + ); + } + } else { + *nullable_lock = Some(res); + } + } + Ok(res) + } + + fn evaluate( + &self, + batch: &arrow::record_batch::RecordBatch, + ) -> Result { + let current = self.current()?; + #[cfg(test)] + { + // Ensure that we are not evaluating after the expression has changed. + let schema = batch.schema(); + self.nullable(&schema)?; + self.data_type(&schema)?; + }; + current.evaluate(batch) + } + + fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + if let Ok(inner) = self.inner.snapshot_current_filters() { + conjunction(inner).fmt_sql(f) + } else { + write!(f, "dynamic_filter_expr()") // What do we want to do here? + } + } + + fn snapshot(&self) -> Result>> { + // Return the current expression as a snapshot. + Ok(Some(self.current()?)) + } +} + +#[cfg(test)] +mod test { + use arrow::array::RecordBatch; + use datafusion_common::ScalarValue; + use datafusion_physical_expr::expressions::lit; + + use super::*; + + #[test] + fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() { + #[derive(Debug)] + struct MockDynamicFilterSource { + current_expr: Arc>>, + } + + impl Hash for MockDynamicFilterSource { + fn hash(&self, state: &mut H) { + // Hash the current expression to ensure uniqueness + self.current_expr.read().unwrap().dyn_hash(state); + } + } + + impl DynEq for MockDynamicFilterSource { + fn dyn_eq(&self, other: &dyn Any) -> bool { + if let Some(other) = other.downcast_ref::() { + self.current_expr + .read() + .unwrap() + .eq(&other.current_expr.read().unwrap()) + } else { + false + } + } + } + + impl DynamicFilterSource for MockDynamicFilterSource { + fn snapshot_current_filters(&self) -> Result>> { + let expr = self.current_expr.read().unwrap().clone(); + Ok(vec![expr]) + } + + fn as_any(&self) -> &dyn Any { + self + } + } + + impl Display for MockDynamicFilterSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "MockDynamicFilterSource [ current_expr: {:?} ]", + self.current_expr.read().unwrap() + ) + } + } + + let source = Arc::new(MockDynamicFilterSource { + current_expr: Arc::new(RwLock::new(lit(42) as Arc)), + }); + let dynamic_filter = DynamicFilterPhysicalExpr::new( + vec![], + Arc::clone(&source) as Arc, + ); + + // First call to data_type and nullable should set the initial values. + let initial_data_type = dynamic_filter + .data_type(&arrow::datatypes::Schema::empty()) + .unwrap(); + let initial_nullable = dynamic_filter + .nullable(&arrow::datatypes::Schema::empty()) + .unwrap(); + + // Call again and expect no change. + let second_data_type = dynamic_filter + .data_type(&arrow::datatypes::Schema::empty()) + .unwrap(); + let second_nullable = dynamic_filter + .nullable(&arrow::datatypes::Schema::empty()) + .unwrap(); + assert_eq!( + initial_data_type, second_data_type, + "Data type should not change on second call." + ); + assert_eq!( + initial_nullable, second_nullable, + "Nullability should not change on second call." + ); + + // Now change the current expression to something else. + { + let mut current = source.current_expr.write().unwrap(); + *current = lit(ScalarValue::Utf8(None)) as Arc; + } + // Check that we error if we call data_type, nullable or evaluate after changing the expression. + assert!( + dynamic_filter + .data_type(&arrow::datatypes::Schema::empty()) + .is_err(), + "Expected err when data_type is called after changing the expression." 
+        );
+        assert!(
+            dynamic_filter
+                .nullable(&arrow::datatypes::Schema::empty())
+                .is_err(),
+            "Expected err when nullable is called after changing the expression."
+        );
+        let batch = RecordBatch::new_empty(Arc::new(arrow::datatypes::Schema::empty()));
+        assert!(
+            dynamic_filter.evaluate(&batch).is_err(),
+            "Expected err when evaluate is called after changing the expression."
+        );
+    }
+}
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 2bc5706ee0e1..595e1afd32c6 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -467,8 +467,106 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
         Ok(None)
     }
+
+    /// Returns a set of filters that this operator owns but would like to be pushed down.
+    /// For example, a `TopK` operator may produce dynamic filters that reference its current state,
+    /// while a `FilterExec` will just hand off the filters it has as is.
+    /// The default implementation returns an empty vector.
+    /// These filters are applied row by row and any that return `false` or `NULL` will be
+    /// filtered out and any that return `true` will be kept.
+    /// The expressions returned **must** always return `true` or `false`;
+    /// other truthy or falsy values are not allowed (e.g. `0`, `1`).
+    ///
+    /// # Returns
+    /// A vector of filters that this operator would like to push down.
+    /// These should be treated as the split conjunction of a `WHERE` clause.
+    /// That is, a query such as `WHERE a = 1 AND b = 2` would return two
+    /// filters: `a = 1` and `b = 2`.
+    /// A single combined predicate can be split into this form using
+    /// [`crate::physical_expr::split_conjunction`].
+    fn filters_for_pushdown(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+        Ok(Vec::new())
+    }
+
+    /// Checks which filters this node allows to be pushed down through it from a parent to a child.
+    /// For example, a `ProjectionExec` node can allow filters that only reference
+    /// columns it did not create to pass through, but filters that reference columns it is creating cannot be pushed down any further.
+    /// That is, it only allows some filters through because it changes the schema of the data.
+    /// Aggregation nodes may not allow any filters to be pushed down as they change the cardinality of the data.
+    /// RepartitionExec nodes allow all filters to be pushed down as they don't change the schema or cardinality.
+    fn filter_pushdown_request(
+        &self,
+        filters: &[Arc<dyn PhysicalExpr>],
+    ) -> Result<Vec<FilterPushdownAllowed>> {
+        Ok(vec![FilterPushdownAllowed::Disallowed; filters.len()])
+    }
+
+    /// After we've attempted to push down filters into this node's children
+    /// this will be called with the result for each filter that this node gave in `filters_for_pushdown`
+    /// **and** any filters that children could not handle.
+    fn with_filter_pushdown_result(
+        self: Arc<Self>,
+        _own_filters_result: &[FilterSupport],
+        _parent_filters_remaining: &[Arc<dyn PhysicalExpr>],
+    ) -> Result<Option<ExecutionPlanFilterPushdownResult>> {
+        Ok(None)
+    }
+}
+
+/// The answer to the question: "Can this filter be pushed down through this plan?"
+/// Note that this is different from [`FilterSupport`] which is the answer to "Can *this* plan handle this filter?"
+#[derive(Debug, Clone)]
+pub enum FilterPushdownAllowed {
+    /// The operator allows this filter to be pushed down to its children.
+    /// The operator may choose to return a *different* filter expression
+    /// that is equivalent to the original filter, e.g. to deal with column indexes in a projection
+    /// or because the original filter can't be pushed down as is but a less-selective filter can be.
+    Allowed(Arc<dyn PhysicalExpr>),
+    /// The operator does not allow this filter to be pushed down to its children.
+    Disallowed,
+}
+
+/// The answer to the question: "Can this operator handle this filter itself?"
+/// Note that this is different from [`FilterPushdownAllowed`] which is the answer to "Can this filter be pushed down through this plan?"
+#[derive(Debug, Clone, Copy)]
+pub enum FilterSupport {
+    /// Filter may not have been pushed down to the child plan, or the child plan
+    /// can only partially apply the filter but may have false positives (but not false negatives).
+    /// In this case the parent **must** behave as if the filter was not pushed down
+    /// and must apply the filter itself.
+    Unhandled,
+    /// Filter was pushed down to the child plan and the child plan promises that
+    /// it will apply the filter correctly with no false positives or false negatives.
+    /// The parent can safely drop the filter.
+    HandledExact,
+}
+
+/// The combined result of a filter pushdown operation.
+/// This includes:
+/// * The inner plan that was produced by the pushdown operation.
+/// * The support for each filter that was pushed down.
+pub struct FilterPushdownResult<T> {
+    pub inner: T,
+    pub support: Vec<FilterSupport>,
+}
+
+impl<T> FilterPushdownResult<T> {
+    pub fn new(plan: T, support: Vec<FilterSupport>) -> Self {
+        Self {
+            inner: plan,
+            support,
+        }
+    }
+
+    pub fn is_exact(&self) -> bool {
+        self.support
+            .iter()
+            .all(|s| matches!(s, FilterSupport::HandledExact))
+    }
+}
+
+pub type ExecutionPlanFilterPushdownResult = FilterPushdownResult<Arc<dyn ExecutionPlan>>;
+
 /// [`ExecutionPlan`] Invariant Level
 ///
 /// What set of assertions ([Invariant]s) holds for a particular `ExecutionPlan`
diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs
index a8a9973ea043..f15849f95d9f 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -25,7 +25,10 @@ use super::{
     RecordBatchStream, SendableRecordBatchStream, Statistics,
 };
 use crate::common::can_project;
-use crate::execution_plan::CardinalityEffect;
+use crate::execution_plan::{
+    CardinalityEffect, ExecutionPlanFilterPushdownResult, FilterPushdownAllowed,
+    FilterSupport,
+};
 use crate::projection::{
     make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
     ProjectionExec,
@@ -34,6 +37,7 @@ use crate::{
     metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
     DisplayFormatType, ExecutionPlan,
 };
+use datafusion_physical_expr::expressions::lit;
 
 use arrow::compute::filter_record_batch;
 use arrow::datatypes::{DataType, SchemaRef};
@@ -48,10 +52,10 @@ use datafusion_expr::Operator;
 use datafusion_physical_expr::equivalence::ProjectionMapping;
 use datafusion_physical_expr::expressions::BinaryExpr;
 use datafusion_physical_expr::intervals::utils::check_support;
-use datafusion_physical_expr::utils::collect_columns;
+use datafusion_physical_expr::utils::{collect_columns, reassign_predicate_columns};
 use datafusion_physical_expr::{
-    analyze, split_conjunction, AcrossPartitions, AnalysisContext, ConstExpr,
-    ExprBoundaries, PhysicalExpr,
+    analyze, conjunction, split_conjunction, AcrossPartitions, AnalysisContext,
+    ConstExpr, ExprBoundaries, PhysicalExpr,
 };
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
@@ -433,6 +437,83 @@ impl ExecutionPlan for FilterExec {
         }
         try_embed_projection(projection, self)
     }
+
+    fn filter_pushdown_request(
+        &self,
+        filters: &[Arc<dyn PhysicalExpr>],
+    ) -> Result<Vec<FilterPushdownAllowed>> {
+        let filters = if self.projection.is_some() {
+            let input_schema = self.input.schema();
+            filters
+                .iter()
+                .map(|f| reassign_predicate_columns(Arc::clone(f), &input_schema, false))
+                .collect::<Result<Vec<_>>>()?
+        } else {
+            filters.to_vec()
+        };
+
+        Ok(filters
+            .into_iter()
+            .map(FilterPushdownAllowed::Allowed)
+            .collect())
+    }
+
+    fn filters_for_pushdown(&self) -> Result<Vec<Arc<dyn PhysicalExpr>>> {
+        let predicate = reassign_predicate_columns(
+            Arc::clone(&self.predicate),
+            &self.input.schema(),
+            false,
+        )?;
+        Ok(vec![predicate])
+    }
+
+    fn with_filter_pushdown_result(
+        self: Arc<Self>,
+        own_filters_result: &[FilterSupport],
+        parent_filters_remaining: &[Arc<dyn PhysicalExpr>],
+    ) -> Result<Option<ExecutionPlanFilterPushdownResult>> {
+        // Only keep the filters whose pushdown result was `Unhandled`
+        let filters_for_pushdown = self.filters_for_pushdown()?;
+        let new_filters = filters_for_pushdown
+            .iter()
+            .zip(own_filters_result.iter())
+            .filter_map(|(f, p)| {
+                if matches!(p, FilterSupport::HandledExact) {
+                    // Exact pushdown support means we can discard the filter
+                    None
+                } else {
+                    // Otherwise we still have to apply it
+                    Some(Arc::clone(f))
+                }
+            })
+            // Combine that with any leftover filters from parents that our children couldn't handle
+            .chain(parent_filters_remaining.iter().map(Arc::clone));
+
+        let new_predicate = conjunction(new_filters);
+
+        if new_predicate.eq(&lit(true)) && self.projection.is_none() {
+            // We can remove ourselves from the execution tree
+            Ok(Some(ExecutionPlanFilterPushdownResult::new(
+                Arc::clone(&self.input),
+                vec![FilterSupport::HandledExact; parent_filters_remaining.len()],
+            )))
+        } else {
+            Ok(Some(ExecutionPlanFilterPushdownResult {
+                inner: Arc::new(Self {
+                    predicate: new_predicate,
+                    input: Arc::clone(&self.input),
+                    metrics: self.metrics.clone(),
+                    default_selectivity: self.default_selectivity,
+                    cache: self.cache.clone(),
+                    projection: self.projection.clone(),
+                }),
+                support: vec![
+                    FilterSupport::HandledExact;
+                    parent_filters_remaining.len()
+                ],
+            }))
+        }
+    }
 }
 
 impl EmbeddedProjection for FilterExec {
diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs
index 04fbd06fabcd..44c78c49f4ac 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -36,7 +36,7 @@ pub use datafusion_expr::{Accumulator, ColumnarValue};
 pub use datafusion_physical_expr::window::WindowExpr;
 use datafusion_physical_expr::PhysicalSortExpr;
 pub use datafusion_physical_expr::{
-    expressions, Distribution, Partitioning, PhysicalExpr,
+    expressions, snapshot_physical_expr, Distribution, Partitioning, PhysicalExpr,
 };
 pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
@@ -51,6 +51,7 @@ pub use crate::stream::EmptyRecordBatchStream;
 pub use crate::topk::TopK;
 pub use crate::visitor::{accept, visit_execution_plan, ExecutionPlanVisitor};
+mod dynamic_filters;
 mod ordering;
 mod render_tree;
 mod topk;
diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index ebc751201378..bf070a8c9348 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -29,7 +29,7 @@ use super::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet};
 use super::{
     DisplayAs, ExecutionPlanProperties, RecordBatchStream, SendableRecordBatchStream,
 };
-use crate::execution_plan::CardinalityEffect;
+use crate::execution_plan::{CardinalityEffect, FilterPushdownAllowed};
 use crate::hash_utils::create_hashes;
 use crate::metrics::BaselineMetrics;
 use
crate::projection::{all_columns, make_with_child, update_expr, ProjectionExec}; @@ -723,6 +723,16 @@ impl ExecutionPlan for RepartitionExec { new_partitioning, )?))) } + + fn filter_pushdown_request( + &self, + filters: &[Arc], + ) -> Result> { + Ok(filters + .iter() + .map(|f| FilterPushdownAllowed::Allowed(Arc::clone(f))) + .collect()) + } } impl RepartitionExec { diff --git a/datafusion/physical-plan/src/sorts/mod.rs b/datafusion/physical-plan/src/sorts/mod.rs index c7ffae4061c0..29eb3a2647f4 100644 --- a/datafusion/physical-plan/src/sorts/mod.rs +++ b/datafusion/physical-plan/src/sorts/mod.rs @@ -22,6 +22,7 @@ mod cursor; mod merge; pub mod partial_sort; pub mod sort; +pub mod sort_filters; pub mod sort_preserving_merge; mod stream; pub mod streaming_merge; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 1072e9abf437..d55b3a730720 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -25,7 +25,9 @@ use std::fmt::{Debug, Formatter}; use std::sync::Arc; use crate::common::spawn_buffered; -use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType}; +use crate::execution_plan::{ + Boundedness, CardinalityEffect, EmissionType, FilterPushdownAllowed, +}; use crate::expressions::PhysicalSortExpr; use crate::limit::LimitStream; use crate::metrics::{ @@ -57,12 +59,14 @@ use datafusion_execution::disk_manager::RefCountedTempFile; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_execution::TaskContext; -use datafusion_physical_expr::LexOrdering; +use datafusion_physical_expr::{LexOrdering, PhysicalExpr}; use datafusion_physical_expr_common::sort_expr::LexRequirement; use futures::{StreamExt, TryStreamExt}; use log::{debug, trace}; +use super::sort_filters::SortDynamicFilterSource; + struct ExternalSorterMetrics { /// metrics baseline: BaselineMetrics, @@ -968,6 +972,8 @@ pub struct SortExec { fetch: Option, /// Cache holding plan properties like equivalences, output partitioning etc. 
cache: PlanProperties, + /// Dynamic filter sources + dynamic_filter_source: Arc, } impl SortExec { @@ -976,6 +982,7 @@ impl SortExec { pub fn new(expr: LexOrdering, input: Arc) -> Self { let preserve_partitioning = false; let cache = Self::compute_properties(&input, expr.clone(), preserve_partitioning); + let dynamic_filter_source = Arc::new(SortDynamicFilterSource::new(expr.clone())); Self { expr, input, @@ -983,6 +990,7 @@ impl SortExec { preserve_partitioning, fetch: None, cache, + dynamic_filter_source, } } @@ -1035,6 +1043,7 @@ impl SortExec { preserve_partitioning: self.preserve_partitioning, fetch, cache, + dynamic_filter_source: Arc::clone(&self.dynamic_filter_source), } } @@ -1121,6 +1130,14 @@ impl SortExec { boundedness, ) } + + fn with_dynamic_filter_source( + mut self, + dynamic_filter_source: Arc, + ) -> Self { + self.dynamic_filter_source = dynamic_filter_source; + self + } } impl DisplayAs for SortExec { @@ -1185,7 +1202,8 @@ impl ExecutionPlan for SortExec { ) -> Result> { let new_sort = SortExec::new(self.expr.clone(), Arc::clone(&children[0])) .with_fetch(self.fetch) - .with_preserve_partitioning(self.preserve_partitioning); + .with_preserve_partitioning(self.preserve_partitioning) + .with_dynamic_filter_source(Arc::clone(&self.dynamic_filter_source)); Ok(Arc::new(new_sort)) } @@ -1226,12 +1244,23 @@ impl ExecutionPlan for SortExec { context.runtime_env(), &self.metrics_set, )?; + let dynamic_filter_source = Arc::clone(&self.dynamic_filter_source); + let enable_dynamic_filter_pushdown = context + .session_config() + .options() + .optimizer + .enable_dynamic_filter_pushdown; Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), futures::stream::once(async move { while let Some(batch) = input.next().await { let batch = batch?; topk.insert_batch(batch)?; + if enable_dynamic_filter_pushdown { + if let Some(values) = topk.get_threshold_values()? { + dynamic_filter_source.update_values(&values)?; + } + } } topk.emit() }) @@ -1316,9 +1345,24 @@ impl ExecutionPlan for SortExec { Ok(Some(Arc::new( SortExec::new(updated_exprs, make_with_child(projection, self.input())?) .with_fetch(self.fetch()) - .with_preserve_partitioning(self.preserve_partitioning()), + .with_preserve_partitioning(self.preserve_partitioning()) + .with_dynamic_filter_source(Arc::clone(&self.dynamic_filter_source)), ))) } + + fn filters_for_pushdown(&self) -> Result>> { + Ok(vec![self.dynamic_filter_source.as_physical_expr()?]) + } + + fn filter_pushdown_request( + &self, + filters: &[Arc], + ) -> Result> { + Ok(filters + .iter() + .map(|f| FilterPushdownAllowed::Allowed(Arc::clone(f))) + .collect()) + } } #[cfg(test)] diff --git a/datafusion/physical-plan/src/sorts/sort_filters.rs b/datafusion/physical-plan/src/sorts/sort_filters.rs new file mode 100644 index 000000000000..3e37cd6b3cbd --- /dev/null +++ b/datafusion/physical-plan/src/sorts/sort_filters.rs @@ -0,0 +1,297 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{ + fmt::Display, + hash::{Hash, Hasher}, + sync::{Arc, RwLock}, +}; + +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::Operator; +use datafusion_physical_expr::{ + expressions::{is_not_null, is_null, lit, BinaryExpr}, + LexOrdering, PhysicalExpr, +}; + +use crate::dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSource}; + +/// Pushdown of dynamic filters from sort + limit operators (aka `TopK`) is used to speed up queries +/// such as `SELECT * FROM table ORDER BY col DESC LIMIT 10` by pushing down the +/// threshold values for the sort columns to the data source. +/// That is, the TopK operator will keep track of the top 10 values for the sort +/// and before a new file is opened its statistics will be checked against the +/// threshold values to determine if the file can be skipped and predicate pushdown +/// will use these to skip rows during the scan. +/// +/// For example, imagine this data gets created if multiple sources with clock skews, +/// network delays, etc. are writing data and you don't do anything fancy to guarantee +/// perfect sorting by `timestamp` (i.e. you naively write out the data to Parquet, maybe do some compaction, etc.). +/// The point is that 99% of yesterday's files have a `timestamp` smaller than 99% of today's files +/// but there may be a couple seconds of overlap between files. +/// To be concrete, let's say this is our data: +// +// | file | min | max | +// |------|-----|-----| +// | 1 | 1 | 10 | +// | 2 | 9 | 19 | +// | 3 | 20 | 31 | +// | 4 | 30 | 35 | +// +// Ideally a [`TableProvider`] is able to use file level stats or other methods to roughly order the files +// within each partition / file group such that we start with the newest / largest `timestamp`s. +// If this is not possible the optimization still works but is less efficient and harder to visualize, +// so for this example let's assume that we process 1 file at a time and we started with file 4. +// After processing file 4 let's say we have 10 values in our TopK heap, the smallest of which is 30. +// The TopK operator will then push down the filter `timestamp < 30` down the tree of [`ExecutionPlan`]s +// and if the data source supports dynamic filter pushdown it will accept a reference to this [`DynamicPhysicalExprSource`] +// and when it goes to open file 3 it will ask the [`DynamicPhysicalExprSource`] for the current filters. +// Since file 3 may contain values larger than 30 we cannot skip it entirely, +// but scanning it may still be more efficient due to page pruning and other optimizations. +// Once we get to file 2 however we can skip it entirely because we know that all values in file 2 are smaller than 30. +// The same goes for file 1. +// So this optimization just saved us 50% of the work of scanning the data. 
+#[derive(Debug)]
+pub struct SortDynamicFilterSource {
+    /// Sort expressions
+    expr: LexOrdering,
+    /// Current threshold values
+    thresholds: Arc<RwLock<Vec<Option<ScalarValue>>>>,
+}
+
+impl Hash for SortDynamicFilterSource {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        // Hash the pointer to the thresholds
+        let thresholds = Arc::as_ptr(&self.thresholds) as usize;
+        thresholds.hash(state);
+    }
+}
+
+impl PartialEq for SortDynamicFilterSource {
+    fn eq(&self, other: &Self) -> bool {
+        Arc::ptr_eq(&self.thresholds, &other.thresholds)
+    }
+}
+
+impl Eq for SortDynamicFilterSource {}
+
+impl SortDynamicFilterSource {
+    pub fn new(expr: LexOrdering) -> Self {
+        let thresholds = Arc::new(RwLock::new(vec![None; expr.len()]));
+        Self { expr, thresholds }
+    }
+
+    pub fn update_values(&self, new_values: &[ScalarValue]) -> Result<()> {
+        let replace = {
+            let thresholds = self.thresholds.read().map_err(|_| {
+                datafusion_common::DataFusionError::Execution(
+                    "Failed to acquire read lock on thresholds".to_string(),
+                )
+            })?;
+            if new_values.len() != thresholds.len() {
+                return Err(datafusion_common::DataFusionError::Execution(
+                    "The number of new values does not match the number of thresholds"
+                        .to_string(),
+                ));
+            }
+            // We need to decide if these values replace our current values or not.
+            // They only replace our current values if they would sort before them given our sorting expression.
+            // Importantly, since this may be a multi-expression sort, we need to check that **the entire expression**
+            // sorts before the current set of values, not just one column.
+            // This means that if we have a sort expression like `a, b` and the new value is `a = 1, b = 2`
+            // and the current value is `a = 1, b = 3` we need to check that `a = 1, b = 2` sorts before `a = 1, b = 3`
+            // and not just that `a = 1` sorts before `a = 1`.
+            // We also have to handle ASC/DESC and NULLS FIRST/LAST for each column.
+ let mut replace = true; + for (i, new_value) in new_values.iter().enumerate() { + let current_value = &thresholds[i]; + let sort_expr = &self.expr[i]; + let descending = sort_expr.options.descending; + let nulls_first = sort_expr.options.nulls_first; + if let Some(current_value) = current_value { + let new_value_is_greater_than_current = new_value.gt(current_value); + let new_value_is_null = new_value.is_null(); + let current_value_is_null = current_value.is_null(); + // Handle the null cases + if current_value_is_null && !new_value_is_null && nulls_first { + replace = false; + break; + } + if new_value_is_null && !current_value_is_null && !nulls_first { + replace = false; + break; + } + // Handle the descending case + if descending { + if new_value_is_greater_than_current { + replace = false; + break; + } + } else if !new_value_is_greater_than_current { + replace = false; + break; + } + // Handle the equality case + if new_value.eq(current_value) { + replace = false; + break; + } + } + } + replace + }; + if replace { + let mut thresholds = self.thresholds.write().map_err(|_| { + datafusion_common::DataFusionError::Execution( + "Failed to acquire write lock on thresholds".to_string(), + ) + })?; + for (i, new_value) in new_values.iter().enumerate() { + thresholds[i] = Some(new_value.clone()); + } + } + Ok(()) + } + + pub fn as_physical_expr(self: &Arc) -> Result> { + let children = self + .expr + .iter() + .map(|sort_expr| Arc::clone(&sort_expr.expr)) + .collect::>(); + Ok(Arc::new(DynamicFilterPhysicalExpr::new( + children, + Arc::clone(self) as Arc, + ))) + } +} + +impl Display for SortDynamicFilterSource { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let thresholds = self + .snapshot_current_filters() + .map_err(|_| std::fmt::Error)? 
+ .iter() + .map(|p| format!("{p}")) + .collect::>(); + let inner = thresholds.join(","); + write!(f, "SortDynamicFilterSource[ {} ]", inner,) + } +} + +impl DynamicFilterSource for SortDynamicFilterSource { + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn snapshot_current_filters(&self) -> Result>> { + let thresholds = self.thresholds.read().map_err(|_| { + datafusion_common::DataFusionError::Execution( + "Failed to acquire read lock on thresholds".to_string(), + ) + })?; + // Create filter expressions for each threshold + let mut filters: Vec> = + Vec::with_capacity(thresholds.len()); + + let mut prev_sort_expr: Option> = None; + for (sort_expr, value) in self.expr.iter().zip(thresholds.iter()) { + let Some(value) = value else { + // If the value is None, we cannot create a filter for this threshold + // This means we skip this column for filtering + continue; + }; + + // Create the appropriate operator based on sort order + let op = if sort_expr.options.descending { + // For descending sort, we want col > threshold (exclude smaller values) + Operator::Gt + } else { + // For ascending sort, we want col < threshold (exclude larger values) + Operator::Lt + }; + + let value_null = value.is_null(); + + let comparison = Arc::new(BinaryExpr::new( + Arc::clone(&sort_expr.expr), + op, + lit(value.clone()), + )); + + let comparison_with_null = match (sort_expr.options.nulls_first, value_null) { + // For nulls first, transform to (threshold.value is not null) and (threshold.expr is null or comparison) + (true, true) => lit(false), + (true, false) => Arc::new(BinaryExpr::new( + is_null(Arc::clone(&sort_expr.expr))?, + Operator::Or, + comparison, + )), + // For nulls last, transform to (threshold.value is null and threshold.expr is not null) + // or (threshold.value is not null and comparison) + (false, true) => is_not_null(Arc::clone(&sort_expr.expr))?, + (false, false) => comparison, + }; + + let mut eq_expr = Arc::new(BinaryExpr::new( + Arc::clone(&sort_expr.expr), + Operator::Eq, + lit(value.clone()), + )); + + if value_null { + eq_expr = Arc::new(BinaryExpr::new( + is_null(Arc::clone(&sort_expr.expr))?, + Operator::Or, + eq_expr, + )); + } + + // For a query like order by a, b, the filter for column `b` is only applied if + // the condition a = threshold.value (considering null equality) is met. + // Therefore, we add equality predicates for all preceding fields to the filter logic of the current field, + // and include the current field's equality predicate in `prev_sort_expr` for use with subsequent fields. + match prev_sort_expr.take() { + None => { + prev_sort_expr = Some(eq_expr); + filters.push(comparison_with_null); + } + Some(p) => { + filters.push(Arc::new(BinaryExpr::new( + Arc::clone(&p), + Operator::And, + comparison_with_null, + ))); + + prev_sort_expr = + Some(Arc::new(BinaryExpr::new(p, Operator::And, eq_expr))); + } + } + } + + let dynamic_predicate = filters + .into_iter() + .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::Or, b))); + + if let Some(predicate) = dynamic_predicate { + if !predicate.eq(&lit(true)) { + return Ok(vec![predicate]); + } + } + Ok(vec![]) + } +} diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 85de1eefce2e..89bbfd2250e1 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -17,27 +17,30 @@ //! 
TopK: Combination of Sort / LIMIT -use arrow::{ - compute::interleave, - row::{RowConverter, Rows, SortField}, -}; use std::mem::size_of; -use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc}; +use std::sync::Arc; +use std::{cmp::Ordering, collections::BinaryHeap}; -use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder}; -use crate::spill::get_record_batch_memory_size; -use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; use arrow::array::{Array, ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; -use datafusion_common::HashMap; -use datafusion_common::Result; +use arrow::{ + compute::interleave, + row::{RowConverter, Rows, SortField}, +}; +use datafusion_common::{internal_err, HashMap}; +use datafusion_common::{Result, ScalarValue}; use datafusion_execution::{ memory_pool::{MemoryConsumer, MemoryReservation}, runtime_env::RuntimeEnv, }; +use datafusion_expr::ColumnarValue; use datafusion_physical_expr::PhysicalSortExpr; use datafusion_physical_expr_common::sort_expr::LexOrdering; +use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder}; +use crate::spill::get_record_batch_memory_size; +use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; + /// Global TopK /// /// # Background @@ -92,6 +95,16 @@ pub struct TopK { heap: TopKHeap, } +impl std::fmt::Debug for TopK { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TopK") + .field("schema", &self.schema) + .field("batch_size", &self.batch_size) + .field("expr", &self.expr) + .finish() + } +} + impl TopK { /// Create a new [`TopK`] that stores the top `k` values, as /// defined by the sort expressions in `expr`. @@ -186,6 +199,10 @@ impl TopK { Ok(()) } + pub fn get_threshold_values(&self) -> Result>> { + self.heap.get_threshold_values(&self.expr) + } + /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap pub fn emit(self) -> Result { let Self { @@ -271,7 +288,7 @@ struct TopKHeap { } impl TopKHeap { - fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self { + pub fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self { assert!(k > 0); Self { k, @@ -282,6 +299,50 @@ impl TopKHeap { } } + /// Get threshold values for all columns in the given sort expressions. + /// If the heap does not yet have k items, returns None. + /// Otherwise, returns the threshold values from the max row in the heap. 
+ pub fn get_threshold_values( + &self, + sort_exprs: &[PhysicalSortExpr], + ) -> Result>> { + // If the heap doesn't have k elements yet, we can't create thresholds + let max_row = match self.max() { + Some(row) => row, + None => return Ok(None), + }; + + // Get the batch that contains the max row + let batch_entry = match self.store.get(max_row.batch_id) { + Some(entry) => entry, + None => return internal_err!("Invalid batch ID in TopKRow"), + }; + + // Extract threshold values for each sort expression + let mut scalar_values = Vec::with_capacity(sort_exprs.len()); + for sort_expr in sort_exprs { + // Extract the value for this column from the max row + let expr = Arc::clone(&sort_expr.expr); + let value = expr.evaluate(&batch_entry.batch.slice(max_row.index, 1))?; + + // Convert to scalar value - should be a single value since we're evaluating on a single row batch + let scalar = match value { + ColumnarValue::Scalar(scalar) => scalar, + ColumnarValue::Array(array) if array.len() == 1 => { + // Extract the first (and only) value from the array + ScalarValue::try_from_array(&array, 0)? + } + array => { + return internal_err!("Expected a scalar value, got {:?}", array) + } + }; + + scalar_values.push(scalar); + } + + Ok(Some(scalar_values)) + } + /// Register a [`RecordBatch`] with the heap, returning the /// appropriate entry pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry { @@ -297,7 +358,7 @@ impl TopKHeap { /// Returns the largest value stored by the heap if there are k /// items, otherwise returns None. Remember this structure is /// keeping the "smallest" k values - fn max(&self) -> Option<&TopKRow> { + pub fn max(&self) -> Option<&TopKRow> { if self.inner.len() < self.k { None } else { @@ -509,7 +570,7 @@ impl TopKRow { } /// Returns a slice to the owned row value - fn row(&self) -> &[u8] { + pub fn row(&self) -> &[u8] { self.row.as_slice() } } @@ -529,7 +590,7 @@ impl Ord for TopKRow { } #[derive(Debug)] -struct RecordBatchEntry { +pub struct RecordBatchEntry { id: u32, batch: RecordBatch, // for this batch, how many times has it been used @@ -648,6 +709,7 @@ impl RecordBatchStore { mod tests { use super::*; use arrow::array::{Float64Array, Int32Array, RecordBatch}; + use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, Schema}; /// This test ensures the size calculation is correct for RecordBatches with multiple columns. 
@@ -681,4 +743,63 @@ mod tests { record_batch_store.unuse(0); assert_eq!(record_batch_store.batches_size, 0); } + + #[test] + fn test_topk_as_dynamic_filter_source() { + let schema = Arc::new(Schema::new(vec![ + Field::new("col1", DataType::Int32, true), + Field::new("col2", DataType::Float64, false), + ])); + + let runtime = Arc::new(RuntimeEnv::default()); + let metrics = ExecutionPlanMetricsSet::new(); + + // Create a TopK with descending sort on col2 + let sort_expr = vec![PhysicalSortExpr { + expr: Arc::new(datafusion_physical_expr::expressions::Column::new( + "col2", 1, + )), + options: SortOptions { + descending: true, + nulls_first: false, + }, + }]; + + let mut topk = TopK::try_new( + 0, + Arc::clone(&schema), + sort_expr.into(), + 5, // k=5 + 100, // batch_size + runtime, + &metrics, + ) + .unwrap(); + + // Initially there should be no filters (empty heap) + let filter = topk.get_threshold_values().unwrap(); + assert_eq!(filter, None, "Expected no filters when heap is empty"); + + // Insert some data to fill the heap + let col1 = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); + let col2 = + Float64Array::from(vec![10.0, 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0]); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(col1), Arc::new(col2)], + ) + .unwrap(); + + // Insert the data into TopK + topk.insert_batch(batch).unwrap(); + + // Now there should be a filter + let filter = topk.get_threshold_values().unwrap().unwrap(); + assert_eq!(filter.len(), 1, "Expected one filter after inserting data"); + assert_eq!( + filter[0], + ScalarValue::Float64(Some(6.0)), + "Expected filter value to be 6.0" + ); + } } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index c196595eeed4..a2945c4abe32 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -28,7 +28,9 @@ use datafusion::physical_plan::expressions::{ }; use datafusion::physical_plan::udaf::AggregateFunctionExpr; use datafusion::physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr}; -use datafusion::physical_plan::{Partitioning, PhysicalExpr, WindowExpr}; +use datafusion::physical_plan::{ + snapshot_physical_expr, Partitioning, PhysicalExpr, WindowExpr, +}; use datafusion::{ datasource::{ file_format::{csv::CsvSink, json::JsonSink}, @@ -210,6 +212,7 @@ pub fn serialize_physical_expr( value: &Arc, codec: &dyn PhysicalExtensionCodec, ) -> Result { + let value = snapshot_physical_expr(Arc::clone(value))?; let expr = value.as_any(); if let Some(expr) = expr.downcast_ref::() { @@ -368,7 +371,7 @@ pub fn serialize_physical_expr( }) } else { let mut buf: Vec = vec![]; - match codec.try_encode_expr(value, &mut buf) { + match codec.try_encode_expr(&value, &mut buf) { Ok(_) => { let inputs: Vec = value .children() diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 68e21183938b..ea18318dd699 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -99,6 +99,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.enable_distinct_aggregation_soft_limit | true | When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. 
|
| datafusion.optimizer.enable_round_robin_repartition | true | When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores |
| datafusion.optimizer.enable_topk_aggregation | true | When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible |
+| datafusion.optimizer.enable_dynamic_filter_pushdown | true | When set to true attempts to push down dynamic filters generated by operators into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. |
| datafusion.optimizer.filter_null_join_keys | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
| datafusion.optimizer.repartition_aggregations | true | Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel using the provided `target_partitions` level |
| datafusion.optimizer.repartition_file_min_size | 10485760 | Minimum total files size in bytes to perform file scan repartitioning. |
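Since the new `datafusion.optimizer.enable_dynamic_filter_pushdown` option defaults to `true`, users only need to touch it to opt out or to re-enable it explicitly. A small usage sketch (assuming the usual `datafusion`/`tokio` setup; not part of the patch) showing both ways to toggle it:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // Opt out at session-construction time ...
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", false);
    let ctx = SessionContext::new_with_config(config);

    // ... or toggle it at runtime with a SQL `SET` statement.
    ctx.sql("SET datafusion.optimizer.enable_dynamic_filter_pushdown = true")
        .await?;
    Ok(())
}
```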