Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SanityCheckPlan should compare UnionExec inputs to requirements for output (parent). #12414

Closed
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 34 additions & 1 deletion datafusion/physical-expr/src/equivalence/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use std::hash::{Hash, Hasher};
use std::ops::RangeFull;
use std::sync::Arc;

use super::{add_offset_to_expr, collapse_lex_req, ProjectionMapping};
Expand All @@ -27,6 +29,7 @@ use crate::{

use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::JoinType;
use indexmap::IndexSet;

#[derive(Debug, Clone)]
/// A structure representing an expression known to be constant in a physical execution plan.
Expand All @@ -41,7 +44,8 @@ use datafusion_common::JoinType;
///
/// - `across_partitions`: A boolean flag indicating whether the constant expression is
/// valid across partitions. If set to `true`, the constant expression has same value for all partitions.
/// If set to `false`, the constant expression may have different values for different partitions.
/// If set to `false`, the constant expression may have different constant values for different partitions
/// or only be constant within one of the partitions.
///
/// # Example
///
Expand Down Expand Up @@ -123,6 +127,35 @@ pub fn const_exprs_contains(
.any(|const_expr| const_expr.expr.eq(expr))
}

// Marker impl: promotes the `PartialEq` impl for `ConstExpr` (which compares
// only the wrapped expression) to a full equivalence relation.
impl Eq for ConstExpr {}

/// Two `ConstExpr`s compare equal when their wrapped expressions compare
/// equal. The `across_partitions` flag does not take part in the comparison.
impl PartialEq for ConstExpr {
    fn eq(&self, other: &Self) -> bool {
        // Go through the accessor for the right-hand side so the comparison
        // sees exactly the same argument type as a direct `expr()` call.
        let rhs_expr = other.expr();
        self.expr.eq(rhs_expr)
    }
}

/// Hashes only the wrapped expression, mirroring the `PartialEq` impl so that
/// equal `ConstExpr`s always produce equal hashes.
impl Hash for ConstExpr {
    fn hash<H: Hasher>(&self, state: &mut H) {
        // `across_partitions` is intentionally left out of the hash.
        let wrapped = &self.expr;
        wrapped.hash(state);
    }
}

/// Concats two slices of `const_exprs, removing duplicates and
/// maintaining the order.
///
/// Equality based upon the expression. `across_partitions` will
/// always be false as we do not validate the same constant value
/// on both sides.
pub fn concat_const_exprs(lhs: &[ConstExpr], rhs: &[ConstExpr]) -> Vec<ConstExpr> {
IndexSet::<&ConstExpr>::from_iter(lhs.iter().chain(rhs.iter()))
.drain(RangeFull)
.map(|constant_expr| {
ConstExpr::new(Arc::clone(&constant_expr.expr)).with_across_partitions(false)
})
.collect()
}

/// An `EquivalenceClass` is a set of [`Arc<dyn PhysicalExpr>`]s that are known
/// to have the same value for all tuples in a relation. These are generated by
/// equality predicates (e.g. `a = b`), typically equi-join conditions and
Expand Down
16 changes: 3 additions & 13 deletions datafusion/physical-expr/src/equivalence/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use super::class::concat_const_exprs;
use super::ordering::collapse_lex_ordering;
use crate::equivalence::class::const_exprs_contains;
use crate::equivalence::{
Expand Down Expand Up @@ -1539,19 +1540,8 @@ fn calculate_union_binary(
}

// First, calculate valid constants for the union. A quantity is constant
// after the union if it is constant in both sides.
let constants = lhs
.constants()
.iter()
.filter(|const_expr| const_exprs_contains(rhs.constants(), const_expr.expr()))
.map(|const_expr| {
// TODO: When both sides' constants are valid across partitions,
// the union's constant should also be valid if values are
// the same. However, we do not have the capability to
// check this yet.
ConstExpr::new(Arc::clone(const_expr.expr())).with_across_partitions(false)
})
.collect();
// after the union if it is constant on one of the sides.
let constants = concat_const_exprs(lhs.constants(), rhs.constants());

// Next, calculate valid orderings for the union by searching for prefixes
// in both sides.
Expand Down
44 changes: 44 additions & 0 deletions datafusion/sqllogictest/test_files/order.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1196,3 +1196,47 @@ physical_plan
02)--RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
03)----SortExec: TopK(fetch=1), expr=[a@0 ASC NULLS LAST], preserve_partitioning=[false]
04)------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b], has_header=true


# Test: inputs into union with different orderings
query TT
explain select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1
union all
select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2
order by d, c, a, a0, b
limit 2;
----
logical_plan
01)Projection: t1.b, t1.c, t1.a, t1.a0
02)--Sort: t1.d ASC NULLS LAST, t1.c ASC NULLS LAST, t1.a ASC NULLS LAST, t1.a0 ASC NULLS LAST, t1.b ASC NULLS LAST, fetch=2
03)----Union
04)------SubqueryAlias: t1
05)--------Projection: ordered_table.b, ordered_table.c, ordered_table.a, Int32(NULL) AS a0, ordered_table.d
06)----------TableScan: ordered_table projection=[a, b, c, d]
07)------SubqueryAlias: t2
08)--------Projection: ordered_table.b, ordered_table.c, Int32(NULL) AS a, ordered_table.a0, ordered_table.d
09)----------TableScan: ordered_table projection=[a0, b, c, d]
physical_plan
01)ProjectionExec: expr=[b@0 as b, c@1 as c, a@2 as a, a0@3 as a0]
02)--SortPreservingMergeExec: [d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,a@2 ASC NULLS LAST,a0@3 ASC NULLS LAST,b@0 ASC NULLS LAST], fetch=2
03)----UnionExec
04)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false]
05)--------ProjectionExec: expr=[b@1 as b, c@2 as c, a@0 as a, NULL as a0, d@3 as d]
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true
07)------SortExec: TopK(fetch=2), expr=[d@4 ASC NULLS LAST,c@1 ASC NULLS LAST,b@0 ASC NULLS LAST], preserve_partitioning=[false]
08)--------ProjectionExec: expr=[b@1 as b, c@2 as c, NULL as a, a0@0 as a0, d@3 as d]
09)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, b, c, d], output_ordering=[c@2 ASC NULLS LAST], has_header=true

# Test: run the query from above
query IIII
select * from (select b, c, a, NULL::int as a0 from ordered_table order by a, c) t1
union all
select * from (select b, c, NULL::int as a, a0 from ordered_table order by a0, c) t2
order by d, c, a, a0, b
limit 2;
----
0 0 0 NULL
0 0 NULL 1

statement ok
drop table ordered_table;
Loading