Skip to content

Commit 958cb3c

Browse files
committed
[PR SNAPSHOT] Fix UNION field nullability tracking
This cherry pick a snapshot of a PR offered upstream. ------------------------------------------------------ This commit fixes two bugs related to UNION handling - when constructing union plan nullability of the other union branch was ignored, thus resulting field could easily have incorrect nullability - when pruning/simplifying projects, in `recompute_schema` function there was similar logic, thus loosing nullability information even for correctly constructed Union plan node As a result, other optimizer logic (e.g. `expr_simplifier.rs`) could draw incorrect conclusions and thus lead to incorrect query results, as demonstrated with the attached SLT test.
1 parent c456926 commit 958cb3c

File tree

3 files changed

+126
-31
lines changed

3 files changed

+126
-31
lines changed

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,7 @@ use datafusion_common::file_options::file_type::FileType;
5454
use datafusion_common::{
5555
exec_err, get_target_functional_dependencies, internal_err, not_impl_err,
5656
plan_datafusion_err, plan_err, Column, DFSchema, DFSchemaRef, DataFusionError,
57-
FunctionalDependencies, Result, ScalarValue, TableReference, ToDFSchema,
58-
UnnestOptions,
57+
Result, ScalarValue, TableReference, ToDFSchema, UnnestOptions,
5958
};
6059
use datafusion_expr_common::type_coercion::binary::type_union_resolution;
6160
use indexmap::IndexSet;
@@ -1508,27 +1507,10 @@ pub fn validate_unique_names<'a>(
15081507
/// [`TypeCoercionRewriter::coerce_union`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/struct.TypeCoercionRewriter.html#method.coerce_union
15091508
/// [`coerce_union_schema`]: https://docs.rs/datafusion-optimizer/latest/datafusion_optimizer/analyzer/type_coercion/fn.coerce_union_schema.html
15101509
pub fn union(left_plan: LogicalPlan, right_plan: LogicalPlan) -> Result<LogicalPlan> {
1511-
if left_plan.schema().fields().len() != right_plan.schema().fields().len() {
1512-
return plan_err!(
1513-
"UNION queries have different number of columns: \
1514-
left has {} columns whereas right has {} columns",
1515-
left_plan.schema().fields().len(),
1516-
right_plan.schema().fields().len()
1517-
);
1518-
}
1519-
1520-
// Temporarily use the schema from the left input and later rely on the analyzer to
1521-
// coerce the two schemas into a common one.
1522-
1523-
// Functional Dependencies doesn't preserve after UNION operation
1524-
let schema = (**left_plan.schema()).clone();
1525-
let schema =
1526-
Arc::new(schema.with_functional_dependencies(FunctionalDependencies::empty())?);
1527-
1528-
Ok(LogicalPlan::Union(Union {
1529-
inputs: vec![Arc::new(left_plan), Arc::new(right_plan)],
1530-
schema,
1531-
}))
1510+
Ok(LogicalPlan::Union(Union::try_new_with_loose_types(vec![
1511+
Arc::new(left_plan),
1512+
Arc::new(right_plan),
1513+
])?))
15321514
}
15331515

15341516
/// Create Projection

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 106 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -685,15 +685,13 @@ impl LogicalPlan {
685685
}))
686686
}
687687
LogicalPlan::Union(Union { inputs, schema }) => {
688-
let input_schema = inputs[0].schema();
689-
// If inputs are not pruned do not change schema
690-
// TODO this seems wrong (shouldn't we always use the schema of the input?)
691-
let schema = if schema.fields().len() == input_schema.fields().len() {
692-
Arc::clone(&schema)
688+
let first_input_schema = inputs[0].schema();
689+
if schema.fields().len() == first_input_schema.fields().len() {
690+
// If inputs are not pruned do not change schema
691+
Ok(LogicalPlan::Union(Union { inputs, schema }))
693692
} else {
694-
Arc::clone(input_schema)
695-
};
696-
Ok(LogicalPlan::Union(Union { inputs, schema }))
693+
Ok(LogicalPlan::Union(Union::try_new(inputs)?))
694+
}
697695
}
698696
LogicalPlan::Distinct(distinct) => {
699697
let distinct = match distinct {
@@ -2598,6 +2596,106 @@ pub struct Union {
25982596
pub schema: DFSchemaRef,
25992597
}
26002598

2599+
impl Union {
2600+
/// Constructs new Union instance deriving schema from inputs.
2601+
fn try_new(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2602+
let schema = Self::derive_schema_from_inputs(&inputs, false)?;
2603+
Ok(Union { inputs, schema })
2604+
}
2605+
2606+
/// Constructs new Union instance deriving schema from inputs.
2607+
/// Inputs do not have to have matching types and produced schema will
2608+
/// take type from the first input.
2609+
pub fn try_new_with_loose_types(inputs: Vec<Arc<LogicalPlan>>) -> Result<Self> {
2610+
let schema = Self::derive_schema_from_inputs(&inputs, true)?;
2611+
Ok(Union { inputs, schema })
2612+
}
2613+
2614+
/// Constructs new Union instance deriving schema from inputs.
2615+
///
2616+
/// `loose_types` if true, inputs do not have to have matching types and produced schema will
2617+
/// take type from the first input. TODO this is not necessarily reasonable behavior.
2618+
fn derive_schema_from_inputs(
2619+
inputs: &[Arc<LogicalPlan>],
2620+
loose_types: bool,
2621+
) -> Result<DFSchemaRef> {
2622+
if inputs.len() < 2 {
2623+
return plan_err!("UNION requires at least two inputs");
2624+
}
2625+
let first_schema = inputs[0].schema();
2626+
let fields_count = first_schema.fields().len();
2627+
for input in inputs.iter().skip(1) {
2628+
if fields_count != input.schema().fields().len() {
2629+
return plan_err!(
2630+
"UNION queries have different number of columns: \
2631+
left has {} columns whereas right has {} columns",
2632+
fields_count,
2633+
input.schema().fields().len()
2634+
);
2635+
}
2636+
}
2637+
2638+
let union_fields = (0..fields_count)
2639+
.map(|i| {
2640+
let fields = inputs
2641+
.iter()
2642+
.map(|input| input.schema().field(i))
2643+
.collect::<Vec<_>>();
2644+
let first_field = fields[0];
2645+
let name = first_field.name();
2646+
let data_type = if loose_types {
2647+
// TODO apply type coercion here, or document why it's better to defer
2648+
// temporarily use the data type from the left input and later rely on the analyzer to
2649+
// coerce the two schemas into a common one.
2650+
first_field.data_type()
2651+
} else {
2652+
fields.iter().skip(1).try_fold(
2653+
first_field.data_type(),
2654+
|acc, field| {
2655+
if acc != field.data_type() {
2656+
return plan_err!(
2657+
"UNION field {i} have different type in inputs: \
2658+
left has {} whereas right has {}",
2659+
first_field.data_type(),
2660+
field.data_type()
2661+
);
2662+
}
2663+
Ok(acc)
2664+
},
2665+
)?
2666+
};
2667+
let nullable = fields.iter().any(|field| field.is_nullable());
2668+
let mut field = Field::new(name, data_type.clone(), nullable);
2669+
let field_metadata =
2670+
intersect_maps(fields.iter().map(|field| field.metadata()));
2671+
field.set_metadata(field_metadata);
2672+
// TODO reusing table reference from the first schema is probably wrong
2673+
let table_reference = first_schema.qualified_field(i).0.cloned();
2674+
Ok((table_reference, Arc::new(field)))
2675+
})
2676+
.collect::<Result<_>>()?;
2677+
let union_schema_metadata =
2678+
intersect_maps(inputs.iter().map(|input| input.schema().metadata()));
2679+
2680+
// Functional Dependencies doesn't preserve after UNION operation
2681+
let schema = DFSchema::new_with_metadata(union_fields, union_schema_metadata)?;
2682+
let schema = Arc::new(schema);
2683+
2684+
Ok(schema)
2685+
}
2686+
}
2687+
2688+
fn intersect_maps<'a>(
2689+
inputs: impl IntoIterator<Item = &'a HashMap<String, String>>,
2690+
) -> HashMap<String, String> {
2691+
let mut inputs = inputs.into_iter();
2692+
let mut merged: HashMap<String, String> = inputs.next().cloned().unwrap_or_default();
2693+
for input in inputs {
2694+
merged.retain(|k, v| input.get(k) == Some(v));
2695+
}
2696+
merged
2697+
}
2698+
26012699
// Manual implementation needed because of `schema` field. Comparison excludes this field.
26022700
impl PartialOrd for Union {
26032701
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {

datafusion/sqllogictest/test_files/union.slt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,3 +761,18 @@ SELECT NULL WHERE FALSE;
761761
----
762762
0.5
763763
1
764+
765+
# test for https://github.com/apache/datafusion/issues/14352
766+
query TB rowsort
767+
SELECT
768+
a,
769+
a IS NOT NULL
770+
FROM (
771+
-- second column, even though it's not selected, was necessary to reproduce the bug linked above
772+
SELECT 'foo' AS a, 3 AS b
773+
UNION ALL
774+
SELECT NULL AS a, 4 AS b
775+
)
776+
----
777+
NULL false
778+
foo true

0 commit comments

Comments
 (0)