diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index c7cff3ac26b1..b0c28e145525 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -54,8 +54,7 @@ use datafusion_common::file_options::file_type::FileType; use datafusion_common::{ exec_err, get_target_functional_dependencies, internal_err, not_impl_err, plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, - FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema, - UnnestOptions, + Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions, }; use datafusion_expr_common::type_coercion::binary::type_union_resolution; @@ -1518,27 +1517,10 @@ pub fn validate_unique_names<'a>( /// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union /// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result { - if left_plan.schema().fields().len() != right_plan.schema().fields().len() { - return plan_err!( - "UNION queries have different number of columns: \ - left has {} columns whereas right has {} columns", - left_plan.schema().fields().len(), - right_plan.schema().fields().len() - ); - } - - // Temporarily use the schema from the left input and later rely on the analyzer to - // coerce the two schemas into a common one. - - // Functional Dependencies doesn't preserve after UNION operation - let schema = (**left_plan.schema()).clone(); - let schema = - Arc::new(schema.with_functional_dependencies(FunctionalDependencies::empty())?); - - Ok(LogicalPlan::Union(Union { - inputs: vec![Arc::new(left_plan), Arc::new(right_plan)], - schema, - })) + Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![ + Arc::new(left_plan), + Arc::new(right_plan), + ])?)) } /// Create Projection diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 7e9c0cb75ec8..446ae94108b1 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -699,15 +699,13 @@ impl LogicalPlan { })) } LogicalPlan::Union(Union { inputs, schema }) => { - let input_schema = inputs[0].schema(); - // If inputs are not pruned do not change schema - // TODO this seems wrong (shouldn't we always use the schema of the input?) - let schema = if schema.fields().len() == input_schema.fields().len() { - Arc::clone(&schema) + let first_input_schema = inputs[0].schema(); + if schema.fields().len() == first_input_schema.fields().len() { + // If inputs are not pruned do not change schema + Ok(LogicalPlan::Union(Union { inputs, schema })) } else { - Arc::clone(input_schema) - }; - Ok(LogicalPlan::Union(Union { inputs, schema })) + Ok(LogicalPlan::Union(Union::try_new(inputs)?)) + } } LogicalPlan::Distinct(distinct) => { let distinct = match distinct { @@ -2645,6 +2643,107 @@ pub struct Union { pub schema: DFSchemaRef, } +impl Union { + /// Constructs new Union instance deriving schema from inputs. + fn try_new(inputs: Vec>) -> Result { + let schema = Self::derive_schema_from_inputs(&inputs, false)?; + Ok(Union { inputs, schema }) + } + + /// Constructs new Union instance deriving schema from inputs. + /// Inputs do not have to have matching types and produced schema will + /// take type from the first input. + // TODO (https://github.com/apache/datafusion/issues/14380): Avoid creating uncoerced union at all. + pub fn try_new_with_loose_types(inputs: Vec>) -> Result { + let schema = Self::derive_schema_from_inputs(&inputs, true)?; + Ok(Union { inputs, schema }) + } + + /// Constructs new Union instance deriving schema from inputs. + /// + /// `loose_types` if true, inputs do not have to have matching types and produced schema will + /// take type from the first input. TODO () this is not necessarily reasonable behavior. + fn derive_schema_from_inputs( + inputs: &[Arc], + loose_types: bool, + ) -> Result { + if inputs.len() < 2 { + return plan_err!("UNION requires at least two inputs"); + } + let first_schema = inputs[0].schema(); + let fields_count = first_schema.fields().len(); + for input in inputs.iter().skip(1) { + if fields_count != input.schema().fields().len() { + return plan_err!( + "UNION queries have different number of columns: \ + left has {} columns whereas right has {} columns", + fields_count, + input.schema().fields().len() + ); + } + } + + let union_fields = (0..fields_count) + .map(|i| { + let fields = inputs + .iter() + .map(|input| input.schema().field(i)) + .collect::>(); + let first_field = fields[0]; + let name = first_field.name(); + let data_type = if loose_types { + // TODO apply type coercion here, or document why it's better to defer + // temporarily use the data type from the left input and later rely on the analyzer to + // coerce the two schemas into a common one. + first_field.data_type() + } else { + fields.iter().skip(1).try_fold( + first_field.data_type(), + |acc, field| { + if acc != field.data_type() { + return plan_err!( + "UNION field {i} have different type in inputs: \ + left has {} whereas right has {}", + first_field.data_type(), + field.data_type() + ); + } + Ok(acc) + }, + )? + }; + let nullable = fields.iter().any(|field| field.is_nullable()); + let mut field = Field::new(name, data_type.clone(), nullable); + let field_metadata = + intersect_maps(fields.iter().map(|field| field.metadata())); + field.set_metadata(field_metadata); + // TODO reusing table reference from the first schema is probably wrong + let table_reference = first_schema.qualified_field(i).0.cloned(); + Ok((table_reference, Arc::new(field))) + }) + .collect::>()?; + let union_schema_metadata = + intersect_maps(inputs.iter().map(|input| input.schema().metadata())); + + // Functional Dependencies doesn't preserve after UNION operation + let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?; + let schema = Arc::new(schema); + + Ok(schema) + } +} + +fn intersect_maps<'a>( + inputs: impl IntoIterator>, +) -> HashMap { + let mut inputs = inputs.into_iter(); + let mut merged: HashMap = inputs.next().cloned().unwrap_or_default(); + for input in inputs { + merged.retain(|k, v| input.get(k) == Some(v)); + } + merged +} + // Manual implementation needed because of `schema` field. Comparison excludes this field. impl PartialOrd for Union { fn partial_cmp(&self, other: &Self) -> Option { diff --git a/datafusion/sqllogictest/test_files/union.slt b/datafusion/sqllogictest/test_files/union.slt index 352c01ca295c..cbd19bf3806f 100644 --- a/datafusion/sqllogictest/test_files/union.slt +++ b/datafusion/sqllogictest/test_files/union.slt @@ -836,3 +836,18 @@ physical_plan # Clean up after the test statement ok drop table aggregate_test_100; + +# test for https://github.com/apache/datafusion/issues/14352 +query TB rowsort +SELECT + a, + a IS NOT NULL +FROM ( + -- second column, even though it's not selected, was necessary to reproduce the bug linked above + SELECT 'foo' AS a, 3 AS b + UNION ALL + SELECT NULL AS a, 4 AS b +) +---- +NULL false +foo true