Skip to content

Commit 18042fd

Browse files
authored
feat: propagate EmptyRelation for more join types (#10963)
* feat: propagate empty for more join types * feat: update subquery de-correlation test * tests: simplify tests * refactor: better name * style: clippy * refactor: update tests * refactor: rename * refactor: fix spellings * add slt tests
1 parent 1155b0b commit 18042fd

File tree

6 files changed

+290
-38
lines changed

6 files changed

+290
-38
lines changed

datafusion/core/tests/parquet/arrow_statistics.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! This file contains an end to end test of extracting statitics from parquet files.
18+
//! This file contains an end to end test of extracting statistics from parquet files.
1919
//! It writes data into a parquet file, reads statistics and verifies they are correct
2020
2121
use std::default::Default;
@@ -716,8 +716,8 @@ async fn test_timestamp() {
716716
// "seconds_timezoned" --> TimestampSecondArray
717717
// "names" --> StringArray
718718
//
719-
// The file is created by 4 record batches, each has 5 rowws.
720-
// Since the row group isze is set to 5, those 4 batches will go into 4 row groups
719+
// The file is created by 4 record batches, each has 5 rows.
720+
// Since the row group size is set to 5, those 4 batches will go into 4 row groups
721721
// This creates a parquet files of 4 columns named "nanos", "nanos_timezoned", "micros", "micros_timezoned", "millis", "millis_timezoned", "seconds", "seconds_timezoned"
722722
let reader = TestReader {
723723
scenario: Scenario::Timestamps,
@@ -2039,7 +2039,7 @@ async fn test_missing_statistics() {
20392039
expected_min: Arc::new(Int64Array::from(vec![None])),
20402040
expected_max: Arc::new(Int64Array::from(vec![None])),
20412041
expected_null_counts: UInt64Array::from(vec![None]),
2042-
expected_row_counts: Some(UInt64Array::from(vec![3])), // stil has row count statistics
2042+
expected_row_counts: Some(UInt64Array::from(vec![3])), // still has row count statistics
20432043
column_name: "i64",
20442044
check: Check::RowGroup,
20452045
}

datafusion/optimizer/src/eliminate_one_union.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,11 @@ mod tests {
8888
}
8989

9090
fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> {
91-
assert_optimized_plan_eq_with_rules(
91+
assert_optimized_plan_with_rules(
9292
vec![Arc::new(EliminateOneUnion::new())],
9393
plan,
9494
expected,
95+
true,
9596
)
9697
}
9798

datafusion/optimizer/src/propagate_empty_relation.rs

Lines changed: 154 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
use std::sync::Arc;
2121

2222
use datafusion_common::tree_node::Transformed;
23-
use datafusion_common::JoinType::Inner;
23+
use datafusion_common::JoinType;
2424
use datafusion_common::{internal_err, plan_err, Result};
2525
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
2626
use datafusion_expr::logical_plan::LogicalPlan;
@@ -94,29 +94,62 @@ impl OptimizerRule for PropagateEmptyRelation {
9494
Ok(Transformed::no(LogicalPlan::CrossJoin(join.clone())))
9595
}
9696

97-
LogicalPlan::Join(ref join) if join.join_type == Inner => {
97+
LogicalPlan::Join(ref join) => {
9898
// TODO: For Join, more join type need to be careful:
99-
// For LeftOuter/LeftSemi/LeftAnti Join, only the left side is empty, the Join result is empty.
100-
// For LeftSemi Join, if the right side is empty, the Join result is empty.
10199
// For LeftAnti Join, if the right side is empty, the Join result is left side(should exclude null ??).
102-
// For RightOuter/RightSemi/RightAnti Join, only the right side is empty, the Join result is empty.
103-
// For RightSemi Join, if the left side is empty, the Join result is empty.
104100
// For RightAnti Join, if the left side is empty, the Join result is right side(should exclude null ??).
105101
// For Full Join, only both sides are empty, the Join result is empty.
106102
// For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
107103
// columns + right side columns replaced with null values.
108104
// For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side
109105
// columns + left side columns replaced with null values.
110106
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
111-
if left_empty || right_empty {
112-
return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
113-
EmptyRelation {
107+
108+
match join.join_type {
109+
JoinType::Inner if left_empty || right_empty => Ok(Transformed::yes(
110+
LogicalPlan::EmptyRelation(EmptyRelation {
114111
produce_one_row: false,
115112
schema: join.schema.clone(),
116-
},
117-
)));
113+
}),
114+
)),
115+
JoinType::Left if left_empty => Ok(Transformed::yes(
116+
LogicalPlan::EmptyRelation(EmptyRelation {
117+
produce_one_row: false,
118+
schema: join.schema.clone(),
119+
}),
120+
)),
121+
JoinType::Right if right_empty => Ok(Transformed::yes(
122+
LogicalPlan::EmptyRelation(EmptyRelation {
123+
produce_one_row: false,
124+
schema: join.schema.clone(),
125+
}),
126+
)),
127+
JoinType::LeftSemi if left_empty || right_empty => Ok(
128+
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
129+
produce_one_row: false,
130+
schema: join.schema.clone(),
131+
})),
132+
),
133+
JoinType::RightSemi if left_empty || right_empty => Ok(
134+
Transformed::yes(LogicalPlan::EmptyRelation(EmptyRelation {
135+
produce_one_row: false,
136+
schema: join.schema.clone(),
137+
})),
138+
),
139+
JoinType::LeftAnti if left_empty => Ok(Transformed::yes(
140+
LogicalPlan::EmptyRelation(EmptyRelation {
141+
produce_one_row: false,
142+
schema: join.schema.clone(),
143+
}),
144+
)),
145+
JoinType::RightAnti if right_empty => Ok(Transformed::yes(
146+
LogicalPlan::EmptyRelation(EmptyRelation {
147+
produce_one_row: false,
148+
schema: join.schema.clone(),
149+
}),
150+
)),
151+
_ => Ok(Transformed::no(LogicalPlan::Join(join.clone()))),
118152
}
119-
Ok(Transformed::no(LogicalPlan::Join(join.clone())))
120153
}
121154
LogicalPlan::Aggregate(ref agg) => {
122155
if !agg.group_expr.is_empty() {
@@ -222,7 +255,7 @@ mod tests {
222255
use crate::eliminate_filter::EliminateFilter;
223256
use crate::eliminate_nested_union::EliminateNestedUnion;
224257
use crate::test::{
225-
assert_optimized_plan_eq, assert_optimized_plan_eq_with_rules, test_table_scan,
258+
assert_optimized_plan_eq, assert_optimized_plan_with_rules, test_table_scan,
226259
test_table_scan_fields, test_table_scan_with_name,
227260
};
228261

@@ -232,18 +265,20 @@ mod tests {
232265
assert_optimized_plan_eq(Arc::new(PropagateEmptyRelation::new()), plan, expected)
233266
}
234267

235-
fn assert_together_optimized_plan_eq(
268+
fn assert_together_optimized_plan(
236269
plan: LogicalPlan,
237270
expected: &str,
271+
eq: bool,
238272
) -> Result<()> {
239-
assert_optimized_plan_eq_with_rules(
273+
assert_optimized_plan_with_rules(
240274
vec![
241275
Arc::new(EliminateFilter::new()),
242276
Arc::new(EliminateNestedUnion::new()),
243277
Arc::new(PropagateEmptyRelation::new()),
244278
],
245279
plan,
246280
expected,
281+
eq,
247282
)
248283
}
249284

@@ -279,7 +314,7 @@ mod tests {
279314
.build()?;
280315

281316
let expected = "EmptyRelation";
282-
assert_together_optimized_plan_eq(plan, expected)
317+
assert_together_optimized_plan(plan, expected, true)
283318
}
284319

285320
#[test]
@@ -292,7 +327,7 @@ mod tests {
292327
let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;
293328

294329
let expected = "TableScan: test";
295-
assert_together_optimized_plan_eq(plan, expected)
330+
assert_together_optimized_plan(plan, expected, true)
296331
}
297332

298333
#[test]
@@ -317,7 +352,7 @@ mod tests {
317352
let expected = "Union\
318353
\n TableScan: test1\
319354
\n TableScan: test4";
320-
assert_together_optimized_plan_eq(plan, expected)
355+
assert_together_optimized_plan(plan, expected, true)
321356
}
322357

323358
#[test]
@@ -342,7 +377,7 @@ mod tests {
342377
.build()?;
343378

344379
let expected = "EmptyRelation";
345-
assert_together_optimized_plan_eq(plan, expected)
380+
assert_together_optimized_plan(plan, expected, true)
346381
}
347382

348383
#[test]
@@ -369,7 +404,7 @@ mod tests {
369404
let expected = "Union\
370405
\n TableScan: test2\
371406
\n TableScan: test3";
372-
assert_together_optimized_plan_eq(plan, expected)
407+
assert_together_optimized_plan(plan, expected, true)
373408
}
374409

375410
#[test]
@@ -382,7 +417,7 @@ mod tests {
382417
let plan = LogicalPlanBuilder::from(left).union(right)?.build()?;
383418

384419
let expected = "TableScan: test";
385-
assert_together_optimized_plan_eq(plan, expected)
420+
assert_together_optimized_plan(plan, expected, true)
386421
}
387422

388423
#[test]
@@ -397,7 +432,103 @@ mod tests {
397432
.build()?;
398433

399434
let expected = "EmptyRelation";
400-
assert_together_optimized_plan_eq(plan, expected)
435+
assert_together_optimized_plan(plan, expected, true)
436+
}
437+
438+
fn assert_empty_left_empty_right_lp(
439+
left_empty: bool,
440+
right_empty: bool,
441+
join_type: JoinType,
442+
eq: bool,
443+
) -> Result<()> {
444+
let left_lp = if left_empty {
445+
let left_table_scan = test_table_scan()?;
446+
447+
LogicalPlanBuilder::from(left_table_scan)
448+
.filter(Expr::Literal(ScalarValue::Boolean(Some(false))))?
449+
.build()
450+
} else {
451+
let scan = test_table_scan_with_name("left").unwrap();
452+
LogicalPlanBuilder::from(scan).build()
453+
}?;
454+
455+
let right_lp = if right_empty {
456+
let right_table_scan = test_table_scan_with_name("right")?;
457+
458+
LogicalPlanBuilder::from(right_table_scan)
459+
.filter(Expr::Literal(ScalarValue::Boolean(Some(false))))?
460+
.build()
461+
} else {
462+
let scan = test_table_scan_with_name("right").unwrap();
463+
LogicalPlanBuilder::from(scan).build()
464+
}?;
465+
466+
let plan = LogicalPlanBuilder::from(left_lp)
467+
.join_using(
468+
right_lp,
469+
join_type,
470+
vec![Column::from_name("a".to_string())],
471+
)?
472+
.build()?;
473+
474+
let expected = "EmptyRelation";
475+
assert_together_optimized_plan(plan, expected, eq)
476+
}
477+
478+
#[test]
479+
fn test_join_empty_propagation_rules() -> Result<()> {
480+
// test left join with empty left
481+
assert_empty_left_empty_right_lp(true, false, JoinType::Left, true)?;
482+
483+
// test right join with empty right
484+
assert_empty_left_empty_right_lp(false, true, JoinType::Right, true)?;
485+
486+
// test left semi join with empty left
487+
assert_empty_left_empty_right_lp(true, false, JoinType::LeftSemi, true)?;
488+
489+
// test left semi join with empty right
490+
assert_empty_left_empty_right_lp(false, true, JoinType::LeftSemi, true)?;
491+
492+
// test right semi join with empty left
493+
assert_empty_left_empty_right_lp(true, false, JoinType::RightSemi, true)?;
494+
495+
// test right semi join with empty right
496+
assert_empty_left_empty_right_lp(false, true, JoinType::RightSemi, true)?;
497+
498+
// test left anti join empty left
499+
assert_empty_left_empty_right_lp(true, false, JoinType::LeftAnti, true)?;
500+
501+
// test right anti join empty right
502+
assert_empty_left_empty_right_lp(false, true, JoinType::RightAnti, true)
503+
}
504+
505+
#[test]
506+
fn test_join_empty_propagation_rules_noop() -> Result<()> {
507+
// these cases should not result in an empty relation
508+
509+
// test left join with empty right
510+
assert_empty_left_empty_right_lp(false, true, JoinType::Left, false)?;
511+
512+
// test right join with empty left
513+
assert_empty_left_empty_right_lp(true, false, JoinType::Right, false)?;
514+
515+
// test left semi with non-empty left and right
516+
assert_empty_left_empty_right_lp(false, false, JoinType::LeftSemi, false)?;
517+
518+
// test right semi with non-empty left and right
519+
assert_empty_left_empty_right_lp(false, false, JoinType::RightSemi, false)?;
520+
521+
// test left anti join with non-empty left and right
522+
assert_empty_left_empty_right_lp(false, false, JoinType::LeftAnti, false)?;
523+
524+
// test left anti with non-empty left and empty right
525+
assert_empty_left_empty_right_lp(false, true, JoinType::LeftAnti, false)?;
526+
527+
// test right anti join with non-empty left and right
528+
assert_empty_left_empty_right_lp(false, false, JoinType::RightAnti, false)?;
529+
530+
// test right anti with empty left and non-empty right
531+
assert_empty_left_empty_right_lp(true, false, JoinType::RightAnti, false)
401532
}
402533

403534
#[test]
@@ -430,6 +561,6 @@ mod tests {
430561
let expected = "Projection: a, b, c\
431562
\n TableScan: test";
432563

433-
assert_together_optimized_plan_eq(plan, expected)
564+
assert_together_optimized_plan(plan, expected, true)
434565
}
435566
}

datafusion/optimizer/src/test/mod.rs

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,21 @@ pub fn assert_analyzed_plan_eq(
121121

122122
Ok(())
123123
}
124+
125+
pub fn assert_analyzed_plan_ne(
126+
rule: Arc<dyn AnalyzerRule + Send + Sync>,
127+
plan: LogicalPlan,
128+
expected: &str,
129+
) -> Result<()> {
130+
let options = ConfigOptions::default();
131+
let analyzed_plan =
132+
Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options, |_, _| {})?;
133+
let formatted_plan = format!("{analyzed_plan:?}");
134+
assert_ne!(formatted_plan, expected);
135+
136+
Ok(())
137+
}
138+
124139
pub fn assert_analyzed_plan_eq_display_indent(
125140
rule: Arc<dyn AnalyzerRule + Send + Sync>,
126141
plan: LogicalPlan,
@@ -169,21 +184,33 @@ pub fn assert_optimized_plan_eq(
169184
Ok(())
170185
}
171186

172-
pub fn assert_optimized_plan_eq_with_rules(
187+
fn generate_optimized_plan_with_rules(
173188
rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
174189
plan: LogicalPlan,
175-
expected: &str,
176-
) -> Result<()> {
190+
) -> LogicalPlan {
177191
fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
178192
let config = &mut OptimizerContext::new()
179193
.with_max_passes(1)
180194
.with_skip_failing_rules(false);
181195
let optimizer = Optimizer::with_rules(rules);
182-
let optimized_plan = optimizer
196+
optimizer
183197
.optimize(plan, config, observe)
184-
.expect("failed to optimize plan");
198+
.expect("failed to optimize plan")
199+
}
200+
201+
pub fn assert_optimized_plan_with_rules(
202+
rules: Vec<Arc<dyn OptimizerRule + Send + Sync>>,
203+
plan: LogicalPlan,
204+
expected: &str,
205+
eq: bool,
206+
) -> Result<()> {
207+
let optimized_plan = generate_optimized_plan_with_rules(rules, plan);
185208
let formatted_plan = format!("{optimized_plan:?}");
186-
assert_eq!(formatted_plan, expected);
209+
if eq {
210+
assert_eq!(formatted_plan, expected);
211+
} else {
212+
assert_ne!(formatted_plan, expected);
213+
}
187214
Ok(())
188215
}
189216

0 commit comments

Comments
 (0)