diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index 5140ae7eb0dc..c508aa6c6410 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -1476,6 +1476,7 @@ dependencies = [
  "chrono",
  "datafusion-common",
  "datafusion-expr",
+ "datafusion-functions-aggregate",
  "datafusion-physical-expr",
  "hashbrown 0.14.5",
  "indexmap",
@@ -1554,6 +1555,7 @@ dependencies = [
  "datafusion-common-runtime",
  "datafusion-execution",
  "datafusion-expr",
+ "datafusion-functions-aggregate",
  "datafusion-functions-aggregate-common",
  "datafusion-functions-window-common",
  "datafusion-physical-expr",
diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 1ad10d164868..21bb8058c99f 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -636,6 +636,10 @@ config_namespace! {
        /// then the output will be coerced to a non-view.
        /// Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
        pub expand_views_at_output: bool, default = false
+
+       /// When set to true, DataFusion will try to push the build side statistics
+       /// of a hash join down to the probe phase
+       pub dynamic_join_pushdown: bool, default = true
    }
}
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index 9d78a0f2e3f8..efd744591c7b 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -21,11 +21,6 @@
 //! Note: Most traits here need to be marked `Sync + Send` to be
 //! compliant with the `SendableRecordBatchStream` trait.

-use std::collections::VecDeque;
-use std::mem;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
 use crate::datasource::listing::PartitionedFile;
 use crate::datasource::physical_plan::file_scan_config::PartitionColumnProjector;
 use crate::datasource::physical_plan::{FileMeta, FileScanConfig};
@@ -34,6 +29,11 @@ use crate::physical_plan::metrics::{
     BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
 };
 use crate::physical_plan::RecordBatchStream;
+use std::collections::VecDeque;
+use std::mem;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};

 use arrow::datatypes::SchemaRef;
 use arrow::error::ArrowError;
@@ -41,6 +41,7 @@ use arrow::record_batch::RecordBatch;
 use datafusion_common::instant::Instant;
 use datafusion_common::ScalarValue;

+use datafusion_physical_plan::joins::DynamicFilterInfo;
 use futures::future::BoxFuture;
 use futures::stream::BoxStream;
 use futures::{ready, FutureExt, Stream, StreamExt};
@@ -95,6 +96,8 @@ pub struct FileStream<F: FileOpener> {
     baseline_metrics: BaselineMetrics,
     /// Describes the behavior of the `FileStream` if file opening or scanning fails
     on_error: OnError,
+    /// Dynamic filters (e.g. pushed down from a hash join build side)
+    dynamic_filters: Option<Arc<DynamicFilterInfo>>,
 }

 /// Represents the state of the next `FileOpenFuture`. Since we need to poll
@@ -272,6 +275,7 @@ impl<F: FileOpener> FileStream<F> {
             file_stream_metrics: FileStreamMetrics::new(metrics, partition),
             baseline_metrics: BaselineMetrics::new(metrics, partition),
             on_error: OnError::Fail,
+            dynamic_filters: None,
         })
     }

@@ -283,6 +287,14 @@ impl<F: FileOpener> FileStream<F> {
         self.on_error = on_error;
         self
     }
+
+    /// Set the dynamic filters to apply while scanning
+    pub fn with_dynamic_filter(
+        mut self,
+        dynamic_filter: Option<Arc<DynamicFilterInfo>>,
+    ) -> Self {
+        self.dynamic_filters = dynamic_filter;
+        self
+    }

    /// Begin opening the next file in parallel while decoding the current file in FileStream.
/// @@ -390,7 +402,11 @@ impl FileStream { } } match ready!(reader.poll_next_unpin(cx)) { - Some(Ok(batch)) => { + Some(Ok(mut batch)) => { + // if there is a ready dynamic filter, we just use it to filter + if let Some(dynamic_filters) = &self.dynamic_filters { + batch = dynamic_filters.filter_batch(&batch)? + } self.file_stream_metrics.time_scanning_until_data.stop(); self.file_stream_metrics.time_scanning_total.stop(); let result = self diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs index 980f796a53b2..dd2c6cd8a5d1 100644 --- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs +++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs @@ -42,6 +42,8 @@ use crate::{ use arrow::datatypes::SchemaRef; use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalExpr}; +use datafusion_physical_plan::joins::DynamicFilterInfo; +use datafusion_physical_plan::Metric; use itertools::Itertools; use log::debug; @@ -282,6 +284,8 @@ pub struct ParquetExec { table_parquet_options: TableParquetOptions, /// Optional user defined schema adapter schema_adapter_factory: Option>, + /// dynamic filters (like join filters) + dynamic_filters: Option>, } impl From for ParquetExecBuilder { @@ -291,7 +295,6 @@ impl From for ParquetExecBuilder { } /// [`ParquetExecBuilder`], builder for [`ParquetExec`]. -/// /// See example on [`ParquetExec`]. pub struct ParquetExecBuilder { file_scan_config: FileScanConfig, @@ -463,6 +466,7 @@ impl ParquetExecBuilder { cache, table_parquet_options, schema_adapter_factory, + dynamic_filters: None, } } } @@ -515,6 +519,7 @@ impl ParquetExec { cache: _, table_parquet_options, schema_adapter_factory, + .. } = self; ParquetExecBuilder { file_scan_config: base_config, @@ -711,10 +716,9 @@ impl DisplayAs for ParquetExec { ) }) .unwrap_or_default(); - write!(f, "ParquetExec: ")?; self.base_config.fmt_as(t, f)?; - write!(f, "{}{}", predicate_string, pruning_predicate_string,) + write!(f, "{}{}", predicate_string, pruning_predicate_string) } } } @@ -798,7 +802,16 @@ impl ExecutionPlan for ParquetExec { .schema_adapter_factory .clone() .unwrap_or_else(|| Arc::new(DefaultSchemaAdapterFactory)); - + if let Some(dynamic_filter) = &self.dynamic_filters { + let (final_expr, name) = + dynamic_filter.final_predicate(self.predicate.clone()); + if final_expr.is_some() { + self.metrics.register(Arc::new(Metric::new( + datafusion_physical_plan::metrics::MetricValue::DynamicFilter(name), + None, + ))); + } + } let opener = ParquetOpener { partition_index, projection: Arc::from(projection), @@ -819,7 +832,8 @@ impl ExecutionPlan for ParquetExec { }; let stream = - FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?; + FileStream::new(&self.base_config, partition_index, opener, &self.metrics)? 
+ .with_dynamic_filter(self.dynamic_filters.clone()); Ok(Box::pin(stream)) } @@ -862,8 +876,33 @@ impl ExecutionPlan for ParquetExec { cache: self.cache.clone(), table_parquet_options: self.table_parquet_options.clone(), schema_adapter_factory: self.schema_adapter_factory.clone(), + dynamic_filters: self.dynamic_filters.clone(), })) } + + fn support_dynamic_filter(&self) -> bool { + true + } + + fn with_dynamic_filter( + &self, + dynamic_filters: Option>, + ) -> Result>> { + Ok(Some(Arc::new(ParquetExec { + base_config: self.base_config.clone(), + projected_statistics: self.projected_statistics.clone(), + metrics: self.metrics.clone(), + predicate: self.predicate.clone(), + pruning_predicate: self.pruning_predicate.clone(), + page_pruning_predicate: self.page_pruning_predicate.clone(), + metadata_size_hint: self.metadata_size_hint, + parquet_file_reader_factory: self.parquet_file_reader_factory.clone(), + cache: self.cache.clone(), + table_parquet_options: self.table_parquet_options.clone(), + schema_adapter_factory: self.schema_adapter_factory.clone(), + dynamic_filters, + }))) + } } fn should_enable_page_index( diff --git a/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs b/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs new file mode 100644 index 000000000000..3bf02d2d1bbf --- /dev/null +++ b/datafusion/core/src/physical_optimizer/join_filter_pushdown.rs @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Push dynamic join filters down to the scan execution, if there are any
+
+use std::sync::Arc;
+
+use crate::datasource::physical_plan::ParquetExec;
+use crate::physical_plan::ExecutionPlan;
+use crate::{config::ConfigOptions, error::Result, physical_plan::joins::HashJoinExec};
+use datafusion_common::tree_node::{Transformed, TransformedResult};
+use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::joins::DynamicFilterInfo;
+
+/// This rule pushes the build side statistics of a hash join down to the probe phase
+#[derive(Default, Debug)]
+pub struct JoinFilterPushdown {}
+
+impl JoinFilterPushdown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl PhysicalOptimizerRule for JoinFilterPushdown {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        config: &ConfigOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        if !config.optimizer.dynamic_join_pushdown {
+            return Ok(plan);
+        }
+        optimize_impl(plan, &mut None).data()
+    }
+
+    fn name(&self) -> &str {
+        "JoinFilterPushdown"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+fn optimize_impl(
+    plan: Arc<dyn ExecutionPlan>,
+    join_filters: &mut Option<Arc<DynamicFilterInfo>>,
+) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
+    if let Some(hashjoin_exec) = plan.as_any().downcast_ref::<HashJoinExec>() {
+        join_filters.clone_from(&hashjoin_exec.dynamic_filters_pushdown);
+        let new_right = optimize_impl(hashjoin_exec.right.clone(), join_filters)?;
+        if new_right.transformed {
+            let new_hash_join = HashJoinExec::try_new(
+                hashjoin_exec.left().clone(),
+                new_right.data,
+                hashjoin_exec.on.clone(),
+                hashjoin_exec.filter().cloned(),
+                hashjoin_exec.join_type(),
+                hashjoin_exec.projection.clone(),
+                *hashjoin_exec.partition_mode(),
+                hashjoin_exec.null_equals_null(),
+            )?
+            .with_dynamic_filter(hashjoin_exec.dynamic_filters_pushdown.clone())?
+            .map_or(plan, |f| f);
+            return Ok(Transformed::yes(new_hash_join));
+        }
+        Ok(Transformed::no(plan))
+    } else if let Some(parquet_exec) = plan.as_any().downcast_ref::<ParquetExec>() {
+        if let Some(dynamic_filters) = join_filters {
+            let final_exec = parquet_exec
+                .clone()
+                .with_dynamic_filter(Some(dynamic_filters.clone()))?;
+            if let Some(plan) = final_exec {
+                return Ok(Transformed::yes(plan));
+            } else {
+                return Ok(Transformed::no(plan));
+            }
+        }
+        Ok(Transformed::no(plan))
+    } else {
+        let children = plan.children();
+        let mut new_children = Vec::with_capacity(children.len());
+        let mut transformed = false;
+
+        for child in children {
+            let new_child = optimize_impl(child.clone(), join_filters)?;
+            if new_child.transformed {
+                transformed = true;
+            }
+            new_children.push(new_child.data);
+        }
+
+        if transformed {
+            let new_plan = plan.with_new_children(new_children)?;
+            Ok(Transformed::yes(new_plan))
+        } else {
+            Ok(Transformed::no(plan))
+        }
+    }
+}
diff --git a/datafusion/core/src/physical_optimizer/join_selection.rs b/datafusion/core/src/physical_optimizer/join_selection.rs
index 0312e362afb1..7f9140e30afa 100644
--- a/datafusion/core/src/physical_optimizer/join_selection.rs
+++ b/datafusion/core/src/physical_optimizer/join_selection.rs
@@ -387,7 +387,7 @@ fn try_collect_left(
     {
         Ok(Some(swap_hash_join(hash_join, PartitionMode::CollectLeft)?))
     } else {
-        Ok(Some(Arc::new(HashJoinExec::try_new(
+        Ok(HashJoinExec::try_new(
             Arc::clone(left),
             Arc::clone(right),
             hash_join.on().to_vec(),
@@ -396,7 +396,8 @@ fn try_collect_left(
             hash_join.projection.clone(),
             PartitionMode::CollectLeft,
             hash_join.null_equals_null(),
-        )?)))
+        )?
+        .with_dynamic_filter(hash_join.dynamic_filters_pushdown.clone())?)
} } (true, false) => Ok(Some(Arc::new(HashJoinExec::try_new( diff --git a/datafusion/core/src/physical_optimizer/mod.rs b/datafusion/core/src/physical_optimizer/mod.rs index efdd3148d03f..f9d65141d947 100644 --- a/datafusion/core/src/physical_optimizer/mod.rs +++ b/datafusion/core/src/physical_optimizer/mod.rs @@ -24,6 +24,8 @@ pub mod coalesce_batches; pub mod enforce_distribution; pub mod enforce_sorting; +#[cfg(feature = "parquet")] +pub mod join_filter_pushdown; pub mod join_selection; pub mod optimizer; pub mod projection_pushdown; diff --git a/datafusion/core/src/physical_optimizer/optimizer.rs b/datafusion/core/src/physical_optimizer/optimizer.rs index 7a6f991121ef..412c20458e09 100644 --- a/datafusion/core/src/physical_optimizer/optimizer.rs +++ b/datafusion/core/src/physical_optimizer/optimizer.rs @@ -17,9 +17,8 @@ //! Physical optimizer traits -use datafusion_physical_optimizer::PhysicalOptimizerRule; -use std::sync::Arc; - +#[cfg(feature = "parquet")] +use super::join_filter_pushdown::JoinFilterPushdown; use super::projection_pushdown::ProjectionPushdown; use super::update_aggr_exprs::OptimizeAggregateOrder; use crate::physical_optimizer::aggregate_statistics::AggregateStatistics; @@ -33,6 +32,8 @@ use crate::physical_optimizer::limited_distinct_aggregation::LimitedDistinctAggr use crate::physical_optimizer::output_requirements::OutputRequirements; use crate::physical_optimizer::sanity_checker::SanityCheckPlan; use crate::physical_optimizer::topk_aggregation::TopKAggregation; +use datafusion_physical_optimizer::PhysicalOptimizerRule; +use std::sync::Arc; /// A rule-based physical optimizer. #[derive(Clone, Debug)] @@ -112,6 +113,8 @@ impl PhysicalOptimizer { // given query plan; i.e. it only acts as a final // gatekeeping rule. Arc::new(SanityCheckPlan::new()), + #[cfg(feature = "parquet")] + Arc::new(JoinFilterPushdown::new()), ]; Self::with_rules(rules) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 7d475ad2e2a1..9bc87f24ac5c 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -17,10 +17,6 @@ //! 
Planner for [`LogicalPlan`] to [`ExecutionPlan`] -use std::borrow::Cow; -use std::collections::HashMap; -use std::sync::Arc; - use crate::datasource::file_format::file_type_to_format; use crate::datasource::listing::ListingTableUrl; use crate::datasource::physical_plan::FileSinkConfig; @@ -60,6 +56,10 @@ use crate::physical_plan::{ displayable, windows, ExecutionPlan, ExecutionPlanProperties, InputOrderMode, Partitioning, PhysicalExpr, WindowExpr, }; +use datafusion_physical_plan::joins::DynamicFilterInfo; +use std::borrow::Cow; +use std::collections::HashMap; +use std::sync::Arc; use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; @@ -81,7 +81,7 @@ use datafusion_expr::{ SkipType, SortExpr, StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp, }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; -use datafusion_physical_expr::expressions::Literal; +use datafusion_physical_expr::expressions::{Column, Literal}; use datafusion_physical_expr::LexOrdering; use datafusion_physical_plan::placeholder_row::PlaceholderRowExec; use datafusion_physical_plan::unnest::ListUnnest; @@ -342,6 +342,7 @@ impl DefaultPhysicalPlanner { ); } let plan = outputs.pop().unwrap(); + Ok(plan) } @@ -369,6 +370,7 @@ impl DefaultPhysicalPlanner { ChildrenContainer::None, ) .await?; + let mut current_index = leaf_starter_index; // parent_index is None only for root while let Some(parent_index) = node.parent_index { @@ -861,6 +863,7 @@ impl DefaultPhysicalPlanner { join_type, null_equals_null, schema: join_schema, + dynamic_pushdown_columns, .. }) => { let null_equals_null = *null_equals_null; @@ -1063,8 +1066,8 @@ impl DefaultPhysicalPlanner { let prefer_hash_join = session_state.config_options().optimizer.prefer_hash_join; - - let join: Arc = if join_on.is_empty() { + let right_schema = physical_right.schema(); + let mut join: Arc = if join_on.is_empty() { if join_filter.is_none() && matches!(join_type, JoinType::Inner) { // cross join if there is no join conditions and no join filter set Arc::new(CrossJoinExec::new(physical_left, physical_right)) @@ -1128,6 +1131,50 @@ impl DefaultPhysicalPlanner { )?) }; + // build dynamic filter + if join.support_dynamic_filter() + && dynamic_pushdown_columns + .as_ref() + .is_some_and(|columns| !columns.is_empty()) + { + let physical_dynamic_filter_info: Option> = + if let Some(dynamic_columns) = dynamic_pushdown_columns { + let columns_and_types_and_names: Vec<(Arc, String)> = + dynamic_columns + .iter() + .map(|dynamic_column| { + let column = dynamic_column.column(); + let index = + right_schema.index_of(column.name())?; + let physical_column = + Arc::new(Column::new(&column.name, index)); + let build_side_name = + dynamic_column.build_name().to_owned(); + Ok((physical_column, build_side_name)) + }) + .collect::>()?; + + let (physical_columns, build_side_names) = + columns_and_types_and_names.into_iter().fold( + (Vec::new(), Vec::new()), + |(mut cols, mut names), (col, name)| { + cols.push(col); + names.push(name); + (cols, names) + }, + ); + + Some(Arc::new(DynamicFilterInfo::try_new( + physical_columns, + build_side_names, + )?)) + } else { + None + }; + join = join + .with_dynamic_filter(physical_dynamic_filter_info)? + .map_or(join, |plan| plan); + } // If plan was mutated previously then need to create the ExecutionPlan // for the new Projection that was applied on top. 
if let Some((input, expr)) = new_project {
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 790fc508d4ca..77798fade63b 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -974,6 +974,7 @@ impl LogicalPlanBuilder {
             join_constraint: JoinConstraint::On,
             schema: DFSchemaRef::new(join_schema),
             null_equals_null,
+            dynamic_pushdown_columns: None,
         })))
     }

@@ -1038,6 +1039,7 @@ impl LogicalPlanBuilder {
                 join_constraint: JoinConstraint::Using,
                 schema: DFSchemaRef::new(join_schema),
                 null_equals_null: false,
+                dynamic_pushdown_columns: None,
             })))
         }
     }
@@ -1055,6 +1057,7 @@ impl LogicalPlanBuilder {
             join_constraint: JoinConstraint::On,
             null_equals_null: false,
             schema: DFSchemaRef::new(join_schema),
+            dynamic_pushdown_columns: None,
         })))
     }

@@ -1266,6 +1269,7 @@ impl LogicalPlanBuilder {
             join_constraint: JoinConstraint::On,
             schema: DFSchemaRef::new(join_schema),
             null_equals_null: false,
+            dynamic_pushdown_columns: None,
         })))
     }

diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index ea8fca3ec9d6..450a543d3408 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -36,7 +36,7 @@ use crate::logical_plan::{DmlStatement, Statement};
 use crate::utils::{
     enumerate_grouping_sets, exprlist_len, exprlist_to_fields, find_base_plan,
     find_out_reference_exprs, grouping_set_expr_count, grouping_set_to_exprlist,
-    split_conjunction,
+    split_conjunction, DynamicFilterColumn,
 };
 use crate::{
     build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr,
@@ -643,6 +643,7 @@ impl LogicalPlan {
                 on,
                 schema: _,
                 null_equals_null,
+                dynamic_pushdown_columns,
             }) => {
                 let schema =
                     build_join_schema(left.schema(), right.schema(), &join_type)?;
@@ -664,6 +665,7 @@ impl LogicalPlan {
                     filter,
                     schema: DFSchemaRef::new(schema),
                     null_equals_null,
+                    dynamic_pushdown_columns,
                 }))
             }
             LogicalPlan::Subquery(_) => Ok(self),
@@ -919,6 +921,7 @@ impl LogicalPlan {
                     filter: filter_expr,
                     schema: DFSchemaRef::new(schema),
                     null_equals_null: *null_equals_null,
+                    dynamic_pushdown_columns: None,
                 }))
             }
             LogicalPlan::Subquery(Subquery {
@@ -3276,6 +3279,8 @@ pub struct Join {
     pub schema: DFSchemaRef,
     /// If null_equals_null is true, null == null else null != null
     pub null_equals_null: bool,
+    /// Columns that we may try to push down to the scan dynamically
+    pub dynamic_pushdown_columns: Option<Vec<DynamicFilterColumn>>,
 }

 impl Join {
@@ -3309,8 +3314,17 @@ impl Join {
             join_constraint: original_join.join_constraint,
             schema: Arc::new(join_schema),
             null_equals_null: original_join.null_equals_null,
+            dynamic_pushdown_columns: None,
         })
     }
+
+    /// Attach the dynamically pushed down columns to this join
+    pub fn with_dynamic_pushdown_columns(
+        mut self,
+        pushdown_columns: Vec<DynamicFilterColumn>,
+    ) -> Self {
+        self.dynamic_pushdown_columns = Some(pushdown_columns);
+        self
+    }
 }

 // Manual implementation needed because of `schema` field. Comparison excludes this field.
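
As an aside on the new `Join` API above: a minimal, hypothetical sketch of how a rule could attach a dynamic pushdown column. Here `join` is assumed to be a `datafusion_expr::logical_plan::Join` whose build (left) and probe (right) join keys are both named `a`; the snippet is illustrative only and not part of this patch:

    use datafusion_common::Column;
    use datafusion_expr::utils::DynamicFilterColumn;

    // Build-side column "a" supplies the values; probe-side column `a` is
    // the one the generated filter will be applied to at scan time.
    let dyn_col = DynamicFilterColumn::new("a".to_string(), Column::from_name("a"));
    let join = join.with_dynamic_pushdown_columns(vec![dyn_col]);
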
diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index ff2c1ec1d58f..20fc5d1ae45e 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -143,6 +143,7 @@ impl TreeNode for LogicalPlan { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }) => map_until_stop_and_collect!( rewrite_arc(left, &mut f), right, @@ -158,6 +159,7 @@ impl TreeNode for LogicalPlan { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }) }), LogicalPlan::Limit(Limit { skip, fetch, input }) => rewrite_arc(input, f)? @@ -635,6 +637,7 @@ impl LogicalPlan { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }) => map_until_stop_and_collect!( on.into_iter().map_until_stop_and_collect( |on| map_until_stop_and_collect!(f(on.0), on.1, f(on.1)) @@ -654,6 +657,7 @@ impl LogicalPlan { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }) }), LogicalPlan::Sort(Sort { expr, input, fetch }) => { diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index c22ee244fe28..45b403d8cb7f 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -1395,6 +1395,25 @@ pub fn format_state_name(name: &str, state_name: &str) -> String { format!("{name}[{state_name}]") } +/// contain the left column name(build side) and actual column side(probe side) +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct DynamicFilterColumn { + pub build_name: String, + pub column: Column, +} + +impl DynamicFilterColumn { + pub fn new(build_name: String, column: Column) -> Self { + Self { build_name, column } + } + pub fn column(&self) -> &Column { + &self.column + } + pub fn build_name(&self) -> &str { + &self.build_name + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index b4256508e351..4b8b5cbec04c 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -888,7 +888,7 @@ macro_rules! 
min_max { } /// An accumulator to compute the maximum value -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MaxAccumulator { max: ScalarValue, } @@ -1195,7 +1195,7 @@ fn get_min_doc() -> &'static Documentation { } /// An accumulator to compute the minimum value -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MinAccumulator { min: ScalarValue, } diff --git a/datafusion/optimizer/Cargo.toml b/datafusion/optimizer/Cargo.toml index 79a5bb24e918..422751a9269f 100644 --- a/datafusion/optimizer/Cargo.toml +++ b/datafusion/optimizer/Cargo.toml @@ -41,6 +41,7 @@ async-trait = { workspace = true } chrono = { workspace = true } datafusion-common = { workspace = true, default-features = true } datafusion-expr = { workspace = true } +datafusion-functions-aggregate = { workspace = true } datafusion-physical-expr = { workspace = true } hashbrown = { workspace = true } indexmap = { workspace = true } diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index 65ebac2106ad..bd784860eb57 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -329,6 +329,7 @@ fn find_inner_join( filter: None, schema: join_schema, null_equals_null: false, + dynamic_pushdown_columns: None, })); } } @@ -351,6 +352,7 @@ fn find_inner_join( join_type: JoinType::Inner, join_constraint: JoinConstraint::On, null_equals_null: false, + dynamic_pushdown_columns: None, })) } diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 1ecb32ca2a43..a23f4239acbe 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -119,6 +119,7 @@ impl OptimizerRule for EliminateOuterJoin { filter: join.filter.clone(), schema: Arc::clone(&join.schema), null_equals_null: join.null_equals_null, + dynamic_pushdown_columns: None, })); Filter::try_new(filter.predicate, new_join) .map(|f| Transformed::yes(LogicalPlan::Filter(f))) diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs index 48191ec20631..efdeb6d197e8 100644 --- a/datafusion/optimizer/src/extract_equijoin_predicate.rs +++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs @@ -76,6 +76,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }) => { let left_schema = left.schema(); let right_schema = right.schema(); @@ -93,6 +94,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }))) } else { Ok(Transformed::no(LogicalPlan::Join(Join { @@ -104,6 +106,7 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equals_null, + dynamic_pushdown_columns, }))) } } diff --git a/datafusion/optimizer/src/join_filter_pushdown.rs b/datafusion/optimizer/src/join_filter_pushdown.rs new file mode 100644 index 000000000000..accd8aa7699e --- /dev/null +++ b/datafusion/optimizer/src/join_filter_pushdown.rs @@ -0,0 +1,316 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`JoinFilterPushdown`] pushes join filters down to the scan dynamically
+
+use arrow::datatypes::DataType;
+use datafusion_common::{tree_node::Transformed, DataFusionError, ExprSchema};
+use datafusion_expr::{utils::DynamicFilterColumn, Expr, JoinType, LogicalPlan};
+
+use crate::{optimizer::ApplyOrder, OptimizerConfig, OptimizerRule};
+
+#[derive(Default, Debug)]
+pub struct JoinFilterPushdown {}
+
+impl JoinFilterPushdown {
+    #[allow(missing_docs)]
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl OptimizerRule for JoinFilterPushdown {
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::BottomUp)
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>, DataFusionError> {
+        if !config.options().optimizer.dynamic_join_pushdown
+        //|| !config.options().execution.parquet.pushdown_filters
+        {
+            return Ok(Transformed::no(plan));
+        }
+
+        match plan {
+            LogicalPlan::Join(mut join) => {
+                if unsupported_join_type(&(join.join_type)) {
+                    return Ok(Transformed::no(LogicalPlan::Join(join)));
+                }
+
+                let mut columns = Vec::new();
+                let mut build_side_names = Vec::new();
+                // Iterate over the on clause and collect the filter info
+                for (left, right) in join.on.iter() {
+                    // Only equi-join keys where both sides are plain columns are supported
+                    if let (Expr::Column(l), Expr::Column(r)) = (left, right) {
+                        let data_type = join.schema.data_type(l)?;
+                        // TODO: currently only numeric and Utf8 data types are supported
+                        if data_type.is_numeric() || matches!(data_type, DataType::Utf8) {
+                            columns.push(r.clone());
+                            build_side_names.push(l.name().to_owned());
+                        }
+                    }
+                }
+
+                let mut probe = join.right.as_ref();
+                // Walk down the probe side to make sure the filter can reach the scan
+                loop {
+                    if matches!(probe, LogicalPlan::TableScan(_)) {
+                        break;
+                    }
+                    match probe {
+                        LogicalPlan::Limit(_)
+                        | LogicalPlan::Filter(_)
+                        | LogicalPlan::Sort(_)
+                        | LogicalPlan::Distinct(_) => {
+                            probe = probe.inputs()[0];
+                        }
+                        LogicalPlan::Projection(project) => {
+                            for column in &columns {
+                                if !project.schema.has_column(column) {
+                                    return Ok(Transformed::no(LogicalPlan::Join(join)));
+                                }
+                            }
+                            probe = probe.inputs()[0];
+                        }
+                        _ => return Ok(Transformed::no(LogicalPlan::Join(join))),
+                    }
+                }
+                let dynamic_columns = columns
+                    .into_iter()
+                    .zip(build_side_names)
+                    .map(|(column, name)| DynamicFilterColumn::new(name, column))
+                    .collect::<Vec<_>>();
+                // Attach the dynamic pushdown columns to the join
+                join = join.with_dynamic_pushdown_columns(dynamic_columns);
+                Ok(Transformed::yes(LogicalPlan::Join(join)))
+            }
+            _ => Ok(Transformed::no(plan)),
+        }
+    }
+
+    fn name(&self) -> &str {
+        "join_filter_pushdown"
+    }
+}
+
+fn unsupported_join_type(join_type: &JoinType) -> bool {
+    matches!(
+        join_type,
+        JoinType::Left
+            | JoinType::RightSemi
+            | JoinType::RightAnti
+            | JoinType::LeftSemi
+            | JoinType::LeftAnti
+            | JoinType::LeftMark
+    )
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::test::*;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use datafusion_common::Column;
+    use datafusion_expr::{
+        col, logical_plan::table_scan, JoinType, LogicalPlan, LogicalPlanBuilder,
+    };
use std::sync::Arc; + + fn schema() -> Schema { + Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ]) + } + + fn get_optimized_plan(plan: LogicalPlan) -> Result { + Ok(generate_optimized_plan_with_rules( + vec![Arc::new(JoinFilterPushdown::new())], + plan, + )) + } + + #[test] + fn test_inner_join_with_pushdown() -> Result<(), DataFusionError> { + let t1 = table_scan(Some("t1"), &schema(), None)?.build()?; + let t2 = table_scan(Some("t2"), &schema(), None)?.build()?; + + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Inner, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .build()?; + let optimized_plan = get_optimized_plan(plan)?; + if let LogicalPlan::Join(join) = optimized_plan { + assert!(join.dynamic_pushdown_columns.is_some()); + assert_eq!(join.dynamic_pushdown_columns.as_ref().unwrap().len(), 1); + assert_eq!( + join.dynamic_pushdown_columns.as_ref().unwrap()[0] + .column + .name, + "a" + ); + } else { + panic!("Expected Join operation"); + } + Ok(()) + } + + #[test] + fn test_left_join_no_pushdown() -> Result<(), DataFusionError> { + let t1 = table_scan(Some("t1"), &schema(), None)?.build()?; + let t2 = table_scan(Some("t2"), &schema(), None)?.build()?; + + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Left, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .build()?; + + let optimized_plan = get_optimized_plan(plan)?; + if let LogicalPlan::Join(join) = optimized_plan { + assert!(join.dynamic_pushdown_columns.is_none()); + } else { + panic!("Expected Join operation"); + } + Ok(()) + } + + #[test] + fn test_join_with_projection() -> Result<(), DataFusionError> { + let t1 = table_scan(Some("t1"), &schema(), None)?.build()?; + let t2 = table_scan(Some("t2"), &schema(), None)?.build()?; + + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Inner, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .project(vec![col("t1.a"), col("t2.b")])? + .build()?; + + let optimized_plan = get_optimized_plan(plan)?; + if let LogicalPlan::Projection(projection) = optimized_plan { + if let LogicalPlan::Join(join) = projection.input.as_ref() { + assert!(join.dynamic_pushdown_columns.is_some()); + assert_eq!(join.dynamic_pushdown_columns.as_ref().unwrap().len(), 1); + assert_eq!( + join.dynamic_pushdown_columns.as_ref().unwrap()[0] + .column + .name, + "a" + ); + } else { + panic!("Expected Join operation under Projection"); + } + } else { + panic!("Expected Projection operation"); + } + Ok(()) + } + + #[test] + fn test_join_with_multiple_keys() -> Result<(), DataFusionError> { + let t1 = table_scan(Some("t1"), &schema(), None)?.build()?; + let t2 = table_scan(Some("t2"), &schema(), None)?.build()?; + + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Inner, + ( + vec![Column::from_name("a"), Column::from_name("b")], + vec![Column::from_name("a"), Column::from_name("b")], + ), + None, + )? 
+ .build()?; + + let optimized_plan = get_optimized_plan(plan)?; + if let LogicalPlan::Join(join) = optimized_plan { + assert!(join.dynamic_pushdown_columns.is_some()); + assert_eq!(join.dynamic_pushdown_columns.as_ref().unwrap().len(), 2); + assert_eq!( + join.dynamic_pushdown_columns.as_ref().unwrap()[0] + .column + .name(), + "a" + ); + assert_eq!( + join.dynamic_pushdown_columns.as_ref().unwrap()[1] + .column + .name(), + "b" + ); + } else { + panic!("Expected Join operation"); + } + Ok(()) + } + + #[test] + fn test_join_with_filter() -> Result<(), DataFusionError> { + let t1 = table_scan(Some("t1"), &schema(), None)?.build()?; + let t2 = table_scan(Some("t2"), &schema(), None)?.build()?; + + let plan = LogicalPlanBuilder::from(t1) + .join( + t2, + JoinType::Inner, + (vec![Column::from_name("a")], vec![Column::from_name("a")]), + None, + )? + .filter(col("t1.b").gt(col("t1.c")))? + .build()?; + + let optimized_plan = get_optimized_plan(plan)?; + if let LogicalPlan::Filter(filter) = optimized_plan { + if let LogicalPlan::Join(join) = filter.input.as_ref() { + assert!(join.dynamic_pushdown_columns.is_some()); + assert_eq!(join.dynamic_pushdown_columns.as_ref().unwrap().len(), 1); + assert_eq!( + join.dynamic_pushdown_columns.as_ref().unwrap()[0] + .column + .name(), + "a" + ); + } else { + panic!("Expected Join operation under Filter"); + } + } else { + panic!("Expected Filter operation"); + } + Ok(()) + } +} diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index f31083831125..cb5b70cae6a4 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -45,6 +45,7 @@ pub mod eliminate_one_union; pub mod eliminate_outer_join; pub mod extract_equijoin_predicate; pub mod filter_null_join_keys; +pub mod join_filter_pushdown; pub mod optimize_projections; pub mod optimizer; pub mod propagate_empty_relation; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 975150cd6122..b37c9a4d7bc3 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -44,6 +44,7 @@ use crate::eliminate_one_union::EliminateOneUnion; use crate::eliminate_outer_join::EliminateOuterJoin; use crate::extract_equijoin_predicate::ExtractEquijoinPredicate; use crate::filter_null_join_keys::FilterNullJoinKeys; +use crate::join_filter_pushdown::JoinFilterPushdown; use crate::optimize_projections::OptimizeProjections; use crate::plan_signature::LogicalPlanSignature; use crate::propagate_empty_relation::PropagateEmptyRelation; @@ -270,6 +271,7 @@ impl Optimizer { Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateGroupByConstant::new()), Arc::new(OptimizeProjections::new()), + Arc::new(JoinFilterPushdown::new()), ]; Self::with_rules(rules) diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index 94d07a0791b3..6e37e83d68f1 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -181,7 +181,7 @@ pub fn assert_optimized_plan_eq( Ok(()) } -fn generate_optimized_plan_with_rules( +pub fn generate_optimized_plan_with_rules( rules: Vec>, plan: LogicalPlan, ) -> LogicalPlan { diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index ae2bfe5b0bd4..83d15b3675fe 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -23,6 +23,7 @@ use std::{any::Any, sync::Arc}; use 
crate::intervals::cp_solver::{propagate_arithmetic, propagate_comparison};
 use crate::PhysicalExpr;

+use crate::expressions::binary::kernels::concat_elements_utf8view;
 use arrow::array::*;
 use arrow::compute::kernels::boolean::{and_kleene, not, or_kleene};
 use arrow::compute::kernels::cmp::*;
@@ -38,8 +39,6 @@ use datafusion_expr::sort_properties::ExprProperties;
 use datafusion_expr::type_coercion::binary::get_result_type;
 use datafusion_expr::{ColumnarValue, Operator};
 use datafusion_physical_expr_common::datum::{apply, apply_cmp, apply_cmp_for_nested};
-
-use crate::expressions::binary::kernels::concat_elements_utf8view;
 use kernels::{
     bitwise_and_dyn, bitwise_and_dyn_scalar, bitwise_or_dyn, bitwise_or_dyn_scalar,
     bitwise_shift_left_dyn, bitwise_shift_left_dyn_scalar, bitwise_shift_right_dyn,
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index a9f9b22fafda..832edee339c9 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -51,6 +51,7 @@ datafusion-common = { workspace = true, default-features = true }
 datafusion-common-runtime = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-functions-aggregate = { workspace = true }
 datafusion-functions-aggregate-common = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
 datafusion-physical-expr = { workspace = true, default-features = true }
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 7220e7594ea6..100bde188339 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -43,6 +43,7 @@ use datafusion_physical_expr_common::sort_expr::LexRequirement;
 use crate::coalesce_partitions::CoalescePartitionsExec;
 use crate::display::DisplayableExecutionPlan;
 pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, VerboseDisplay};
+use crate::joins::DynamicFilterInfo;
 pub use crate::metrics::Metric;
 use crate::metrics::MetricsSet;
 pub use crate::ordering::InputOrderMode;
@@ -422,6 +423,19 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     fn cardinality_effect(&self) -> CardinalityEffect {
         CardinalityEffect::Unknown
     }
+
+    /// Returns true if this operator supports dynamic filter pushdown,
+    /// e.g. a physical scan that can apply filters while reading
+    fn support_dynamic_filter(&self) -> bool {
+        false
+    }
+
+    /// Returns a new execution plan with the given dynamic filter applied,
+    /// or `None` if this operator does not support dynamic filters
+    fn with_dynamic_filter(
+        &self,
+        _dynamic_filters: Option<Arc<DynamicFilterInfo>>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        Ok(None)
+    }
 }

 /// Extension trait provides an easy API to fetch various properties of
diff --git a/datafusion/physical-plan/src/joins/dynamic_filters.rs b/datafusion/physical-plan/src/joins/dynamic_filters.rs
new file mode 100644
index 000000000000..c9f7d54ff31c
--- /dev/null
+++ b/datafusion/physical-plan/src/joins/dynamic_filters.rs
@@ -0,0 +1,520 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +use super::utils::JoinHashMap; +use arrow::array::{AsArray, BooleanBuilder}; +use arrow::array::{ + Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, + UInt16Array, UInt32Array, UInt64Array, UInt8Array, +}; +use arrow::compute::filter_record_batch; +use arrow::compute::kernels::aggregate::{max, max_string, min, min_string}; + +use arrow::datatypes::DataType; +use arrow::record_batch::RecordBatch; +use arrow_array::StringArray; +use arrow_array::{Array, ArrayRef}; +use datafusion_common::{arrow_err, DataFusionError, ScalarValue}; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::{ + BinaryExpr, Column, InListExpr, IsNullExpr, Literal, +}; +use datafusion_physical_expr::PhysicalExpr; +use hashbrown::HashSet; +use parking_lot::Mutex; +use std::fmt; +use std::sync::Arc; + +const UNIQUE_VALUES_THRESHOLD: usize = 0; + +macro_rules! process_unique_values { + ($array:expr, $array_type:ty, $scalar_type:ident, $unique_set:expr, $should_track:expr) => {{ + if let Some(array) = $array.as_any().downcast_ref::<$array_type>() { + if $should_track { + let mut has_null = false; + for value in array.iter() { + match value { + Some(value) => { + $unique_set.insert(ScalarValue::$scalar_type(Some(value))); + if $unique_set.len() > UNIQUE_VALUES_THRESHOLD { + $unique_set.clear(); + return Ok(false); + } + } + None => { + has_null = true; + } + } + } + if has_null { + $unique_set.insert(ScalarValue::$scalar_type(None)); + } + } + } + Ok(true) + }}; +} +macro_rules! 
process_min_max { + ($array:expr, $array_type:ty, $scalar_type:ident) => {{ + if let Some(array) = $array.as_any().downcast_ref::<$array_type>() { + let min = min(array) + .ok_or_else(|| DataFusionError::Internal("Empty array".to_string()))?; + let max = max(array) + .ok_or_else(|| DataFusionError::Internal("Empty array".to_string()))?; + Ok(( + ScalarValue::$scalar_type(Some(min)), + ScalarValue::$scalar_type(Some(max)), + )) + } else { + Err(DataFusionError::Internal("Invalid array type".to_string())) + } + }}; +} + +struct DynamicFilterInfoInner { + unique_values: Vec>, + value_ranges: Vec>, + batches: Vec>>, + final_expr: Option>, + batch_count: usize, + processed_partitions: HashSet, + should_track_unique: Vec, +} + +pub struct DynamicFilterInfo { + columns: Vec>, + build_side_names: Vec, + inner: Mutex, +} + +impl DynamicFilterInfo { + pub fn try_new( + columns: Vec>, + build_side_names: Vec, + ) -> Result { + let col_count = columns.len(); + Ok(Self { + columns, + build_side_names, + inner: Mutex::new(DynamicFilterInfoInner { + unique_values: vec![HashSet::new(); col_count], + value_ranges: vec![None; col_count], + batches: vec![Vec::new(); col_count], + final_expr: None, + batch_count: 0, + processed_partitions: HashSet::new(), + should_track_unique: vec![true; col_count], + }), + }) + } + + pub fn merge_batch_and_check_finalized( + &self, + records: &RecordBatch, + partition: usize, + ) -> Result { + let mut inner = self.inner.lock(); + if inner.final_expr.is_some() + || (inner.processed_partitions.contains(&partition) + && records.num_rows() == 0) + { + return Ok(true); + } + if !inner.processed_partitions.insert(partition) { + return Ok(false); + } + + inner.batch_count = inner.batch_count.saturating_sub(1); + let finalize = inner.batch_count == 0; + + let schema = records.schema(); + let columns = records.columns(); + + for (i, _) in self.columns.iter().enumerate() { + let index = schema.index_of(&self.build_side_names[i])?; + let column_data = &columns[index]; + + let should_track = inner.should_track_unique[i]; + + if should_track { + let still_tracking = update_unique_values( + &mut inner.unique_values[i], + column_data, + should_track, + )?; + + if !still_tracking { + inner.should_track_unique[i] = false; + } + } + + update_range_stats(&mut inner.value_ranges[i], column_data)?; + inner.batches[i].push(Arc::clone(column_data)); + } + + if finalize { + drop(inner); + self.finalize_filter()?; + return Ok(true); + } + + Ok(false) + } + + fn finalize_filter(&self) -> Result<(), DataFusionError> { + let mut inner = self.inner.lock(); + let filter_expr = + self.columns.iter().enumerate().try_fold::<_, _, Result< + Option>, + DataFusionError, + >>(None, |acc, (i, column)| { + let unique_values = &inner.unique_values[i]; + let value_range = &inner.value_ranges[i]; + let should_track_unique = inner.should_track_unique[i]; + + let use_unique_list = should_track_unique + && !unique_values.is_empty() + && !is_continuous_type(inner.batches[i][0].data_type()); + + let column_expr = if use_unique_list { + let values: Vec> = unique_values + .iter() + .cloned() + .map(|value| { + Arc::new(Literal::new(value)) as Arc + }) + .collect(); + Arc::new(InListExpr::new( + Arc::::clone(column), + values, + false, + None, + )) as Arc + } else { + let (min_value, max_value) = + value_range.clone().ok_or_else(|| { + DataFusionError::Internal("Missing range values".to_string()) + })?; + + Arc::new(BinaryExpr::new( + Arc::new(BinaryExpr::new( + Arc::new(Literal::new(min_value)), + Operator::LtEq, + 
Arc::::clone(column), + )), + Operator::And, + Arc::new(BinaryExpr::new( + Arc::::clone(column), + Operator::LtEq, + Arc::new(Literal::new(max_value)), + )), + )) as Arc + }; + + match acc { + Some(expr) => Ok(Some(Arc::new(BinaryExpr::new( + expr, + Operator::And, + column_expr, + )))), + None => Ok(Some(column_expr)), + } + })?; + let final_expr = match filter_expr { + Some(expr) => Arc::new(BinaryExpr::new( + expr, + Operator::Or, + Arc::new(IsNullExpr::new(Arc::::clone(&self.columns[0]))), + )) as Arc, + None => Arc::new(BinaryExpr::new( + Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))), + Operator::Eq, + Arc::new(Literal::new(ScalarValue::Boolean(Some(true)))), + )) as Arc, + }; + + inner.final_expr = Some(final_expr); + Ok(()) + } + + pub fn add_count(&self) -> Result<(), DataFusionError> { + let mut inner = self.inner.lock(); + inner.batch_count = inner.batch_count.saturating_add(1); + Ok(()) + } + + pub fn filter_batch( + &self, + records: &RecordBatch, + ) -> Result { + let filter_expr = match self.inner.lock().final_expr.as_ref() { + Some(expr) => Arc::clone(expr), + None => return Err(DataFusionError::Internal( + "Filter expression should have been created before calling filter_batch" + .to_string(), + )), + }; + let boolean_array = filter_expr + .evaluate(records)? + .into_array(records.num_rows())?; + + Ok(filter_record_batch( + records, + boolean_array.as_ref().as_boolean(), + )?) + } + + pub fn has_final_expr(&self) -> bool { + self.inner.lock().final_expr.is_some() + } + + pub fn final_predicate( + &self, + predicate: Option>, + ) -> (Option>, String) { + let inner = self.inner.lock(); + + let result = match (inner.final_expr.clone(), predicate) { + (Some(self_expr), Some(input_expr)) => { + Some( + Arc::new(BinaryExpr::new(self_expr, Operator::And, input_expr)) + as Arc, + ) + } + (Some(self_expr), None) => Some(self_expr), + (None, Some(input_expr)) => Some(input_expr), + (None, None) => None, + }; + + let debug_info = inner + .final_expr + .as_ref() + .map(|expr| format!("{}", expr)) + .unwrap_or_default(); + + (result, debug_info) + } +} + +fn update_unique_values( + unique_set: &mut HashSet, + array: &dyn Array, + should_track: bool, +) -> Result { + if !should_track { + return Ok(false); + } + + match array.data_type() { + DataType::Int8 => { + process_unique_values!(array, Int8Array, Int8, unique_set, should_track) + } + DataType::Int16 => { + process_unique_values!(array, Int16Array, Int16, unique_set, should_track) + } + DataType::Int32 => { + process_unique_values!(array, Int32Array, Int32, unique_set, should_track) + } + DataType::Int64 => { + process_unique_values!(array, Int64Array, Int64, unique_set, should_track) + } + DataType::UInt8 => { + process_unique_values!(array, UInt8Array, UInt8, unique_set, should_track) + } + DataType::UInt16 => { + process_unique_values!(array, UInt16Array, UInt16, unique_set, should_track) + } + DataType::UInt32 => { + process_unique_values!(array, UInt32Array, UInt32, unique_set, should_track) + } + DataType::UInt64 => { + process_unique_values!(array, UInt64Array, UInt64, unique_set, should_track) + } + DataType::Float32 => { + process_unique_values!(array, Float32Array, Float32, unique_set, should_track) + } + DataType::Float64 => { + process_unique_values!(array, Float64Array, Float64, unique_set, should_track) + } + DataType::Utf8 => { + if let Some(array) = array.as_any().downcast_ref::() { + if should_track { + for value in array.iter().flatten() { + unique_set.insert(ScalarValue::Utf8(Some(value.to_string()))); 
+ if unique_set.len() > UNIQUE_VALUES_THRESHOLD { + unique_set.clear(); + return Ok(false); + } + } + } + } + Ok(true) + } + _ => Err(DataFusionError::NotImplemented(format!( + "Unique value tracking not implemented for type {}", + array.data_type() + ))), + } +} + +fn compute_min_max( + array: &dyn Array, +) -> Result<(ScalarValue, ScalarValue), DataFusionError> { + match array.data_type() { + DataType::Int8 => process_min_max!(array, Int8Array, Int8), + DataType::Int16 => process_min_max!(array, Int16Array, Int16), + DataType::Int32 => process_min_max!(array, Int32Array, Int32), + DataType::Int64 => process_min_max!(array, Int64Array, Int64), + DataType::UInt8 => process_min_max!(array, UInt8Array, UInt8), + DataType::UInt16 => process_min_max!(array, UInt16Array, UInt16), + DataType::UInt32 => process_min_max!(array, UInt32Array, UInt32), + DataType::UInt64 => process_min_max!(array, UInt64Array, UInt64), + DataType::Float32 => process_min_max!(array, Float32Array, Float32), + DataType::Float64 => process_min_max!(array, Float64Array, Float64), + DataType::Utf8 => { + if let Some(array) = array.as_any().downcast_ref::() { + let min = min_string(array).ok_or_else(|| { + DataFusionError::Internal("Empty array".to_string()) + })?; + let max = max_string(array).ok_or_else(|| { + DataFusionError::Internal("Empty array".to_string()) + })?; + Ok(( + ScalarValue::Utf8(Some(min.to_string())), + ScalarValue::Utf8(Some(max.to_string())), + )) + } else { + Err(DataFusionError::Internal("Invalid array type".to_string())) + } + } + _ => Err(DataFusionError::NotImplemented(format!( + "Min/Max not implemented for type {}", + array.data_type() + ))), + } +} + +fn update_range_stats( + range: &mut Option<(ScalarValue, ScalarValue)>, + array: &dyn Array, +) -> Result<(), DataFusionError> { + if array.is_empty() { + return Ok(()); + } + let (min, max) = compute_min_max(array)?; + + *range = match range.take() { + Some((curr_min, curr_max)) => { + let min_value = match curr_min.partial_cmp(&min) { + Some(std::cmp::Ordering::Less) | Some(std::cmp::Ordering::Equal) => { + curr_min + } + _ => min, + }; + let max_value = match curr_max.partial_cmp(&max) { + Some(std::cmp::Ordering::Greater) | Some(std::cmp::Ordering::Equal) => { + curr_max + } + _ => max, + }; + Some((min_value, max_value)) + } + None => Some((min, max)), + }; + + Ok(()) +} + +fn is_continuous_type(data_type: &DataType) -> bool { + matches!(data_type, DataType::Float32 | DataType::Float64) +} + +pub struct PartitionedDynamicFilterInfo { + partition: usize, + dynamic_filter_info: Arc, +} + +impl PartitionedDynamicFilterInfo { + pub fn new(partition: usize, dynamic_filter_info: Arc) -> Self { + Self { + partition, + dynamic_filter_info, + } + } + + pub fn merge_batch_and_check_finalized( + &self, + records: &RecordBatch, + ) -> Result { + self.dynamic_filter_info + .merge_batch_and_check_finalized(records, self.partition) + } + + pub fn _filter_probe_batch( + &self, + batch: &RecordBatch, + hashes: &[u64], + hash_map: &JoinHashMap, + ) -> Result<(RecordBatch, Vec), DataFusionError> { + let left_hash_set = hash_map.extract_unique_keys(); + + let mut mask_builder = BooleanBuilder::new(); + for hash in hashes.iter() { + mask_builder.append_value(left_hash_set.contains(hash)); + } + let mask = mask_builder.finish(); + + let filtered_columns = batch + .columns() + .iter() + .map(|col| match arrow::compute::filter(col, &mask) { + Ok(array) => Ok(array), + Err(e) => arrow_err!(e), + }) + .collect::, DataFusionError>>()?; + + let filtered_batch = 
RecordBatch::try_new(batch.schema(), filtered_columns)?; + + let filtered_hashes = hashes + .iter() + .zip(mask.iter()) + .filter_map(|(hash, keep)| { + keep.and_then(|k| if k { Some(*hash) } else { None }) + }) + .collect(); + + Ok((filtered_batch, filtered_hashes)) + } +} + +impl fmt::Debug for PartitionedDynamicFilterInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("PartitionedDynamicFilterInfo") + .field("partition", &self.partition) + .field("dynamic_filter_info", &"") + .finish() + } +} + +impl fmt::Debug for DynamicFilterInfo { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("DynamicFilterInfo") + .field("columns", &self.columns) + .field("build_side_names", &self.build_side_names) + .field("inner", &"") + .finish() + } +} diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 32267b118193..70e26083854f 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -24,6 +24,7 @@ use std::sync::Arc; use std::task::Poll; use std::{any::Any, vec}; +use super::dynamic_filters::{DynamicFilterInfo, PartitionedDynamicFilterInfo}; use super::utils::asymmetric_join_output_partitioning; use super::{ utils::{OnceAsync, OnceFut}, @@ -339,6 +340,8 @@ pub struct HashJoinExec { pub null_equals_null: bool, /// Cache holding plan properties like equivalences, output partitioning etc. cache: PlanProperties, + /// The dynamic filter which should be pushed to probe side + pub dynamic_filters_pushdown: Option>, } impl HashJoinExec { @@ -400,6 +403,7 @@ impl HashJoinExec { column_indices, null_equals_null, cache, + dynamic_filters_pushdown: None, }) } @@ -614,6 +618,27 @@ impl ExecutionPlan for HashJoinExec { self } + fn support_dynamic_filter(&self) -> bool { + true + } + + fn with_dynamic_filter( + &self, + dynamic_filter_info: Option>, + ) -> Result>> { + let mut plan = HashJoinExec::try_new( + Arc::clone(&self.left), + Arc::clone(&self.right), + self.on.clone(), + self.filter.clone(), + &self.join_type, + self.projection.clone(), + self.mode, + self.null_equals_null, + )?; + plan.dynamic_filters_pushdown = dynamic_filter_info; + Ok(Some(Arc::new(plan))) + } fn properties(&self) -> &PlanProperties { &self.cache } @@ -670,7 +695,7 @@ impl ExecutionPlan for HashJoinExec { self: Arc, children: Vec>, ) -> Result> { - Ok(Arc::new(HashJoinExec::try_new( + let mut plan = HashJoinExec::try_new( Arc::clone(&children[0]), Arc::clone(&children[1]), self.on.clone(), @@ -679,7 +704,9 @@ impl ExecutionPlan for HashJoinExec { self.projection.clone(), self.mode, self.null_equals_null, - )?)) + )?; + plan.dynamic_filters_pushdown = self.dynamic_filters_pushdown.clone(); + Ok(Arc::new(plan)) } fn execute( @@ -697,6 +724,10 @@ impl ExecutionPlan for HashJoinExec { .iter() .map(|on| Arc::clone(&on.1)) .collect::>(); + // if there is dynamic filters, we add the counter + if let Some(dynamic_filters) = &self.dynamic_filters_pushdown { + dynamic_filters.add_count()? 
+ } let left_partitions = self.left.output_partitioning().partition_count(); let right_partitions = self.right.output_partitioning().partition_count(); @@ -765,6 +796,16 @@ impl ExecutionPlan for HashJoinExec { None => self.column_indices.clone(), }; + let partitioned_dynamic_info = + self.dynamic_filters_pushdown + .as_ref() + .map(|dynamic_filters| { + PartitionedDynamicFilterInfo::new( + partition, + Arc::::clone(dynamic_filters), + ) + }); + Ok(Box::pin(HashJoinStream { schema: self.schema(), on_left, @@ -781,6 +822,7 @@ impl ExecutionPlan for HashJoinExec { batch_size, hashes_buffer: vec![], right_side_ordered: self.right.output_ordering().is_some(), + dynamic_filter_info: partitioned_dynamic_info, })) } @@ -860,7 +902,6 @@ async fn collect_left_input( reservation.try_grow(estimated_hashtable_size)?; metrics.build_mem_used.add(estimated_hashtable_size); - let mut hashmap = JoinHashMap::with_capacity(num_rows); let mut hashes_buffer = Vec::new(); let mut offset = 0; @@ -884,7 +925,6 @@ async fn collect_left_input( } // Merge all batches into a single batch, so we can directly index into the arrays let single_batch = concat_batches(&schema, batches_iter)?; - // Reserve additional memory for visited indices bitmap and create shared builder let visited_indices_bitmap = if with_visited_indices_bitmap { let bitmap_size = bit_util::ceil(single_batch.num_rows(), 8); @@ -1102,6 +1142,8 @@ struct HashJoinStream { hashes_buffer: Vec, /// Specifies whether the right side has an ordering to potentially preserve right_side_ordered: bool, + /// dynamic filters after calculating the build sides + dynamic_filter_info: Option, } impl RecordBatchStream for HashJoinStream { @@ -1299,7 +1341,6 @@ impl HashJoinStream { } /// Collects build-side data by polling `OnceFut` future from initialized build-side - /// /// Updates build-side to `Ready`, and state to `FetchProbeSide` fn collect_build_side( &mut self, @@ -1313,10 +1354,20 @@ impl HashJoinStream { .left_fut .get_shared(cx))?; build_timer.done(); + // Merge the information to dynamic filters (if there is any) and check if it's finalized + let filter_finalized = if let Some(filter_info) = &self.dynamic_filter_info { + filter_info.merge_batch_and_check_finalized(left_data.batch())? 
diff --git a/datafusion/physical-plan/src/joins/mod.rs b/datafusion/physical-plan/src/joins/mod.rs index 6ddf19c51193..472631ffcfee 100644 --- a/datafusion/physical-plan/src/joins/mod.rs +++ b/datafusion/physical-plan/src/joins/mod.rs @@ -21,9 +21,11 @@ pub use cross_join::CrossJoinExec; pub use hash_join::HashJoinExec; pub use nested_loop_join::NestedLoopJoinExec; // Note: SortMergeJoin is not used in plans yet +pub use dynamic_filters::DynamicFilterInfo; pub use sort_merge_join::SortMergeJoinExec; pub use symmetric_hash_join::SymmetricHashJoinExec; mod cross_join; +mod dynamic_filters; mod hash_join; mod nested_loop_join; mod sort_merge_join; diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 0366c9fa5e46..0b777949b554 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -140,6 +140,18 @@ impl JoinHashMap { next: vec![0; capacity], } } + + /// Extracts all unique hash keys from this join hash map + pub fn extract_unique_keys(&self) -> HashSet<u64> { + let mut unique_keys = HashSet::new(); + unsafe { + self.map.iter().for_each(|entry| { + let (hash, _) = entry.as_ref(); + unique_keys.insert(hash.to_owned()); + }) + }; + unique_keys + } } // Type of offsets for obtaining indices from JoinHashMap. @@ -371,8 +383,48 @@ impl JoinHashMapType for JoinHashMap { } impl Debug for JoinHashMap { - fn fmt(&self, _f: &mut fmt::Formatter) -> fmt::Result { - Ok(()) + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + writeln!(f, "JoinHashMap {{")?; + writeln!(f, " map:")?; + writeln!(f, " ----------")?; + + let mut entries: Vec<_> = unsafe { self.map.iter().collect() }; + entries.sort_by_key(|bucket| unsafe { bucket.as_ref().0 }); + + for bucket in entries { + let mut indices = Vec::new(); + let mut curr_idx = unsafe { bucket.as_ref().1 }; + + while curr_idx > 0 { + indices.push(curr_idx - 1); + curr_idx = self.next[(curr_idx - 1) as usize]; + } + + indices.reverse(); + + writeln!( + f, + " | {:3} | {} | -> {:?}", + unsafe { bucket.as_ref().0 }, + unsafe { bucket.as_ref().1 }, + indices + )?; + } + + writeln!(f, " ----------")?; + writeln!(f, "\n next:")?; + writeln!(f, " ---------------------")?; + write!(f, " |")?; + for &next_idx in self.next.iter() { + write!(f, " {:2} |", next_idx)?; + } + writeln!(f)?; + write!(f, " |")?; + for i in 0..self.next.len() { + write!(f, " {:2} |", i)?; + } + writeln!(f, "\n ---------------------")?; + writeln!(f, "}}") } }
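The rewritten `Debug` impl decodes each bucket by chasing indices through `next`: stored indices are 1-based, `0` terminates a chain, and chains grow by prepending. That traversal in isolation, with `chain_indices` as a hypothetical helper for illustration:

```rust
/// Walk one bucket's chain, as the `Debug` impl above does: `head` is a
/// 1-based row index, `0` ends the chain, and `next[i]` links row `i + 1`
/// back to the previously inserted row with the same hash.
fn chain_indices(head: u64, next: &[u64]) -> Vec<u64> {
    let mut indices = Vec::new();
    let mut curr = head;
    while curr > 0 {
        indices.push(curr - 1); // convert back to a 0-based row index
        curr = next[(curr - 1) as usize];
    }
    indices.reverse(); // chains are prepended, so reverse to insertion order
    indices
}

fn main() {
    // Head points at row 3 (1-based 4); next links row 3 back to row 1.
    let next = vec![0u64, 0, 0, 2];
    assert_eq!(chain_indices(4, &next), vec![1, 3]);
}
```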
diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index 4712729bdaf5..c19927dceefe 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -263,6 +263,7 @@ impl MetricsSet { MetricValue::Gauge { name, .. } => name == metric_name, MetricValue::StartTimestamp(_) => false, MetricValue::EndTimestamp(_) => false, + MetricValue::DynamicFilter(_) => false, }) } diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index 2eb01914ee0a..0a49713eb484 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -400,6 +400,9 @@ pub enum MetricValue { StartTimestamp(Timestamp), /// The time at which execution ended EndTimestamp(Timestamp), + + /// A pushed-down dynamic filter, identified by name + DynamicFilter(String), } impl MetricValue { @@ -417,6 +420,7 @@ impl MetricValue { Self::Time { name, .. } => name.borrow(), Self::StartTimestamp(_) => "start_timestamp", Self::EndTimestamp(_) => "end_timestamp", + Self::DynamicFilter(_) => "dynamic_filters", } } @@ -442,6 +446,7 @@ impl MetricValue { .and_then(|ts| ts.timestamp_nanos_opt()) .map(|nanos| nanos as usize) .unwrap_or(0), + Self::DynamicFilter(_) => 1, } } @@ -469,6 +474,7 @@ impl MetricValue { }, Self::StartTimestamp(_) => Self::StartTimestamp(Timestamp::new()), Self::EndTimestamp(_) => Self::EndTimestamp(Timestamp::new()), + Self::DynamicFilter(name) => Self::DynamicFilter(name.clone()), } } @@ -515,6 +521,7 @@ impl MetricValue { (Self::EndTimestamp(timestamp), Self::EndTimestamp(other_timestamp)) => { timestamp.update_to_max(other_timestamp); } + (Self::DynamicFilter(_), _) => {} m @ (_, _) => { panic!( "Mismatched metric types. Can not aggregate {:?} with value {:?}", @@ -539,6 +546,7 @@ impl MetricValue { Self::Time { .. } => 8, Self::StartTimestamp(_) => 9, // show timestamps last Self::EndTimestamp(_) => 10, + Self::DynamicFilter(_) => 11, } } @@ -574,6 +582,9 @@ impl Display for MetricValue { Self::StartTimestamp(timestamp) | Self::EndTimestamp(timestamp) => { write!(f, "{timestamp}") } + Self::DynamicFilter(filter) => { + write!(f, "{filter}") + } } } } diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index f3fee4f1fca6..2573d4ab4359 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -205,6 +205,7 @@ logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after optimize_projections TableScan: simple_explain_test projection=[a, b, c] +logical_plan after join_filter_pushdown SAME TEXT AS ABOVE logical_plan after eliminate_nested_union SAME TEXT AS ABOVE logical_plan after simplify_expressions SAME TEXT AS ABOVE logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE @@ -230,6 +231,7 @@ logical_plan after unwrap_cast_in_comparison SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE logical_plan after eliminate_group_by_constant SAME TEXT AS ABOVE logical_plan after optimize_projections SAME TEXT AS ABOVE +logical_plan after join_filter_pushdown SAME TEXT AS ABOVE logical_plan TableScan: simple_explain_test projection=[a, b, c] initial_physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true initial_physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] @@ -251,6 +253,7 @@ physical_plan after
LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after LimitPushdown SAME TEXT AS ABOVE physical_plan after SanityCheckPlan SAME TEXT AS ABOVE +physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true physical_plan_with_stats CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, statistics=[Rows=Absent, Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:)]] physical_plan_with_schema CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, c], has_header=true, schema=[a:Int32;N, b:Int32;N, c:Int32;N] @@ -327,6 +330,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan after SanityCheckPlan SAME TEXT AS ABOVE +physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] @@ -367,6 +371,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE physical_plan after ProjectionPushdown SAME TEXT AS ABOVE physical_plan after LimitPushdown ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE +physical_plan after JoinFilterPushdown SAME TEXT AS ABOVE physical_plan ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10 physical_plan_with_stats ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
statistics=[Rows=Exact(8), Bytes=Absent, [(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]] physical_plan_with_schema ParquetExec: file_groups={1 group: [[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, double_col:Float64;N, date_string_col:BinaryView;N, string_col:BinaryView;N, timestamp_col:Timestamp(Nanosecond, None);N] diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 84d18233d572..acad4a4f178e 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -225,6 +225,7 @@ datafusion.explain.show_sizes true datafusion.explain.show_statistics false datafusion.optimizer.allow_symmetric_joins_without_pruning true datafusion.optimizer.default_filter_selectivity 20 +datafusion.optimizer.dynamic_join_pushdown true datafusion.optimizer.enable_distinct_aggregation_soft_limit true datafusion.optimizer.enable_round_robin_repartition true datafusion.optimizer.enable_topk_aggregation true @@ -318,6 +319,7 @@ datafusion.explain.show_sizes true When set to true, the explain statement will datafusion.explain.show_statistics false When set to true, the explain statement will print operator statistics for physical plans datafusion.optimizer.allow_symmetric_joins_without_pruning true Should DataFusion allow symmetric hash joins for unbounded data sources even when its inputs do not have any ordering or filtering If the flag is not enabled, the SymmetricHashJoin operator will be unable to prune its internal buffers, resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, Right, RightAnti, and RightSemi - being produced only at the end of the execution. This is not typical in stream processing. Additionally, without proper design for long runner execution, all types of joins may encounter out-of-memory errors. datafusion.optimizer.default_filter_selectivity 20 The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). +datafusion.optimizer.dynamic_join_pushdown true When set to true, DataFusion will try to push build-side statistics down to the probe phase datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to true, the optimizer will push a limit operation into grouped aggregations which have no aggregate expressions, as a soft limit, emitting groups once the limit is reached, before all rows in the group are read. datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores datafusion.optimizer.enable_topk_aggregation true When set to true, the optimizer will attempt to perform limit operations during aggregations, if possible
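With the `df_settings` rows above in place, the new flag is discoverable at runtime. A sketch of checking it from SQL, assuming a stock `SessionContext` with `information_schema` enabled:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // information_schema must be enabled for df_settings to exist
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::new_with_config(config);
    ctx.sql(
        "SELECT name, value FROM information_schema.df_settings \
         WHERE name = 'datafusion.optimizer.dynamic_join_pushdown'",
    )
    .await?
    .show()
    .await?;
    Ok(())
}
```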
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 6a49fda668a9..ebcd2f806a90 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -115,6 +115,7 @@ Environment variables are read during `SessionConfig` initialisation so they mus | datafusion.optimizer.default_filter_selectivity | 20 | The default filter selectivity used by Filter Statistics when an exact selectivity cannot be determined. Valid values are between 0 (no selectivity) and 100 (all rows are selected). | | datafusion.optimizer.prefer_existing_union | false | When set to true, the optimizer will not attempt to convert Union to Interleave | | datafusion.optimizer.expand_views_at_output | false | When set to true, if the returned type is a view type then the output will be coerced to a non-view. Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`. | +| datafusion.optimizer.dynamic_join_pushdown | true | When set to true, DataFusion will try to push build-side statistics down to the probe phase | | datafusion.explain.logical_plan_only | false | When set to true, the explain statement will only print logical plans | | datafusion.explain.physical_plan_only | false | When set to true, the explain statement will only print physical plans | | datafusion.explain.show_statistics | false | When set to true, the explain statement will print operator statistics for physical plans |
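The documented default can be overridden per session, either when building the `SessionConfig` or later via `SET`. A sketch, under the assumption that the key is registered like the other optimizer flags in this table:

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Opt out of dynamic join pushdown for this session only
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.dynamic_join_pushdown", false);
    let ctx = SessionContext::new_with_config(config);

    // The same toggle also works at runtime through SQL
    ctx.sql("SET datafusion.optimizer.dynamic_join_pushdown = true")
        .await?
        .collect()
        .await?;
    Ok(())
}
```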