Skip to content

Commit 7638a26

Browse files
authored
Introduce FunctionRegistry dependency to optimize and rewrite rule (#10714)
* mv function registry to expr
  Signed-off-by: jayzhan211 <[email protected]>
* registry move to config trait
  Signed-off-by: jayzhan211 <[email protected]>
* fix test
  Signed-off-by: jayzhan211 <[email protected]>
* fix test
  Signed-off-by: jayzhan211 <[email protected]>
* rm dependency
  Signed-off-by: jayzhan211 <[email protected]>
* fix cli cargo lock
  Signed-off-by: jayzhan211 <[email protected]>
---------
Signed-off-by: jayzhan211 <[email protected]>
1 parent d6ddd23 commit 7638a26

File tree

9 files changed

+147
-140
lines changed

9 files changed

+147
-140
lines changed

datafusion-cli/Cargo.lock

+79-81
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/core/src/execution/context/mod.rs

+4
Original file line numberDiff line numberDiff line change
@@ -2350,6 +2350,10 @@ impl OptimizerConfig for SessionState {
23502350
fn options(&self) -> &ConfigOptions {
23512351
self.config_options()
23522352
}
2353+
2354+
fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
2355+
Some(self)
2356+
}
23532357
}
23542358

23552359
/// Create a new task context instance from SessionContext

datafusion/execution/src/lib.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,16 @@ pub mod config;
2222
pub mod disk_manager;
2323
pub mod memory_pool;
2424
pub mod object_store;
25-
pub mod registry;
2625
pub mod runtime_env;
2726
mod stream;
2827
mod task;
2928

29+
pub mod registry {
30+
pub use datafusion_expr::registry::{
31+
FunctionRegistry, MemoryFunctionRegistry, SerializerRegistry,
32+
};
33+
}
34+
3035
pub use disk_manager::DiskManager;
3136
pub use registry::FunctionRegistry;
3237
pub use stream::{RecordBatchStream, SendableRecordBatchStream};

datafusion/expr/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ pub mod function;
4848
pub mod groups_accumulator;
4949
pub mod interval_arithmetic;
5050
pub mod logical_plan;
51+
pub mod registry;
5152
pub mod simplify;
5253
pub mod sort_properties;
5354
pub mod tree_node;

datafusion/execution/src/registry.rs renamed to datafusion/expr/src/registry.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
//! FunctionRegistry trait
1919
20+
use crate::expr_rewriter::FunctionRewrite;
21+
use crate::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF};
2022
use datafusion_common::{not_impl_err, plan_datafusion_err, Result};
21-
use datafusion_expr::expr_rewriter::FunctionRewrite;
22-
use datafusion_expr::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF};
2323
use std::collections::HashMap;
2424
use std::{collections::HashSet, sync::Arc};
2525

datafusion/optimizer/Cargo.toml

-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ async-trait = { workspace = true }
4545
chrono = { workspace = true }
4646
datafusion-common = { workspace = true, default-features = true }
4747
datafusion-expr = { workspace = true }
48-
datafusion-functions-aggregate = { workspace = true }
4948
datafusion-physical-expr = { workspace = true }
5049
hashbrown = { workspace = true }
5150
indexmap = { workspace = true }

datafusion/optimizer/src/optimizer.rs

+5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::collections::HashSet;
2121
use std::sync::Arc;
2222

2323
use chrono::{DateTime, Utc};
24+
use datafusion_expr::registry::FunctionRegistry;
2425
use log::{debug, warn};
2526

2627
use datafusion_common::alias::AliasGenerator;
@@ -122,6 +123,10 @@ pub trait OptimizerConfig {
122123
fn alias_generator(&self) -> Arc<AliasGenerator>;
123124

124125
fn options(&self) -> &ConfigOptions;
126+
127+
fn function_registry(&self) -> Option<&dyn FunctionRegistry> {
128+
None
129+
}
125130
}
126131

127132
/// A standalone [`OptimizerConfig`] that can be used independently

datafusion/optimizer/src/replace_distinct_aggregate.rs

+14-55
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ use crate::{OptimizerConfig, OptimizerRule};
2121

2222
use datafusion_common::tree_node::Transformed;
2323
use datafusion_common::{internal_err, Column, Result};
24+
use datafusion_expr::expr::AggregateFunction;
2425
use datafusion_expr::expr_rewriter::normalize_cols;
2526
use datafusion_expr::utils::expand_wildcard;
2627
use datafusion_expr::{col, LogicalPlanBuilder};
2728
use datafusion_expr::{Aggregate, Distinct, DistinctOn, Expr, LogicalPlan};
28-
use datafusion_functions_aggregate::first_last::first_value;
2929

3030
/// Optimizer that replaces logical [`Distinct`] with a logical [`Aggregate`]
3131
///
@@ -73,7 +73,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
7373
fn rewrite(
7474
&self,
7575
plan: LogicalPlan,
76-
_config: &dyn OptimizerConfig,
76+
config: &dyn OptimizerConfig,
7777
) -> Result<Transformed<LogicalPlan>> {
7878
match plan {
7979
LogicalPlan::Distinct(Distinct::All(input)) => {
@@ -95,9 +95,18 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
9595
let expr_cnt = on_expr.len();
9696

9797
// Construct the aggregation expression to be used to fetch the selected expressions.
98-
let aggr_expr = select_expr
99-
.into_iter()
100-
.map(|e| first_value(vec![e], false, None, sort_expr.clone(), None));
98+
let first_value_udaf =
99+
config.function_registry().unwrap().udaf("first_value")?;
100+
let aggr_expr = select_expr.into_iter().map(|e| {
101+
Expr::AggregateFunction(AggregateFunction::new_udf(
102+
first_value_udaf.clone(),
103+
vec![e],
104+
false,
105+
None,
106+
sort_expr.clone(),
107+
None,
108+
))
109+
});
101110

102111
let aggr_expr = normalize_cols(aggr_expr, input.as_ref())?;
103112
let group_expr = normalize_cols(on_expr, input.as_ref())?;
@@ -163,53 +172,3 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
163172
Some(BottomUp)
164173
}
165174
}
166-
167-
#[cfg(test)]
168-
mod tests {
169-
use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
170-
use crate::test::{assert_optimized_plan_eq, test_table_scan};
171-
use datafusion_expr::{col, LogicalPlanBuilder};
172-
use std::sync::Arc;
173-
174-
#[test]
175-
fn replace_distinct() -> datafusion_common::Result<()> {
176-
let table_scan = test_table_scan().unwrap();
177-
let plan = LogicalPlanBuilder::from(table_scan)
178-
.project(vec![col("a"), col("b")])?
179-
.distinct()?
180-
.build()?;
181-
182-
let expected = "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\
183-
\n Projection: test.a, test.b\
184-
\n TableScan: test";
185-
186-
assert_optimized_plan_eq(
187-
Arc::new(ReplaceDistinctWithAggregate::new()),
188-
plan,
189-
expected,
190-
)
191-
}
192-
193-
#[test]
194-
fn replace_distinct_on() -> datafusion_common::Result<()> {
195-
let table_scan = test_table_scan().unwrap();
196-
let plan = LogicalPlanBuilder::from(table_scan)
197-
.distinct_on(
198-
vec![col("a")],
199-
vec![col("b")],
200-
Some(vec![col("a").sort(false, true), col("c").sort(true, false)]),
201-
)?
202-
.build()?;
203-
204-
let expected = "Projection: first_value(test.b) ORDER BY [test.a DESC NULLS FIRST, test.c ASC NULLS LAST] AS b\
205-
\n Sort: test.a DESC NULLS FIRST\
206-
\n Aggregate: groupBy=[[test.a]], aggr=[[first_value(test.b) ORDER BY [test.a DESC NULLS FIRST, test.c ASC NULLS LAST]]]\
207-
\n TableScan: test";
208-
209-
assert_optimized_plan_eq(
210-
Arc::new(ReplaceDistinctWithAggregate::new()),
211-
plan,
212-
expected,
213-
)
214-
}
215-
}

datafusion/sqllogictest/test_files/distinct_on.slt

+36
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,39 @@ LIMIT 3;
143143
-25 15295
144144
45 15673
145145
-72 -11122
146+
147+
# test distinct on
148+
statement ok
149+
create table t(a int, b int, c int) as values (1, 2, 3);
150+
151+
statement ok
152+
set datafusion.explain.logical_plan_only = true;
153+
154+
query TT
155+
explain select distinct on (a) b from t order by a desc, c;
156+
----
157+
logical_plan
158+
01)Projection: first_value(t.b) ORDER BY [t.a DESC NULLS FIRST, t.c ASC NULLS LAST] AS b
159+
02)--Sort: t.a DESC NULLS FIRST
160+
03)----Aggregate: groupBy=[[t.a]], aggr=[[first_value(t.b) ORDER BY [t.a DESC NULLS FIRST, t.c ASC NULLS LAST]]]
161+
04)------TableScan: t projection=[a, b, c]
162+
163+
statement ok
164+
drop table t;
165+
166+
# test distinct
167+
statement ok
168+
create table t(a int, b int) as values (1, 2);
169+
170+
statement ok
171+
set datafusion.explain.logical_plan_only = true;
172+
173+
query TT
174+
explain select distinct a, b from t;
175+
----
176+
logical_plan
177+
01)Aggregate: groupBy=[[t.a, t.b]], aggr=[[]]
178+
02)--TableScan: t projection=[a, b]
179+
180+
statement ok
181+
drop table t;

0 commit comments

Comments
 (0)