18
18
//! [`ExtractEquijoinPredicate`] identifies equality join (equijoin) predicates
19
19
use crate :: optimizer:: ApplyOrder ;
20
20
use crate :: { OptimizerConfig , OptimizerRule } ;
21
- use datafusion_common:: DFSchema ;
21
+ use datafusion_common:: tree_node :: Transformed ;
22
22
use datafusion_common:: Result ;
23
- use datafusion_expr:: utils:: { can_hash, find_valid_equijoin_key_pair, split_conjunction} ;
23
+ use datafusion_common:: { internal_err, DFSchema } ;
24
+ use datafusion_expr:: utils:: split_conjunction_owned;
25
+ use datafusion_expr:: utils:: { can_hash, find_valid_equijoin_key_pair} ;
24
26
use datafusion_expr:: { BinaryExpr , Expr , ExprSchemable , Join , LogicalPlan , Operator } ;
25
27
use std:: sync:: Arc ;
26
-
27
28
// equijoin predicate
28
29
type EquijoinPredicate = ( Expr , Expr ) ;
29
30
@@ -51,82 +52,90 @@ impl ExtractEquijoinPredicate {
51
52
impl OptimizerRule for ExtractEquijoinPredicate {
52
53
fn try_optimize (
53
54
& self ,
54
- plan : & LogicalPlan ,
55
+ _plan : & LogicalPlan ,
55
56
_config : & dyn OptimizerConfig ,
56
57
) -> Result < Option < LogicalPlan > > {
58
+ internal_err ! ( "Should have called ExtractEquijoinPredicate::rewrite" )
59
+ }
60
+ fn supports_rewrite ( & self ) -> bool {
61
+ true
62
+ }
63
+
64
+ fn name ( & self ) -> & str {
65
+ "extract_equijoin_predicate"
66
+ }
67
+
68
+ fn apply_order ( & self ) -> Option < ApplyOrder > {
69
+ Some ( ApplyOrder :: BottomUp )
70
+ }
71
+
72
+ fn rewrite (
73
+ & self ,
74
+ plan : LogicalPlan ,
75
+ _config : & dyn OptimizerConfig ,
76
+ ) -> Result < Transformed < LogicalPlan > > {
57
77
match plan {
58
78
LogicalPlan :: Join ( Join {
59
79
left,
60
80
right,
61
- on,
62
- filter,
81
+ mut on,
82
+ filter : Some ( expr ) ,
63
83
join_type,
64
84
join_constraint,
65
85
schema,
66
86
null_equals_null,
67
87
} ) => {
68
88
let left_schema = left. schema ( ) ;
69
89
let right_schema = right. schema ( ) ;
70
-
71
- filter . as_ref ( ) . map_or ( Result :: Ok ( None ) , |expr| {
72
- let ( equijoin_predicates , non_equijoin_expr ) =
73
- split_eq_and_noneq_join_predicate (
74
- expr ,
75
- left_schema ,
76
- right_schema ,
77
- ) ? ;
78
-
79
- let optimized_plan = ( !equijoin_predicates . is_empty ( ) ) . then ( || {
80
- let mut new_on = on . clone ( ) ;
81
- new_on . extend ( equijoin_predicates ) ;
82
-
83
- LogicalPlan :: Join ( Join {
84
- left : left . clone ( ) ,
85
- right : right . clone ( ) ,
86
- on : new_on ,
87
- filter : non_equijoin_expr ,
88
- join_type : * join_type ,
89
- join_constraint : * join_constraint ,
90
- schema : schema . clone ( ) ,
91
- null_equals_null : * null_equals_null ,
92
- } )
93
- } ) ;
94
-
95
- Ok ( optimized_plan )
96
- } )
90
+ let ( equijoin_predicates , non_equijoin_expr ) =
91
+ split_eq_and_noneq_join_predicate ( expr , left_schema , right_schema ) ? ;
92
+
93
+ if !equijoin_predicates . is_empty ( ) {
94
+ on . extend ( equijoin_predicates ) ;
95
+ Ok ( Transformed :: yes ( LogicalPlan :: Join ( Join {
96
+ left ,
97
+ right ,
98
+ on ,
99
+ filter : non_equijoin_expr ,
100
+ join_type ,
101
+ join_constraint ,
102
+ schema ,
103
+ null_equals_null ,
104
+ } ) ) )
105
+ } else {
106
+ Ok ( Transformed :: no ( LogicalPlan :: Join ( Join {
107
+ left ,
108
+ right ,
109
+ on ,
110
+ filter : non_equijoin_expr ,
111
+ join_type ,
112
+ join_constraint ,
113
+ schema ,
114
+ null_equals_null ,
115
+ } ) ) )
116
+ }
97
117
}
98
- _ => Ok ( None ) ,
118
+ _ => Ok ( Transformed :: no ( plan ) ) ,
99
119
}
100
120
}
101
-
102
- fn name ( & self ) -> & str {
103
- "extract_equijoin_predicate"
104
- }
105
-
106
- fn apply_order ( & self ) -> Option < ApplyOrder > {
107
- Some ( ApplyOrder :: BottomUp )
108
- }
109
121
}
110
122
111
123
fn split_eq_and_noneq_join_predicate (
112
- filter : & Expr ,
124
+ filter : Expr ,
113
125
left_schema : & Arc < DFSchema > ,
114
126
right_schema : & Arc < DFSchema > ,
115
127
) -> Result < ( Vec < EquijoinPredicate > , Option < Expr > ) > {
116
- let exprs = split_conjunction ( filter) ;
128
+ let exprs = split_conjunction_owned ( filter) ;
117
129
118
130
let mut accum_join_keys: Vec < ( Expr , Expr ) > = vec ! [ ] ;
119
131
let mut accum_filters: Vec < Expr > = vec ! [ ] ;
120
132
for expr in exprs {
121
133
match expr {
122
134
Expr :: BinaryExpr ( BinaryExpr {
123
- left,
135
+ ref left,
124
136
op : Operator :: Eq ,
125
- right,
137
+ ref right,
126
138
} ) => {
127
- let left = left. as_ref ( ) ;
128
- let right = right. as_ref ( ) ;
129
-
130
139
let join_key_pair = find_valid_equijoin_key_pair (
131
140
left,
132
141
right,
@@ -141,13 +150,13 @@ fn split_eq_and_noneq_join_predicate(
141
150
if can_hash ( & left_expr_type) && can_hash ( & right_expr_type) {
142
151
accum_join_keys. push ( ( left_expr, right_expr) ) ;
143
152
} else {
144
- accum_filters. push ( expr. clone ( ) ) ;
153
+ accum_filters. push ( expr) ;
145
154
}
146
155
} else {
147
- accum_filters. push ( expr. clone ( ) ) ;
156
+ accum_filters. push ( expr) ;
148
157
}
149
158
}
150
- _ => accum_filters. push ( expr. clone ( ) ) ,
159
+ _ => accum_filters. push ( expr) ,
151
160
}
152
161
}
153
162
0 commit comments