Skip to content

Commit

Permalink
fix: for the UnionExec, the sanity check should enforce restrictions …
Browse files Browse the repository at this point in the history
…based upon the Union's parent vs the Union's input
  • Loading branch information
wiedld committed Sep 10, 2024
1 parent c2e652e commit f820a8e
Showing 1 changed file with 39 additions and 25 deletions.
64 changes: 39 additions & 25 deletions datafusion/core/src/physical_optimizer/sanity_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported};
use datafusion_physical_plan::joins::SymmetricHashJoinExec;
use datafusion_physical_plan::union::UnionExec;
use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};

use datafusion_physical_optimizer::PhysicalOptimizerRule;
Expand Down Expand Up @@ -121,37 +122,50 @@ pub fn check_plan_sanity(
check_finiteness_requirements(plan.clone(), optimizer_options)?;

for ((idx, child), sort_req, dist_req) in izip!(
plan.children().iter().enumerate(),
plan.children().into_iter().enumerate(),
plan.required_input_ordering().iter(),
plan.required_input_distribution().iter()
) {
let child_eq_props = child.equivalence_properties();
if let Some(sort_req) = sort_req {
if !child_eq_props.ordering_satisfy_requirement(sort_req) {
let plan_str = get_plan_string(&plan);
return plan_err!(
"Plan: {:?} does not satisfy order requirements: {:?}. Child-{} order: {:?}",
plan_str,
sort_req,
idx,
child_eq_props.oeq_class
);
// The `EquivalenceProperties::ordering_satisfy_requirement` compares the oeq_class
// orderings, minus their constants, to the requirement.
//
// For the UnionExec, it has the oeq_class orderings from it's children but does not
// have the same constants. As such, the sort requirements cannot be fulfilled
// without examination of the union's children with both the orderings & constants.
let children = match child.as_any().downcast_ref::<UnionExec>() {
Some(union) => union.children(),
_ => vec![child],
};

for child in children {
let child_eq_props = child.equivalence_properties();
if !child_eq_props.ordering_satisfy_requirement(sort_req) {
let plan_str = get_plan_string(&plan);
return plan_err!(
"Plan: {:?} does not satisfy order requirements: {:?}. Child-{} order: {:?}",
plan_str,
sort_req,
idx,
child_eq_props.oeq_class
);
}

if !child
.output_partitioning()
.satisfy(dist_req, child_eq_props)
{
let plan_str = get_plan_string(&plan);
return plan_err!(
"Plan: {:?} does not satisfy distribution requirements: {:?}. Child-{} output partitioning: {:?}",
plan_str,
dist_req,
idx,
child.output_partitioning()
);
}
}
}

if !child
.output_partitioning()
.satisfy(dist_req, child_eq_props)
{
let plan_str = get_plan_string(&plan);
return plan_err!(
"Plan: {:?} does not satisfy distribution requirements: {:?}. Child-{} output partitioning: {:?}",
plan_str,
dist_req,
idx,
child.output_partitioning()
);
}
}

Ok(Transformed::no(plan))
Expand Down

0 comments on commit f820a8e

Please sign in to comment.