diff --git a/datafusion/physical-expr/src/equivalence/properties.rs b/datafusion/physical-expr/src/equivalence/properties.rs index a5d54ee56cff..24f1f92e7ad4 100644 --- a/datafusion/physical-expr/src/equivalence/properties.rs +++ b/datafusion/physical-expr/src/equivalence/properties.rs @@ -1524,6 +1524,102 @@ impl Hash for ExprWrapper { } } +/// Take sort orderings for unioned sides of equal length, and return the unioned ordering. +/// +/// Example: +/// child1 = order by a0, b, c +/// child2 = order by a, b, c +/// => union's joint order is a0, a, b, c. +fn calculate_joint_ordering( + lhs: &EquivalenceProperties, + rhs: &EquivalenceProperties, +) -> LexOrdering { + let mut union_ordering = vec![]; + for ordering in lhs + .normalized_oeq_class() + .orderings + .iter() + .chain(rhs.normalized_oeq_class().orderings.iter()) + { + if union_ordering.is_empty() { + union_ordering = ordering.clone(); + continue; + } + + if !union_ordering.len().eq(&ordering.len()) { + break; + } + + let mut unioned = union_ordering.into_iter().peekable(); + let mut curr = ordering.iter().peekable(); + let mut new_union = vec![]; + loop { + match (curr.next(), unioned.next()) { + (None, None) => break, + (None, Some(u)) => { + new_union.push(u.clone()); + continue; + } + (Some(c), None) => { + new_union.push(c.clone()); + continue; + } + (Some(c), Some(u)) => { + if c.eq(&u) { + new_union.push(c.clone()); + continue; + } else if c.expr.eq(&u.expr) { + // options are different => negates each other + continue; + } else { + new_union.push(u.clone()); + new_union.push(c.clone()); + continue; + } + } + } + } + union_ordering = new_union; + } + collapse_lex_ordering(union_ordering) +} + +/// Take sort orderings for unioned sides return the shorten, novel sort order. +/// +/// Example: +/// child1 = order by a, b +/// child2 = order by a1, b1, c1 +/// => union's prefixed order is a, b. +fn calculate_prefix_ordering( + lhs: &EquivalenceProperties, + rhs: &EquivalenceProperties, +) -> Vec { + // Calculate valid orderings for the union by searching for prefixes + // in both sides. + let mut orderings = vec![]; + for mut ordering in lhs.normalized_oeq_class().orderings { + // Progressively shorten the ordering to search for a satisfied prefix: + while !rhs.ordering_satisfy(&ordering) { + ordering.pop(); + } + // There is a non-trivial satisfied prefix, add it as a valid ordering: + if !ordering.is_empty() { + orderings.push(ordering); + } + } + for mut ordering in rhs.normalized_oeq_class().orderings { + // Progressively shorten the ordering to search for a satisfied prefix: + while !lhs.ordering_satisfy(&ordering) { + ordering.pop(); + } + // There is a non-trivial satisfied prefix, add it as a valid ordering: + if !ordering.is_empty() { + orderings.push(ordering); + } + } + orderings +} + /// Calculates the union (in the sense of `UnionExec`) `EquivalenceProperties` /// of `lhs` and `rhs` according to the schema of `lhs`. fn calculate_union_binary( @@ -1553,32 +1649,15 @@ fn calculate_union_binary( }) .collect(); - // Next, calculate valid orderings for the union by searching for prefixes - // in both sides. - let mut orderings = vec![]; - for mut ordering in lhs.normalized_oeq_class().orderings { - // Progressively shorten the ordering to search for a satisfied prefix: - while !rhs.ordering_satisfy(&ordering) { - ordering.pop(); - } - // There is a non-trivial satisfied prefix, add it as a valid ordering: - if !ordering.is_empty() { - orderings.push(ordering); - } - } - for mut ordering in rhs.normalized_oeq_class().orderings { - // Progressively shorten the ordering to search for a satisfied prefix: - while !lhs.ordering_satisfy(&ordering) { - ordering.pop(); - } - // There is a non-trivial satisfied prefix, add it as a valid ordering: - if !ordering.is_empty() { - orderings.push(ordering); - } - } + // Create a unioned ordering. + let mut orderings = calculate_prefix_ordering(&lhs, &rhs); + let union_ordering = calculate_joint_ordering(&lhs, &rhs); + orderings.push(union_ordering); + let mut eq_properties = EquivalenceProperties::new(lhs.schema); eq_properties.constants = constants; eq_properties.add_new_orderings(orderings); + Ok(eq_properties) } @@ -2645,8 +2724,8 @@ mod tests { Arc::clone(&schema3), ), ], - // Expected - vec![vec!["a", "b"]], + // Expected: union sort orders + vec![vec!["a", "b", "c"]], ), // --------- TEST CASE 2 ---------- ( @@ -2720,8 +2799,8 @@ mod tests { Arc::clone(&schema3), ), ], - // Expected - vec![], + // Expected: union sort orders + vec![vec!["a", "b", "c"]], ), // --------- TEST CASE 5 ---------- ( diff --git a/datafusion/sqllogictest/test_files/order.slt b/datafusion/sqllogictest/test_files/order.slt index 7bb872e5a48f..671c18674acc 100644 --- a/datafusion/sqllogictest/test_files/order.slt +++ b/datafusion/sqllogictest/test_files/order.slt @@ -1196,3 +1196,47 @@ physical_plan 02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false] 04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true + + +# Test: inputs into union with different orderings +query TT +explain select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1 +union all +select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2 +order by d, c, a, a0, b +limit 2; +---- +logical_plan +01)Projection: t1.b, t1.c, t1.a, t1.a0 +02)--Sort: t1.d ASC NULLS LAST, t1.c ASC NULLS LAST, t1.a ASC NULLS LAST, t1.a0 ASC NULLS LAST, t1.b ASC NULLS LAST, fetch=2 +03)----Union +04)------SubqueryAlias: t1 +05)--------Projection: ordered_table.b, ordered_table.c, ordered_table.a, Int32(NULL) AS a0, ordered_table.d +06)----------TableScan: ordered_table projection=[a, b, c, d] +07)------SubqueryAlias: t2 +08)--------Projection: ordered_table.b, ordered_table.c, Int32(NULL) AS a, ordered_table.a0, ordered_table.d +09)----------TableScan: ordered_table projection=[a0, b, c, d] +physical_plan +01)ProjectionExec: expr=[b@0 as b, c@1 as c, a@2 as a, a0@3 as a0] +02)--SortPreservingMergeExec: [d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], fetch=2 +03)----UnionExec +04)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] +05)--------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3 as d] +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true +07)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false] +08)--------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d] +09)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true + +# Test: run the query from above +query IIII +select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1 +union all +select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2 +order by d, c, a, a0, b +limit 2; +---- +0 0 0 NULL +0 0 NULL 1 + +statement ok +drop table ordered_table;