@@ -21,7 +21,9 @@ use crate::{OptimizerConfig, OptimizerRule};
21
21
use datafusion_common:: tree_node:: Transformed ;
22
22
use datafusion_common:: Result ;
23
23
use datafusion_expr:: expr_rewriter:: coerce_plan_expr_for_schema;
24
+ use datafusion_expr:: logical_plan:: tree_node:: unwrap_arc;
24
25
use datafusion_expr:: { Distinct , LogicalPlan , Union } ;
26
+ use itertools:: Itertools ;
25
27
use std:: sync:: Arc ;
26
28
27
29
#[ derive( Default ) ]
@@ -56,53 +58,55 @@ impl OptimizerRule for EliminateNestedUnion {
56
58
match plan {
57
59
LogicalPlan :: Union ( Union { inputs, schema } ) => {
58
60
let inputs = inputs
59
- . iter ( )
61
+ . into_iter ( )
60
62
. flat_map ( extract_plans_from_union)
61
63
. collect :: < Vec < _ > > ( ) ;
62
64
63
65
Ok ( Transformed :: yes ( LogicalPlan :: Union ( Union {
64
- inputs,
66
+ inputs : inputs . into_iter ( ) . map ( Arc :: new ) . collect_vec ( ) ,
65
67
schema,
66
68
} ) ) )
67
69
}
68
- LogicalPlan :: Distinct ( Distinct :: All ( ref nested_plan) ) => {
69
- match nested_plan . as_ref ( ) {
70
+ LogicalPlan :: Distinct ( Distinct :: All ( nested_plan) ) => {
71
+ match unwrap_arc ( nested_plan ) {
70
72
LogicalPlan :: Union ( Union { inputs, schema } ) => {
71
73
let inputs = inputs
72
- . iter ( )
74
+ . into_iter ( )
73
75
. map ( extract_plan_from_distinct)
74
76
. flat_map ( extract_plans_from_union)
75
77
. collect :: < Vec < _ > > ( ) ;
76
78
77
79
Ok ( Transformed :: yes ( LogicalPlan :: Distinct ( Distinct :: All (
78
80
Arc :: new ( LogicalPlan :: Union ( Union {
79
- inputs,
81
+ inputs : inputs . into_iter ( ) . map ( Arc :: new ) . collect_vec ( ) ,
80
82
schema : schema. clone ( ) ,
81
83
} ) ) ,
82
84
) ) ) )
83
85
}
84
- _ => Ok ( Transformed :: no ( plan) ) ,
86
+ nested_plan => Ok ( Transformed :: no ( LogicalPlan :: Distinct (
87
+ Distinct :: All ( Arc :: new ( nested_plan) ) ,
88
+ ) ) ) ,
85
89
}
86
90
}
87
91
_ => Ok ( Transformed :: no ( plan) ) ,
88
92
}
89
93
}
90
94
}
91
95
92
- fn extract_plans_from_union ( plan : & Arc < LogicalPlan > ) -> Vec < Arc < LogicalPlan > > {
93
- match plan . as_ref ( ) {
96
+ fn extract_plans_from_union ( plan : Arc < LogicalPlan > ) -> Vec < LogicalPlan > {
97
+ match unwrap_arc ( plan ) {
94
98
LogicalPlan :: Union ( Union { inputs, schema } ) => inputs
95
- . iter ( )
96
- . map ( |plan| Arc :: new ( coerce_plan_expr_for_schema ( plan, schema) . unwrap ( ) ) )
99
+ . into_iter ( )
100
+ . map ( |plan| coerce_plan_expr_for_schema ( & plan, & schema) . unwrap ( ) )
97
101
. collect :: < Vec < _ > > ( ) ,
98
- _ => vec ! [ plan. clone ( ) ] ,
102
+ plan => vec ! [ plan] ,
99
103
}
100
104
}
101
105
102
- fn extract_plan_from_distinct ( plan : & Arc < LogicalPlan > ) -> & Arc < LogicalPlan > {
103
- match plan . as_ref ( ) {
106
+ fn extract_plan_from_distinct ( plan : Arc < LogicalPlan > ) -> Arc < LogicalPlan > {
107
+ match unwrap_arc ( plan ) {
104
108
LogicalPlan :: Distinct ( Distinct :: All ( plan) ) => plan,
105
- _ => plan,
109
+ plan => Arc :: new ( plan) ,
106
110
}
107
111
}
108
112
0 commit comments