Skip to content

Commit 2edfff4

Browse files
committed
Optimized propagation of empty relations #10290
1 parent d3237b2 commit 2edfff4

File tree

2 files changed

+113
-55
lines changed

2 files changed

+113
-55
lines changed

datafusion/optimizer/src/propagate_empty_relation.rs

+86-55
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,16 @@
1616
// under the License.
1717

1818
//! [`PropagateEmptyRelation`] eliminates nodes fed by `EmptyRelation`
19-
use datafusion_common::{plan_err, Result};
20-
use datafusion_expr::logical_plan::LogicalPlan;
21-
use datafusion_expr::{EmptyRelation, JoinType, Projection, Union};
19+
2220
use std::sync::Arc;
2321

22+
use datafusion_common::tree_node::Transformed;
23+
use datafusion_common::JoinType::Inner;
24+
use datafusion_common::{internal_err, plan_err, Result};
25+
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
26+
use datafusion_expr::logical_plan::LogicalPlan;
27+
use datafusion_expr::{EmptyRelation, Projection, Union};
28+
2429
use crate::optimizer::ApplyOrder;
2530
use crate::{OptimizerConfig, OptimizerRule};
2631

@@ -38,32 +43,58 @@ impl PropagateEmptyRelation {
3843
impl OptimizerRule for PropagateEmptyRelation {
3944
fn try_optimize(
4045
&self,
41-
plan: &LogicalPlan,
46+
_plan: &LogicalPlan,
4247
_config: &dyn OptimizerConfig,
4348
) -> Result<Option<LogicalPlan>> {
49+
internal_err!("Should have called PropagateEmptyRelation::rewrite")
50+
}
51+
52+
fn name(&self) -> &str {
53+
"propagate_empty_relation"
54+
}
55+
56+
fn apply_order(&self) -> Option<ApplyOrder> {
57+
Some(ApplyOrder::BottomUp)
58+
}
59+
60+
fn supports_rewrite(&self) -> bool {
61+
true
62+
}
63+
64+
fn rewrite(
65+
&self,
66+
plan: LogicalPlan,
67+
_config: &dyn OptimizerConfig,
68+
) -> Result<Transformed<LogicalPlan>> {
4469
match plan {
45-
LogicalPlan::EmptyRelation(_) => {}
70+
LogicalPlan::EmptyRelation(_) => Ok(Transformed::no(plan)),
4671
LogicalPlan::Projection(_)
4772
| LogicalPlan::Filter(_)
4873
| LogicalPlan::Window(_)
4974
| LogicalPlan::Sort(_)
5075
| LogicalPlan::SubqueryAlias(_)
5176
| LogicalPlan::Repartition(_)
5277
| LogicalPlan::Limit(_) => {
53-
if let Some(empty) = empty_child(plan)? {
54-
return Ok(Some(empty));
78+
let empty = empty_child(&plan)?;
79+
if let Some(empty_plan) = empty {
80+
return Ok(Transformed::yes(empty_plan));
5581
}
82+
Ok(Transformed::no(plan))
5683
}
57-
LogicalPlan::CrossJoin(_) => {
58-
let (left_empty, right_empty) = binary_plan_children_is_empty(plan)?;
84+
LogicalPlan::CrossJoin(ref join) => {
85+
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
5986
if left_empty || right_empty {
60-
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
61-
produce_one_row: false,
62-
schema: plan.schema().clone(),
63-
})));
87+
return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
88+
EmptyRelation {
89+
produce_one_row: false,
90+
schema: plan.schema().clone(),
91+
},
92+
)));
6493
}
94+
Ok(Transformed::no(LogicalPlan::CrossJoin(join.clone())))
6595
}
66-
LogicalPlan::Join(join) => {
96+
97+
LogicalPlan::Join(ref join) if join.join_type == Inner => {
6798
// TODO: For Join, the remaining join types need to be handled carefully:
6899
// For LeftOuter/LeftSemi/LeftAnti Join, if the left side is empty, the Join result is empty.
69100
// For LeftSemi Join, if the right side is empty, the Join result is empty.
@@ -76,17 +107,26 @@ impl OptimizerRule for PropagateEmptyRelation {
76107
// columns + right side columns replaced with null values.
77108
// For RightOuter/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side
78109
// columns + left side columns replaced with null values.
79-
if join.join_type == JoinType::Inner {
80-
let (left_empty, right_empty) = binary_plan_children_is_empty(plan)?;
81-
if left_empty || right_empty {
82-
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
110+
let (left_empty, right_empty) = binary_plan_children_is_empty(&plan)?;
111+
if left_empty || right_empty {
112+
return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
113+
EmptyRelation {
83114
produce_one_row: false,
84-
schema: plan.schema().clone(),
85-
})));
115+
schema: join.schema.clone(),
116+
},
117+
)));
118+
}
119+
Ok(Transformed::no(LogicalPlan::Join(join.clone())))
120+
}
121+
LogicalPlan::Aggregate(ref agg) => {
122+
if !agg.group_expr.is_empty() {
123+
if let Some(empty_plan) = empty_child(&plan)? {
124+
return Ok(Transformed::yes(empty_plan));
86125
}
87126
}
127+
Ok(Transformed::no(LogicalPlan::Aggregate(agg.clone())))
88128
}
89-
LogicalPlan::Union(union) => {
129+
LogicalPlan::Union(ref union) => {
90130
let new_inputs = union
91131
.inputs
92132
.iter()
@@ -98,49 +138,36 @@ impl OptimizerRule for PropagateEmptyRelation {
98138
.collect::<Vec<_>>();
99139

100140
if new_inputs.len() == union.inputs.len() {
101-
return Ok(None);
141+
Ok(Transformed::no(plan))
102142
} else if new_inputs.is_empty() {
103-
return Ok(Some(LogicalPlan::EmptyRelation(EmptyRelation {
104-
produce_one_row: false,
105-
schema: plan.schema().clone(),
106-
})));
143+
Ok(Transformed::yes(LogicalPlan::EmptyRelation(
144+
EmptyRelation {
145+
produce_one_row: false,
146+
schema: plan.schema().clone(),
147+
},
148+
)))
107149
} else if new_inputs.len() == 1 {
108-
let child = (*new_inputs[0]).clone();
150+
let child = unwrap_arc(new_inputs[0].clone());
109151
if child.schema().eq(plan.schema()) {
110-
return Ok(Some(child));
152+
Ok(Transformed::yes(child))
111153
} else {
112-
return Ok(Some(LogicalPlan::Projection(
154+
Ok(Transformed::yes(LogicalPlan::Projection(
113155
Projection::new_from_schema(
114156
Arc::new(child),
115157
plan.schema().clone(),
116158
),
117-
)));
159+
)))
118160
}
119161
} else {
120-
return Ok(Some(LogicalPlan::Union(Union {
162+
Ok(Transformed::yes(LogicalPlan::Union(Union {
121163
inputs: new_inputs,
122164
schema: union.schema.clone(),
123-
})));
124-
}
125-
}
126-
LogicalPlan::Aggregate(agg) => {
127-
if !agg.group_expr.is_empty() {
128-
if let Some(empty) = empty_child(plan)? {
129-
return Ok(Some(empty));
130-
}
165+
})))
131166
}
132167
}
133-
_ => {}
134-
}
135-
Ok(None)
136-
}
137168

138-
fn name(&self) -> &str {
139-
"propagate_empty_relation"
140-
}
141-
142-
fn apply_order(&self) -> Option<ApplyOrder> {
143-
Some(ApplyOrder::BottomUp)
169+
_ => Ok(Transformed::no(plan)),
170+
}
144171
}
145172
}
146173

@@ -182,18 +209,22 @@ fn empty_child(plan: &LogicalPlan) -> Result<Option<LogicalPlan>> {
182209

183210
#[cfg(test)]
184211
mod tests {
212+
use std::sync::Arc;
213+
214+
use arrow::datatypes::{DataType, Field, Schema};
215+
216+
use datafusion_common::{Column, DFSchema, JoinType, ScalarValue};
217+
use datafusion_expr::logical_plan::table_scan;
218+
use datafusion_expr::{
219+
binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Expr, Operator,
220+
};
221+
185222
use crate::eliminate_filter::EliminateFilter;
186223
use crate::eliminate_nested_union::EliminateNestedUnion;
187224
use crate::test::{
188225
assert_optimized_plan_eq, assert_optimized_plan_eq_with_rules, test_table_scan,
189226
test_table_scan_fields, test_table_scan_with_name,
190227
};
191-
use arrow::datatypes::{DataType, Field, Schema};
192-
use datafusion_common::{Column, DFSchema, ScalarValue};
193-
use datafusion_expr::logical_plan::table_scan;
194-
use datafusion_expr::{
195-
binary_expr, col, lit, logical_plan::builder::LogicalPlanBuilder, Expr, Operator,
196-
};
197228

198229
use super::*;
199230

datafusion/optimizer/tests/optimizer_integration.rs

+27
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::collections::HashMap;
2020
use std::sync::Arc;
2121

2222
use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
23+
2324
use datafusion_common::config::ConfigOptions;
2425
use datafusion_common::{plan_err, Result};
2526
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
@@ -290,6 +291,32 @@ fn eliminate_nested_filters() {
290291
assert_eq!(expected, format!("{plan:?}"));
291292
}
292293

294+
#[test]
295+
fn test_propagate_empty_relation_inner_join_and_unions() {
296+
let sql = "\
297+
SELECT A.col_int32 FROM test AS A \
298+
INNER JOIN ( \
299+
SELECT col_int32 FROM test WHERE 1 = 0 \
300+
) AS B ON A.col_int32 = B.col_int32 \
301+
UNION ALL \
302+
SELECT test.col_int32 FROM test WHERE 1 = 1 \
303+
UNION ALL \
304+
SELECT test.col_int32 FROM test WHERE 0 = 0 \
305+
UNION ALL \
306+
SELECT test.col_int32 FROM test WHERE test.col_int32 < 0 \
307+
UNION ALL \
308+
SELECT test.col_int32 FROM test WHERE 1 = 0";
309+
310+
let plan = test_sql(sql).unwrap();
311+
let expected = "\
312+
Union\
313+
\n TableScan: test projection=[col_int32]\
314+
\n TableScan: test projection=[col_int32]\
315+
\n Filter: test.col_int32 < Int32(0)\
316+
\n TableScan: test projection=[col_int32]";
317+
assert_eq!(expected, format!("{plan:?}"));
318+
}
319+
293320
fn test_sql(sql: &str) -> Result<LogicalPlan> {
294321
// parse the SQL
295322
let dialect = GenericDialect {}; // or AnsiDialect, or your own dialect ...

0 commit comments

Comments
 (0)