19
19
20
20
use crate :: optimizer:: ApplyOrder ;
21
21
use crate :: { OptimizerConfig , OptimizerRule } ;
22
+ use datafusion_common:: internal_err;
23
+ use datafusion_common:: tree_node:: Transformed ;
22
24
use datafusion_common:: Result ;
23
25
use datafusion_expr:: expr:: BinaryExpr ;
24
26
use datafusion_expr:: logical_plan:: Filter ;
@@ -56,29 +58,34 @@ use datafusion_expr::{Expr, LogicalPlan, Operator};
56
58
///
57
59
/// ```sql
58
60
/// where
59
- /// p_partkey = l_partkey
60
- /// and l_shipmode in (‘AIR’, ‘AIR REG’)
61
- /// and l_shipinstruct = ‘DELIVER IN PERSON’
62
- /// and (
63
61
/// (
62
+ /// p_partkey = l_partkey
64
63
/// and p_brand = ‘[BRAND1]’
65
64
/// and p_container in ( ‘SM CASE’, ‘SM BOX’, ‘SM PACK’, ‘SM PKG’)
66
65
/// and l_quantity >= [QUANTITY1] and l_quantity <= [QUANTITY1] + 10
67
66
/// and p_size between 1 and 5
67
+ /// and l_shipmode in (‘AIR’, ‘AIR REG’)
68
+ /// and l_shipinstruct = ‘DELIVER IN PERSON’
68
69
/// )
69
70
/// or
70
71
/// (
72
+ /// p_partkey = l_partkey
71
73
/// and p_brand = ‘[BRAND2]’
72
74
/// and p_container in (‘MED BAG’, ‘MED BOX’, ‘MED PKG’, ‘MED PACK’)
73
75
/// and l_quantity >= [QUANTITY2] and l_quantity <= [QUANTITY2] + 10
74
76
/// and p_size between 1 and 10
77
+ /// and l_shipmode in (‘AIR’, ‘AIR REG’)
78
+ /// and l_shipinstruct = ‘DELIVER IN PERSON’
75
79
/// )
76
80
/// or
77
81
/// (
82
+ /// p_partkey = l_partkey
78
83
/// and p_brand = ‘[BRAND3]’
79
84
/// and p_container in ( ‘LG CASE’, ‘LG BOX’, ‘LG PACK’, ‘LG PKG’)
80
85
/// and l_quantity >= [QUANTITY3] and l_quantity <= [QUANTITY3] + 10
81
86
/// and p_size between 1 and 15
87
+ /// and l_shipmode in (‘AIR’, ‘AIR REG’)
88
+ /// and l_shipinstruct = ‘DELIVER IN PERSON’
82
89
/// )
83
90
/// )
84
91
/// ```
@@ -128,21 +135,10 @@ impl RewriteDisjunctivePredicate {
128
135
impl OptimizerRule for RewriteDisjunctivePredicate {
129
136
fn try_optimize (
130
137
& self ,
131
- plan : & LogicalPlan ,
138
+ _plan : & LogicalPlan ,
132
139
_config : & dyn OptimizerConfig ,
133
140
) -> Result < Option < LogicalPlan > > {
134
- match plan {
135
- LogicalPlan :: Filter ( filter) => {
136
- let predicate = predicate ( & filter. predicate ) ?;
137
- let rewritten_predicate = rewrite_predicate ( predicate) ;
138
- let rewritten_expr = normalize_predicate ( rewritten_predicate) ;
139
- Ok ( Some ( LogicalPlan :: Filter ( Filter :: try_new (
140
- rewritten_expr,
141
- filter. input . clone ( ) ,
142
- ) ?) ) )
143
- }
144
- _ => Ok ( None ) ,
145
- }
141
+ internal_err ! ( "Should have called RewriteDisjunctivePredicate::rewrite" )
146
142
}
147
143
148
144
fn name ( & self ) -> & str {
@@ -152,6 +148,29 @@ impl OptimizerRule for RewriteDisjunctivePredicate {
152
148
fn apply_order ( & self ) -> Option < ApplyOrder > {
153
149
Some ( ApplyOrder :: TopDown )
154
150
}
151
+
152
+ fn supports_rewrite ( & self ) -> bool {
153
+ true
154
+ }
155
+
156
+ fn rewrite (
157
+ & self ,
158
+ plan : LogicalPlan ,
159
+ _config : & dyn OptimizerConfig ,
160
+ ) -> Result < Transformed < LogicalPlan > > {
161
+ match plan {
162
+ LogicalPlan :: Filter ( filter) => {
163
+ let predicate = predicate ( filter. predicate ) ?;
164
+ let rewritten_predicate = rewrite_predicate ( predicate) ;
165
+ let rewritten_expr = normalize_predicate ( rewritten_predicate) ;
166
+ Ok ( Transformed :: yes ( LogicalPlan :: Filter ( Filter :: try_new (
167
+ rewritten_expr,
168
+ filter. input ,
169
+ ) ?) ) )
170
+ }
171
+ _ => Ok ( Transformed :: no ( plan) ) ,
172
+ }
173
+ }
155
174
}
156
175
157
176
#[ derive( Clone , PartialEq , Debug ) ]
@@ -161,27 +180,23 @@ enum Predicate {
161
180
Other { expr : Box < Expr > } ,
162
181
}
163
182
164
- fn predicate ( expr : & Expr ) -> Result < Predicate > {
183
+ fn predicate ( expr : Expr ) -> Result < Predicate > {
165
184
match expr {
166
185
Expr :: BinaryExpr ( BinaryExpr { left, op, right } ) => match op {
167
186
Operator :: And => {
168
- let args = vec ! [ predicate( left) ?, predicate( right) ?] ;
187
+ let args = vec ! [ predicate( * left) ?, predicate( * right) ?] ;
169
188
Ok ( Predicate :: And { args } )
170
189
}
171
190
Operator :: Or => {
172
- let args = vec ! [ predicate( left) ?, predicate( right) ?] ;
191
+ let args = vec ! [ predicate( * left) ?, predicate( * right) ?] ;
173
192
Ok ( Predicate :: Or { args } )
174
193
}
175
194
_ => Ok ( Predicate :: Other {
176
- expr : Box :: new ( Expr :: BinaryExpr ( BinaryExpr :: new (
177
- left. clone ( ) ,
178
- * op,
179
- right. clone ( ) ,
180
- ) ) ) ,
195
+ expr : Box :: new ( Expr :: BinaryExpr ( BinaryExpr :: new ( left, op, right) ) ) ,
181
196
} ) ,
182
197
} ,
183
198
_ => Ok ( Predicate :: Other {
184
- expr : Box :: new ( expr. clone ( ) ) ,
199
+ expr : Box :: new ( expr) ,
185
200
} ) ,
186
201
}
187
202
}
@@ -210,8 +225,8 @@ fn rewrite_predicate(predicate: Predicate) -> Predicate {
210
225
match predicate {
211
226
Predicate :: And { args } => {
212
227
let mut rewritten_args = Vec :: with_capacity ( args. len ( ) ) ;
213
- for arg in args. iter ( ) {
214
- rewritten_args. push ( rewrite_predicate ( arg. clone ( ) ) ) ;
228
+ for arg in args. into_iter ( ) {
229
+ rewritten_args. push ( rewrite_predicate ( arg) ) ;
215
230
}
216
231
rewritten_args = flatten_and_predicates ( rewritten_args) ;
217
232
Predicate :: And {
@@ -220,15 +235,13 @@ fn rewrite_predicate(predicate: Predicate) -> Predicate {
220
235
}
221
236
Predicate :: Or { args } => {
222
237
let mut rewritten_args = vec ! [ ] ;
223
- for arg in args. iter ( ) {
224
- rewritten_args. push ( rewrite_predicate ( arg. clone ( ) ) ) ;
238
+ for arg in args. into_iter ( ) {
239
+ rewritten_args. push ( rewrite_predicate ( arg) ) ;
225
240
}
226
241
rewritten_args = flatten_or_predicates ( rewritten_args) ;
227
- delete_duplicate_predicates ( & rewritten_args)
242
+ delete_duplicate_predicates ( rewritten_args)
228
243
}
229
- Predicate :: Other { expr } => Predicate :: Other {
230
- expr : Box :: new ( * expr) ,
231
- } ,
244
+ Predicate :: Other { expr } => Predicate :: Other { expr } ,
232
245
}
233
246
}
234
247
@@ -239,8 +252,7 @@ fn flatten_and_predicates(
239
252
for predicate in and_predicates {
240
253
match predicate {
241
254
Predicate :: And { args } => {
242
- flattened_predicates
243
- . extend_from_slice ( flatten_and_predicates ( args) . as_slice ( ) ) ;
255
+ flattened_predicates. append ( & mut flatten_and_predicates ( args) ) ;
244
256
}
245
257
_ => {
246
258
flattened_predicates. push ( predicate) ;
@@ -257,8 +269,7 @@ fn flatten_or_predicates(
257
269
for predicate in or_predicates {
258
270
match predicate {
259
271
Predicate :: Or { args } => {
260
- flattened_predicates
261
- . extend_from_slice ( flatten_or_predicates ( args) . as_slice ( ) ) ;
272
+ flattened_predicates. append ( & mut flatten_or_predicates ( args) ) ;
262
273
}
263
274
_ => {
264
275
flattened_predicates. push ( predicate) ;
@@ -268,7 +279,7 @@ fn flatten_or_predicates(
268
279
flattened_predicates
269
280
}
270
281
271
- fn delete_duplicate_predicates ( or_predicates : & [ Predicate ] ) -> Predicate {
282
+ fn delete_duplicate_predicates ( or_predicates : Vec < Predicate > ) -> Predicate {
272
283
let mut shortest_exprs: Vec < Predicate > = vec ! [ ] ;
273
284
let mut shortest_exprs_len = 0 ;
274
285
// choose the shortest AND predicate
@@ -305,31 +316,30 @@ fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
305
316
}
306
317
if exist_exprs. is_empty ( ) {
307
318
return Predicate :: Or {
308
- args : or_predicates. to_vec ( ) ,
319
+ args : or_predicates,
309
320
} ;
310
321
}
311
322
312
323
// Rebuild the OR predicate.
313
324
// (A AND B) OR A will be optimized to A.
314
325
let mut new_or_predicates = vec ! [ ] ;
315
- for or_predicate in or_predicates. iter ( ) {
326
+ for or_predicate in or_predicates. into_iter ( ) {
316
327
match or_predicate {
317
- Predicate :: And { args } => {
318
- let mut new_args = ( * args) . clone ( ) ;
319
- new_args. retain ( |expr| !exist_exprs. contains ( expr) ) ;
320
- if !new_args. is_empty ( ) {
321
- if new_args. len ( ) == 1 {
322
- new_or_predicates. push ( new_args[ 0 ] . clone ( ) ) ;
328
+ Predicate :: And { mut args } => {
329
+ args. retain ( |expr| !exist_exprs. contains ( expr) ) ;
330
+ if !args. is_empty ( ) {
331
+ if args. len ( ) == 1 {
332
+ new_or_predicates. push ( args. remove ( 0 ) ) ;
323
333
} else {
324
- new_or_predicates. push ( Predicate :: And { args : new_args } ) ;
334
+ new_or_predicates. push ( Predicate :: And { args } ) ;
325
335
}
326
336
} else {
327
337
new_or_predicates. clear ( ) ;
328
338
break ;
329
339
}
330
340
}
331
341
_ => {
332
- if exist_exprs. contains ( or_predicate) {
342
+ if exist_exprs. contains ( & or_predicate) {
333
343
new_or_predicates. clear ( ) ;
334
344
break ;
335
345
}
@@ -338,7 +348,7 @@ fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
338
348
}
339
349
if !new_or_predicates. is_empty ( ) {
340
350
if new_or_predicates. len ( ) == 1 {
341
- exist_exprs. push ( new_or_predicates[ 0 ] . clone ( ) ) ;
351
+ exist_exprs. push ( new_or_predicates. remove ( 0 ) ) ;
342
352
} else {
343
353
exist_exprs. push ( Predicate :: Or {
344
354
args : flatten_or_predicates ( new_or_predicates) ,
@@ -347,7 +357,7 @@ fn delete_duplicate_predicates(or_predicates: &[Predicate]) -> Predicate {
347
357
}
348
358
349
359
if exist_exprs. len ( ) == 1 {
350
- exist_exprs[ 0 ] . clone ( )
360
+ exist_exprs. remove ( 0 )
351
361
} else {
352
362
Predicate :: And {
353
363
args : flatten_and_predicates ( exist_exprs) ,
@@ -373,7 +383,7 @@ mod tests {
373
383
and ( equi_expr. clone ( ) , gt_expr. clone ( ) ) ,
374
384
and ( equi_expr. clone ( ) , lt_expr. clone ( ) ) ,
375
385
) ;
376
- let predicate = predicate ( & expr) ?;
386
+ let predicate = predicate ( expr) ?;
377
387
assert_eq ! (
378
388
predicate,
379
389
Predicate :: Or {
0 commit comments