-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Introduce FunctionRegistry dependency to optimize and rewrite rule #10714
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f379b73
62d59a2
9c42526
4fb9feb
4488294
97d8590
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -21,11 +21,11 @@ use crate::{OptimizerConfig, OptimizerRule}; | |
|
||
use datafusion_common::tree_node::Transformed; | ||
use datafusion_common::{internal_err, Column, Result}; | ||
use datafusion_expr::expr::AggregateFunction; | ||
use datafusion_expr::expr_rewriter::normalize_cols; | ||
use datafusion_expr::utils::expand_wildcard; | ||
use datafusion_expr::{col, LogicalPlanBuilder}; | ||
use datafusion_expr::{Aggregate, Distinct, DistinctOn, Expr, LogicalPlan}; | ||
use datafusion_functions_aggregate::first_last::first_value; | ||
|
||
/// Optimizer that replaces logical [[Distinct]] with a logical [[Aggregate]] | ||
/// | ||
|
@@ -73,7 +73,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { | |
fn rewrite( | ||
&self, | ||
plan: LogicalPlan, | ||
_config: &dyn OptimizerConfig, | ||
config: &dyn OptimizerConfig, | ||
) -> Result<Transformed<LogicalPlan>> { | ||
match plan { | ||
LogicalPlan::Distinct(Distinct::All(input)) => { | ||
|
@@ -95,9 +95,18 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { | |
let expr_cnt = on_expr.len(); | ||
|
||
// Construct the aggregation expression to be used to fetch the selected expressions. | ||
let aggr_expr = select_expr | ||
.into_iter() | ||
.map(|e| first_value(vec![e], false, None, sort_expr.clone(), None)); | ||
let first_value_udaf = | ||
config.function_registry().unwrap().udaf("first_value")?; | ||
let aggr_expr = select_expr.into_iter().map(|e| { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this change is the reason for this PR I think -- to avoid the hard coded dependency on |
||
Expr::AggregateFunction(AggregateFunction::new_udf( | ||
first_value_udaf.clone(), | ||
vec![e], | ||
false, | ||
None, | ||
sort_expr.clone(), | ||
None, | ||
)) | ||
}); | ||
|
||
let aggr_expr = normalize_cols(aggr_expr, input.as_ref())?; | ||
let group_expr = normalize_cols(on_expr, input.as_ref())?; | ||
|
@@ -163,53 +172,3 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { | |
Some(BottomUp) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. move to slt |
||
use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate; | ||
use crate::test::{assert_optimized_plan_eq, test_table_scan}; | ||
use datafusion_expr::{col, LogicalPlanBuilder}; | ||
use std::sync::Arc; | ||
|
||
#[test] | ||
fn replace_distinct() -> datafusion_common::Result<()> { | ||
let table_scan = test_table_scan().unwrap(); | ||
let plan = LogicalPlanBuilder::from(table_scan) | ||
.project(vec![col("a"), col("b")])? | ||
.distinct()? | ||
.build()?; | ||
|
||
let expected = "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\ | ||
\n Projection: test.a, test.b\ | ||
\n TableScan: test"; | ||
|
||
assert_optimized_plan_eq( | ||
Arc::new(ReplaceDistinctWithAggregate::new()), | ||
plan, | ||
expected, | ||
) | ||
} | ||
|
||
#[test] | ||
fn replace_distinct_on() -> datafusion_common::Result<()> { | ||
let table_scan = test_table_scan().unwrap(); | ||
let plan = LogicalPlanBuilder::from(table_scan) | ||
.distinct_on( | ||
vec![col("a")], | ||
vec![col("b")], | ||
Some(vec![col("a").sort(false, true), col("c").sort(true, false)]), | ||
)? | ||
.build()?; | ||
|
||
let expected = "Projection: first_value(test.b) ORDER BY [test.a DESC NULLS FIRST, test.c ASC NULLS LAST] AS b\ | ||
\n Sort: test.a DESC NULLS FIRST\ | ||
\n Aggregate: groupBy=[[test.a]], aggr=[[first_value(test.b) ORDER BY [test.a DESC NULLS FIRST, test.c ASC NULLS LAST]]]\ | ||
\n TableScan: test"; | ||
|
||
assert_optimized_plan_eq( | ||
Arc::new(ReplaceDistinctWithAggregate::new()), | ||
plan, | ||
expected, | ||
) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -143,3 +143,39 @@ LIMIT 3; | |
-25 15295 | ||
45 15673 | ||
-72 -11122 | ||
|
||
# test distinct on | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
statement ok | ||
create table t(a int, b int, c int) as values (1, 2, 3); | ||
|
||
statement ok | ||
set datafusion.explain.logical_plan_only = true; | ||
|
||
query TT | ||
explain select distinct on (a) b from t order by a desc, c; | ||
---- | ||
logical_plan | ||
01)Projection: first_value(t.b) ORDER BY [t.a DESC NULLS FIRST, t.c ASC NULLS LAST] AS b | ||
02)--Sort: t.a DESC NULLS FIRST | ||
03)----Aggregate: groupBy=[[t.a]], aggr=[[first_value(t.b) ORDER BY [t.a DESC NULLS FIRST, t.c ASC NULLS LAST]]] | ||
04)------TableScan: t projection=[a, b, c] | ||
|
||
statement ok | ||
drop table t; | ||
|
||
# test distinct | ||
statement ok | ||
create table t(a int, b int) as values (1, 2); | ||
|
||
statement ok | ||
set datafusion.explain.logical_plan_only = true; | ||
|
||
query TT | ||
explain select distinct a, b from t; | ||
---- | ||
logical_plan | ||
01)Aggregate: groupBy=[[t.a, t.b]], aggr=[[]] | ||
02)--TableScan: t projection=[a, b] | ||
|
||
statement ok | ||
drop table t; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🎉