@@ -692,15 +692,13 @@ impl LogicalPlan {
692
692
} ) )
693
693
}
694
694
LogicalPlan :: Union ( Union { inputs, schema } ) => {
695
- let input_schema = inputs[ 0 ] . schema ( ) ;
696
- // If inputs are not pruned do not change schema
697
- // TODO this seems wrong (shouldn't we always use the schema of the input?)
698
- let schema = if schema. fields ( ) . len ( ) == input_schema. fields ( ) . len ( ) {
699
- Arc :: clone ( & schema)
695
+ let first_input_schema = inputs[ 0 ] . schema ( ) ;
696
+ if schema. fields ( ) . len ( ) == first_input_schema. fields ( ) . len ( ) {
697
+ // If inputs are not pruned do not change schema
698
+ Ok ( LogicalPlan :: Union ( Union { inputs, schema } ) )
700
699
} else {
701
- Arc :: clone ( input_schema)
702
- } ;
703
- Ok ( LogicalPlan :: Union ( Union { inputs, schema } ) )
700
+ Ok ( LogicalPlan :: Union ( Union :: try_new ( inputs) ?) )
701
+ }
704
702
}
705
703
LogicalPlan :: Distinct ( distinct) => {
706
704
let distinct = match distinct {
@@ -2625,6 +2623,106 @@ pub struct Union {
2625
2623
pub schema : DFSchemaRef ,
2626
2624
}
2627
2625
2626
+ impl Union {
2627
+ /// Constructs new Union instance deriving schema from inputs.
2628
+ fn try_new ( inputs : Vec < Arc < LogicalPlan > > ) -> Result < Self > {
2629
+ let schema = Self :: derive_schema_from_inputs ( & inputs, false ) ?;
2630
+ Ok ( Union { inputs, schema } )
2631
+ }
2632
+
2633
+ /// Constructs new Union instance deriving schema from inputs.
2634
+ /// Inputs do not have to have matching types and produced schema will
2635
+ /// take type from the first input.
2636
+ pub fn try_new_with_loose_types ( inputs : Vec < Arc < LogicalPlan > > ) -> Result < Self > {
2637
+ let schema = Self :: derive_schema_from_inputs ( & inputs, true ) ?;
2638
+ Ok ( Union { inputs, schema } )
2639
+ }
2640
+
2641
+ /// Constructs new Union instance deriving schema from inputs.
2642
+ ///
2643
+ /// `loose_types` if true, inputs do not have to have matching types and produced schema will
2644
+ /// take type from the first input. TODO this is not necessarily reasonable behavior.
2645
+ fn derive_schema_from_inputs (
2646
+ inputs : & [ Arc < LogicalPlan > ] ,
2647
+ loose_types : bool ,
2648
+ ) -> Result < DFSchemaRef > {
2649
+ if inputs. len ( ) < 2 {
2650
+ return plan_err ! ( "UNION requires at least two inputs" ) ;
2651
+ }
2652
+ let first_schema = inputs[ 0 ] . schema ( ) ;
2653
+ let fields_count = first_schema. fields ( ) . len ( ) ;
2654
+ for input in inputs. iter ( ) . skip ( 1 ) {
2655
+ if fields_count != input. schema ( ) . fields ( ) . len ( ) {
2656
+ return plan_err ! (
2657
+ "UNION queries have different number of columns: \
2658
+ left has {} columns whereas right has {} columns",
2659
+ fields_count,
2660
+ input. schema( ) . fields( ) . len( )
2661
+ ) ;
2662
+ }
2663
+ }
2664
+
2665
+ let union_fields = ( 0 ..fields_count)
2666
+ . map ( |i| {
2667
+ let fields = inputs
2668
+ . iter ( )
2669
+ . map ( |input| input. schema ( ) . field ( i) )
2670
+ . collect :: < Vec < _ > > ( ) ;
2671
+ let first_field = fields[ 0 ] ;
2672
+ let name = first_field. name ( ) ;
2673
+ let data_type = if loose_types {
2674
+ // TODO apply type coercion here, or document why it's better to defer
2675
+ // temporarily use the data type from the left input and later rely on the analyzer to
2676
+ // coerce the two schemas into a common one.
2677
+ first_field. data_type ( )
2678
+ } else {
2679
+ fields. iter ( ) . skip ( 1 ) . try_fold (
2680
+ first_field. data_type ( ) ,
2681
+ |acc, field| {
2682
+ if acc != field. data_type ( ) {
2683
+ return plan_err ! (
2684
+ "UNION field {i} have different type in inputs: \
2685
+ left has {} whereas right has {}",
2686
+ first_field. data_type( ) ,
2687
+ field. data_type( )
2688
+ ) ;
2689
+ }
2690
+ Ok ( acc)
2691
+ } ,
2692
+ ) ?
2693
+ } ;
2694
+ let nullable = fields. iter ( ) . any ( |field| field. is_nullable ( ) ) ;
2695
+ let mut field = Field :: new ( name, data_type. clone ( ) , nullable) ;
2696
+ let field_metadata =
2697
+ intersect_maps ( fields. iter ( ) . map ( |field| field. metadata ( ) ) ) ;
2698
+ field. set_metadata ( field_metadata) ;
2699
+ // TODO reusing table reference from the first schema is probably wrong
2700
+ let table_reference = first_schema. qualified_field ( i) . 0 . cloned ( ) ;
2701
+ Ok ( ( table_reference, Arc :: new ( field) ) )
2702
+ } )
2703
+ . collect :: < Result < _ > > ( ) ?;
2704
+ let union_schema_metadata =
2705
+ intersect_maps ( inputs. iter ( ) . map ( |input| input. schema ( ) . metadata ( ) ) ) ;
2706
+
2707
+ // Functional Dependencies doesn't preserve after UNION operation
2708
+ let schema = DFSchema :: new_with_metadata ( union_fields, union_schema_metadata) ?;
2709
+ let schema = Arc :: new ( schema) ;
2710
+
2711
+ Ok ( schema)
2712
+ }
2713
+ }
2714
+
2715
+ fn intersect_maps < ' a > (
2716
+ inputs : impl IntoIterator < Item = & ' a HashMap < String , String > > ,
2717
+ ) -> HashMap < String , String > {
2718
+ let mut inputs = inputs. into_iter ( ) ;
2719
+ let mut merged: HashMap < String , String > = inputs. next ( ) . cloned ( ) . unwrap_or_default ( ) ;
2720
+ for input in inputs {
2721
+ merged. retain ( |k, v| input. get ( k) == Some ( v) ) ;
2722
+ }
2723
+ merged
2724
+ }
2725
+
2628
2726
// Manual implementation needed because of `schema` field. Comparison excludes this field.
2629
2727
impl PartialOrd for Union {
2630
2728
fn partial_cmp ( & self , other : & Self ) -> Option < Ordering > {
0 commit comments