Skip to content

Commit 6dffc53

Browse files
authored
Deprecate OptimizerRule::try_optimize (#11022)
* Deprecate OptimizerRule::try_optimize * optimize_children * Apply review suggestions * Fix clippy lint
1 parent 4a0c7f3 commit 6dffc53

File tree

5 files changed

+89
-42
lines changed

5 files changed

+89
-42
lines changed

datafusion-examples/examples/rewrite_expr.rs

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,13 @@
1818
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
1919
use datafusion_common::config::ConfigOptions;
2020
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
21-
use datafusion_common::{plan_err, Result, ScalarValue};
21+
use datafusion_common::{plan_err, DataFusionError, Result, ScalarValue};
2222
use datafusion_expr::{
2323
AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource, WindowUDF,
2424
};
2525
use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule};
26-
use datafusion_optimizer::optimizer::Optimizer;
27-
use datafusion_optimizer::{utils, OptimizerConfig, OptimizerContext, OptimizerRule};
26+
use datafusion_optimizer::optimizer::{ApplyOrder, Optimizer};
27+
use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
2828
use datafusion_sql::planner::{ContextProvider, SqlToRel};
2929
use datafusion_sql::sqlparser::dialect::PostgreSqlDialect;
3030
use datafusion_sql::sqlparser::parser::Parser;
@@ -133,30 +133,34 @@ impl OptimizerRule for MyOptimizerRule {
133133

134134
fn try_optimize(
135135
&self,
136-
plan: &LogicalPlan,
137-
config: &dyn OptimizerConfig,
136+
_plan: &LogicalPlan,
137+
_config: &dyn OptimizerConfig,
138138
) -> Result<Option<LogicalPlan>> {
139-
// recurse down and optimize children first
140-
let optimized_plan = utils::optimize_children(self, plan, config)?;
141-
match optimized_plan {
142-
Some(LogicalPlan::Filter(filter)) => {
139+
unreachable!()
140+
}
141+
142+
fn apply_order(&self) -> Option<ApplyOrder> {
143+
Some(ApplyOrder::BottomUp)
144+
}
145+
146+
fn supports_rewrite(&self) -> bool {
147+
true
148+
}
149+
150+
fn rewrite(
151+
&self,
152+
plan: LogicalPlan,
153+
_config: &dyn OptimizerConfig,
154+
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
155+
match plan {
156+
LogicalPlan::Filter(filter) => {
143157
let predicate = my_rewrite(filter.predicate.clone())?;
144-
Ok(Some(LogicalPlan::Filter(Filter::try_new(
158+
Ok(Transformed::yes(LogicalPlan::Filter(Filter::try_new(
145159
predicate,
146-
filter.input,
160+
filter.input.clone(),
147161
)?)))
148162
}
149-
Some(optimized_plan) => Ok(Some(optimized_plan)),
150-
None => match plan {
151-
LogicalPlan::Filter(filter) => {
152-
let predicate = my_rewrite(filter.predicate.clone())?;
153-
Ok(Some(LogicalPlan::Filter(Filter::try_new(
154-
predicate,
155-
filter.input.clone(),
156-
)?)))
157-
}
158-
_ => Ok(None),
159-
},
163+
_ => Ok(Transformed::no(plan)),
160164
}
161165
}
162166
}

datafusion/core/tests/user_defined/user_defined_plan.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ use datafusion::{
8080
Expr, Extension, Limit, LogicalPlan, Sort, UserDefinedLogicalNode,
8181
UserDefinedLogicalNodeCore,
8282
},
83-
optimizer::{optimize_children, OptimizerConfig, OptimizerRule},
83+
optimizer::{OptimizerConfig, OptimizerRule},
8484
physical_expr::EquivalenceProperties,
8585
physical_plan::{
8686
DisplayAs, DisplayFormatType, Distribution, ExecutionMode, ExecutionPlan,
@@ -92,6 +92,8 @@ use datafusion::{
9292
};
9393

9494
use async_trait::async_trait;
95+
use datafusion_common::tree_node::Transformed;
96+
use datafusion_optimizer::optimizer::ApplyOrder;
9597
use futures::{Stream, StreamExt};
9698

9799
/// Execute the specified sql and return the resulting record batches
@@ -282,17 +284,37 @@ impl OptimizerRule for TopKOptimizerRule {
282284
// Example rewrite pass to insert a user defined LogicalPlanNode
283285
fn try_optimize(
284286
&self,
285-
plan: &LogicalPlan,
286-
config: &dyn OptimizerConfig,
287+
_plan: &LogicalPlan,
288+
_config: &dyn OptimizerConfig,
287289
) -> Result<Option<LogicalPlan>> {
290+
unreachable!()
291+
}
292+
293+
fn name(&self) -> &str {
294+
"topk"
295+
}
296+
297+
fn apply_order(&self) -> Option<ApplyOrder> {
298+
Some(ApplyOrder::TopDown)
299+
}
300+
301+
fn supports_rewrite(&self) -> bool {
302+
true
303+
}
304+
305+
fn rewrite(
306+
&self,
307+
plan: LogicalPlan,
308+
_config: &dyn OptimizerConfig,
309+
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
288310
// Note: this code simply looks for the pattern of a Limit followed by a
289311
// Sort and replaces it by a TopK node. It does not handle many
290312
// edge cases (e.g multiple sort columns, sort ASC / DESC), etc.
291313
if let LogicalPlan::Limit(Limit {
292314
fetch: Some(fetch),
293315
input,
294316
..
295-
}) = plan
317+
}) = &plan
296318
{
297319
if let LogicalPlan::Sort(Sort {
298320
ref expr,
@@ -302,26 +324,18 @@ impl OptimizerRule for TopKOptimizerRule {
302324
{
303325
if expr.len() == 1 {
304326
// we found a sort with a single sort expr, replace with a a TopK
305-
return Ok(Some(LogicalPlan::Extension(Extension {
327+
return Ok(Transformed::yes(LogicalPlan::Extension(Extension {
306328
node: Arc::new(TopKPlanNode {
307329
k: *fetch,
308-
input: self
309-
.try_optimize(input.as_ref(), config)?
310-
.unwrap_or_else(|| input.as_ref().clone()),
330+
input: input.as_ref().clone(),
311331
expr: expr[0].clone(),
312332
}),
313333
})));
314334
}
315335
}
316336
}
317337

318-
// If we didn't find the Limit/Sort combination, recurse as
319-
// normal and build the result.
320-
optimize_children(self, plan, config)
321-
}
322-
323-
fn name(&self) -> &str {
324-
"topk"
338+
Ok(Transformed::no(plan))
325339
}
326340
}
327341

datafusion/optimizer/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ pub mod test;
6161

6262
pub use analyzer::{Analyzer, AnalyzerRule};
6363
pub use optimizer::{Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule};
64+
#[allow(deprecated)]
6465
pub use utils::optimize_children;
6566

6667
pub(crate) mod join_key_set;

datafusion/optimizer/src/optimizer.rs

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ pub trait OptimizerRule {
7777
/// Note this API will be deprecated in the future as it requires `clone`ing
7878
/// the input plan, which can be expensive. OptimizerRules should implement
7979
/// [`Self::rewrite`] instead.
80+
#[deprecated(
81+
since = "40.0.0",
82+
note = "please implement supports_rewrite and rewrite instead"
83+
)]
8084
fn try_optimize(
8185
&self,
8286
plan: &LogicalPlan,
@@ -332,6 +336,7 @@ fn optimize_plan_node(
332336
return rule.rewrite(plan, config);
333337
}
334338

339+
#[allow(deprecated)]
335340
rule.try_optimize(&plan, config).map(|maybe_plan| {
336341
match maybe_plan {
337342
Some(new_plan) => {
@@ -483,7 +488,7 @@ mod tests {
483488
use std::sync::{Arc, Mutex};
484489

485490
use datafusion_common::tree_node::Transformed;
486-
use datafusion_common::{plan_err, DFSchema, DFSchemaRef, Result};
491+
use datafusion_common::{plan_err, DFSchema, DFSchemaRef, DataFusionError, Result};
487492
use datafusion_expr::logical_plan::EmptyRelation;
488493
use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, Projection};
489494

@@ -667,12 +672,24 @@ mod tests {
667672
_: &LogicalPlan,
668673
_: &dyn OptimizerConfig,
669674
) -> Result<Option<LogicalPlan>> {
670-
plan_err!("rule failed")
675+
unreachable!()
671676
}
672677

673678
fn name(&self) -> &str {
674679
"bad rule"
675680
}
681+
682+
fn supports_rewrite(&self) -> bool {
683+
true
684+
}
685+
686+
fn rewrite(
687+
&self,
688+
_plan: LogicalPlan,
689+
_config: &dyn OptimizerConfig,
690+
) -> Result<Transformed<LogicalPlan>, DataFusionError> {
691+
plan_err!("rule failed")
692+
}
676693
}
677694

678695
/// Replaces whatever plan with a single table scan

datafusion/optimizer/src/utils.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ use log::{debug, trace};
3535
/// This also handles the case when the `plan` is a [`LogicalPlan::Explain`].
3636
///
3737
/// Returning `Ok(None)` indicates that the plan can't be optimized by the `optimizer`.
38+
#[deprecated(
39+
since = "40.0.0",
40+
note = "please use OptimizerRule::apply_order with ApplyOrder::BottomUp instead"
41+
)]
3842
pub fn optimize_children(
3943
optimizer: &impl OptimizerRule,
4044
plan: &LogicalPlan,
@@ -43,9 +47,16 @@ pub fn optimize_children(
4347
let mut new_inputs = Vec::with_capacity(plan.inputs().len());
4448
let mut plan_is_changed = false;
4549
for input in plan.inputs() {
46-
let new_input = optimizer.try_optimize(input, config)?;
47-
plan_is_changed = plan_is_changed || new_input.is_some();
48-
new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
50+
if optimizer.supports_rewrite() {
51+
let new_input = optimizer.rewrite(input.clone(), config)?;
52+
plan_is_changed = plan_is_changed || new_input.transformed;
53+
new_inputs.push(new_input.data);
54+
} else {
55+
#[allow(deprecated)]
56+
let new_input = optimizer.try_optimize(input, config)?;
57+
plan_is_changed = plan_is_changed || new_input.is_some();
58+
new_inputs.push(new_input.unwrap_or_else(|| input.clone()))
59+
}
4960
}
5061
if plan_is_changed {
5162
let exprs = plan.expressions();

0 commit comments

Comments
 (0)