@@ -785,12 +785,19 @@ fn rewrite_projection_given_requirements(
785
785
/// - `optimize_projections_preserve_existing_projections` optimizer config is false, and
786
786
/// - input schema of the projection, output schema of the projection are same, and
787
787
/// - all projection expressions are either Column or Literal
788
- fn is_projection_unnecessary ( input : & LogicalPlan , proj_exprs : & [ Expr ] ) -> Result < bool > {
789
- !config
788
+ fn is_projection_unnecessary (
789
+ input : & LogicalPlan ,
790
+ proj_exprs : & [ Expr ] ,
791
+ config : & dyn OptimizerConfig ,
792
+ ) -> Result < bool > {
793
+ if config
790
794
. options ( )
791
795
. optimizer
792
796
. optimize_projections_preserve_existing_projections
793
-
797
+ {
798
+ return Ok ( false ) ;
799
+ }
800
+
794
801
let proj_schema = projection_schema ( input, proj_exprs) ?;
795
802
Ok ( & proj_schema == input. schema ( ) && proj_exprs. iter ( ) . all ( is_expr_trivial) )
796
803
}
@@ -807,11 +814,12 @@ mod tests {
807
814
use crate :: optimize_projections:: OptimizeProjections ;
808
815
use crate :: optimizer:: Optimizer ;
809
816
use crate :: test:: {
810
- assert_fields_eq, assert_optimized_plan_eq, scan_empty , test_table_scan ,
811
- test_table_scan_fields, test_table_scan_with_name,
817
+ assert_fields_eq, assert_optimized_plan_eq, assert_optimized_plan_with_config_eq ,
818
+ scan_empty , test_table_scan , test_table_scan_fields, test_table_scan_with_name,
812
819
} ;
813
820
use crate :: { OptimizerContext , OptimizerRule } ;
814
821
use arrow:: datatypes:: { DataType , Field , Schema } ;
822
+ use datafusion_common:: config:: ConfigOptions ;
815
823
use datafusion_common:: {
816
824
Column , DFSchema , DFSchemaRef , JoinType , Result , TableReference ,
817
825
} ;
@@ -1998,4 +2006,49 @@ mod tests {
1998
2006
optimizer. optimize ( plan, & OptimizerContext :: new ( ) , observe) ?;
1999
2007
Ok ( optimized_plan)
2000
2008
}
2009
+
2010
+ #[ test]
2011
+ fn aggregate_filter_pushdown_preserve_projections ( ) -> Result < ( ) > {
2012
+ let table_scan = test_table_scan ( ) ?;
2013
+ let aggr_with_filter = count_udaf ( )
2014
+ . call ( vec ! [ col( "b" ) ] )
2015
+ . filter ( col ( "c" ) . gt ( lit ( 42 ) ) )
2016
+ . build ( ) ?;
2017
+ let plan = LogicalPlanBuilder :: from ( table_scan)
2018
+ . aggregate (
2019
+ vec ! [ col( "a" ) ] ,
2020
+ vec ! [ count( col( "b" ) ) , aggr_with_filter. alias( "count2" ) ] ,
2021
+ ) ?
2022
+ . project ( vec ! [ col( "a" ) , col( "count(test.b)" ) , col( "count2" ) ] ) ?
2023
+ . build ( ) ?;
2024
+
2025
+ let expected_default = "Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\
2026
+ \n TableScan: test projection=[a, b, c]";
2027
+
2028
+ let expected_preserve_projections = "Projection: test.a, count(test.b), count2\
2029
+ \n Aggregate: groupBy=[[test.a]], aggr=[[count(test.b), count(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\
2030
+ \n TableScan: test projection=[a, b, c]";
2031
+
2032
+ let scenarios = [
2033
+ ( false , expected_default) ,
2034
+ ( true , expected_preserve_projections) ,
2035
+ ] ;
2036
+
2037
+ for ( preserve_projections, expected_plan) in scenarios. into_iter ( ) {
2038
+ let mut config = ConfigOptions :: new ( ) ;
2039
+ config
2040
+ . optimizer
2041
+ . optimize_projections_preserve_existing_projections =
2042
+ preserve_projections;
2043
+ let optimizer_context = OptimizerContext :: new_with_options ( config) ;
2044
+ assert_optimized_plan_with_config_eq (
2045
+ Arc :: new ( OptimizeProjections :: new ( ) ) ,
2046
+ plan. clone ( ) ,
2047
+ expected_plan,
2048
+ & optimizer_context,
2049
+ ) ?;
2050
+ }
2051
+
2052
+ Ok ( ( ) )
2053
+ }
2001
2054
}
0 commit comments