Skip to content

Commit

Permalink
fix: skip EliminateCrossJoin rule if inner join with filter is found (#…
Browse files Browse the repository at this point in the history
…7529)

* Skip EliminateCrossJoin rule if inner join with filter is found - check recursively

* Add test eliminate_cross_not_possible_nested_inner_join_with_filter
  • Loading branch information
epsio-banay committed Sep 13, 2023
1 parent 8e2a0e6 commit a758270
Showing 1 changed file with 59 additions and 22 deletions.
81 changes: 59 additions & 22 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,30 +60,26 @@ impl OptimizerRule for EliminateCrossJoin {

let mut possible_join_keys: Vec<(Expr, Expr)> = vec![];
let mut all_inputs: Vec<LogicalPlan> = vec![];
match &input {
let did_flat_successfully = match &input {
LogicalPlan::Join(join) if (join.join_type == JoinType::Inner) => {
// The filter of inner join will lost, skip this rule.
// issue: https://github.com/apache/arrow-datafusion/issues/4844
if join.filter.is_some() {
return Ok(None);
}

flatten_join_inputs(
try_flatten_join_inputs(
&input,
&mut possible_join_keys,
&mut all_inputs,
)?;
}
LogicalPlan::CrossJoin(_) => {
flatten_join_inputs(
&input,
&mut possible_join_keys,
&mut all_inputs,
)?;
)?
}
LogicalPlan::CrossJoin(_) => try_flatten_join_inputs(
&input,
&mut possible_join_keys,
&mut all_inputs,
)?,
_ => {
return utils::optimize_children(self, plan, config);
}
};

if !did_flat_successfully {
return Ok(None);
}

let predicate = &filter.predicate;
Expand Down Expand Up @@ -137,13 +133,20 @@ impl OptimizerRule for EliminateCrossJoin {
}
}

fn flatten_join_inputs(
/// Recursively accumulate possible_join_keys and inputs from inner joins (including cross joins).
/// Returns a boolean indicating whether the flattening was successful.
fn try_flatten_join_inputs(
plan: &LogicalPlan,
possible_join_keys: &mut Vec<(Expr, Expr)>,
all_inputs: &mut Vec<LogicalPlan>,
) -> Result<()> {
) -> Result<bool> {
let children = match plan {
LogicalPlan::Join(join) => {
LogicalPlan::Join(join) if join.join_type == JoinType::Inner => {
if join.filter.is_some() {
// The filter of inner join will lost, skip this rule.
// issue: https://github.com/apache/arrow-datafusion/issues/4844
return Ok(false);
}
possible_join_keys.extend(join.on.clone());
let left = &*(join.left);
let right = &*(join.right);
Expand All @@ -163,18 +166,22 @@ fn flatten_join_inputs(
match *child {
LogicalPlan::Join(left_join) => {
if left_join.join_type == JoinType::Inner {
flatten_join_inputs(child, possible_join_keys, all_inputs)?;
if !try_flatten_join_inputs(child, possible_join_keys, all_inputs)? {
return Ok(false);
}
} else {
all_inputs.push((*child).clone());
}
}
LogicalPlan::CrossJoin(_) => {
flatten_join_inputs(child, possible_join_keys, all_inputs)?;
if !try_flatten_join_inputs(child, possible_join_keys, all_inputs)? {
return Ok(false);
}
}
_ => all_inputs.push((*child).clone()),
}
}
Ok(())
Ok(true)
}

fn find_inner_join(
Expand Down Expand Up @@ -363,6 +370,12 @@ mod tests {
assert_eq!(plan.schema(), optimized_plan.schema())
}

fn assert_optimization_rule_fails(plan: &LogicalPlan) {
let rule = EliminateCrossJoin::new();
let optimized_plan = rule.try_optimize(plan, &OptimizerContext::new()).unwrap();
assert!(optimized_plan.is_none());
}

#[test]
fn eliminate_cross_with_simple_and() -> Result<()> {
let t1 = test_table_scan_with_name("t1")?;
Expand Down Expand Up @@ -531,6 +544,30 @@ mod tests {
Ok(())
}

#[test]
/// See https://github.com/apache/arrow-datafusion/issues/7530
fn eliminate_cross_not_possible_nested_inner_join_with_filter() -> Result<()> {
let t1 = test_table_scan_with_name("t1")?;
let t2 = test_table_scan_with_name("t2")?;
let t3 = test_table_scan_with_name("t3")?;

// could not eliminate to inner join with filter
let plan = LogicalPlanBuilder::from(t1)
.join(
t3,
JoinType::Inner,
(vec!["t1.a"], vec!["t3.a"]),
Some(col("t1.a").gt(lit(20u32))),
)?
.join(t2, JoinType::Inner, (vec!["t1.a"], vec!["t2.a"]), None)?
.filter(col("t1.a").gt(lit(15u32)))?
.build()?;

assert_optimization_rule_fails(&plan);

Ok(())
}

#[test]
/// ```txt
/// filter: a.id = b.id and a.id = c.id
Expand Down

0 comments on commit a758270

Please sign in to comment.