Skip to content

Commit f8dd65a

Browse files
committed
Simplify display format of AggregateFunctionExpr
1 parent 41e7aed commit f8dd65a

File tree

4 files changed

+133
-6
lines changed

4 files changed

+133
-6
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1588,6 +1588,7 @@ type AggregateExprWithOptionalArgs = (
15881588
pub fn create_aggregate_expr_with_name_and_maybe_filter(
15891589
e: &Expr,
15901590
name: Option<String>,
1591+
sql_name: String,
15911592
logical_input_schema: &DFSchema,
15921593
physical_input_schema: &Schema,
15931594
execution_props: &ExecutionProps,
@@ -1642,6 +1643,7 @@ pub fn create_aggregate_expr_with_name_and_maybe_filter(
16421643
.order_by(ordering_reqs)
16431644
.schema(Arc::new(physical_input_schema.to_owned()))
16441645
.alias(name)
1646+
.sql_name(sql_name)
16451647
.with_ignore_nulls(ignore_nulls)
16461648
.with_distinct(*distinct)
16471649
.build()
@@ -1664,15 +1666,22 @@ pub fn create_aggregate_expr_and_maybe_filter(
16641666
execution_props: &ExecutionProps,
16651667
) -> Result<AggregateExprWithOptionalArgs> {
16661668
// unpack (nested) aliased logical expressions, e.g. "sum(col) as total"
1667-
let (name, e) = match e {
1668-
Expr::Alias(Alias { expr, name, .. }) => (Some(name.clone()), expr.as_ref()),
1669-
Expr::AggregateFunction(_) => (Some(e.schema_name().to_string()), e),
1670-
_ => (None, e),
1669+
let (name, sql_name, e) = match e {
1670+
Expr::Alias(Alias { expr, name, .. }) => {
1671+
(Some(name.clone()), String::default(), expr.as_ref())
1672+
}
1673+
Expr::AggregateFunction(_) => (
1674+
Some(e.schema_name().to_string()),
1675+
e.sql_name().to_string(),
1676+
e,
1677+
),
1678+
_ => (None, String::default(), e),
16711679
};
16721680

16731681
create_aggregate_expr_with_name_and_maybe_filter(
16741682
e,
16751683
name,
1684+
sql_name,
16761685
logical_input_schema,
16771686
physical_input_schema,
16781687
execution_props,

datafusion/expr/src/expr.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1147,6 +1147,10 @@ impl Expr {
11471147
SchemaDisplay(self)
11481148
}
11491149

1150+
pub fn sql_name(&self) -> impl Display + '_ {
1151+
SqlDisplay(self)
1152+
}
1153+
11501154
/// Returns the qualifier and the schema name of this expression.
11511155
///
11521156
/// Used when the expression forms the output field of a certain plan.
@@ -2596,6 +2600,43 @@ impl Display for SchemaDisplay<'_> {
25962600
}
25972601
}
25982602

2603+
struct SqlDisplay<'a>(&'a Expr);
2604+
impl Display for SqlDisplay<'_> {
2605+
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
2606+
match self.0 {
2607+
Expr::Column(c) => {
2608+
write!(f, "{}", c.name)
2609+
}
2610+
Expr::Literal(_) => {
2611+
write!(f, "aa")
2612+
}
2613+
Expr::ScalarVariable(..) => {
2614+
write!(f, "bb")
2615+
}
2616+
Expr::OuterReferenceColumn(..) => {
2617+
write!(f, "cc")
2618+
}
2619+
Expr::Placeholder(_) => {
2620+
write!(f, "dd")
2621+
}
2622+
Expr::Wildcard { .. } => {
2623+
write!(f, "ee")
2624+
}
2625+
Expr::AggregateFunction(AggregateFunction { func, params }) => {
2626+
match func.sql_name(params) {
2627+
Ok(name) => {
2628+
write!(f, "{name}")
2629+
}
2630+
Err(e) => {
2631+
write!(f, "got error from schema_name {}", e)
2632+
}
2633+
}
2634+
}
2635+
_ => write!(f, "{}", self.0.schema_name()),
2636+
}
2637+
}
2638+
}
2639+
25992640
/// Get schema_name for Vector of expressions
26002641
///
26012642
/// Internal usage. Please call `schema_name_from_exprs` instead
@@ -2607,11 +2648,21 @@ pub(crate) fn schema_name_from_exprs_comma_separated_without_space(
26072648
schema_name_from_exprs_inner(exprs, ",")
26082649
}
26092650

2651+
pub(crate) fn sql_name_from_exprs_comma_separated_without_space(
2652+
exprs: &[Expr],
2653+
) -> Result<String, fmt::Error> {
2654+
sql_name_from_exprs_inner(exprs, ",")
2655+
}
2656+
26102657
/// Get schema_name for Vector of expressions
26112658
pub fn schema_name_from_exprs(exprs: &[Expr]) -> Result<String, fmt::Error> {
26122659
schema_name_from_exprs_inner(exprs, ", ")
26132660
}
26142661

2662+
pub fn sql_name_from_exprs(exprs: &[Expr]) -> Result<String, fmt::Error> {
2663+
sql_name_from_exprs_inner(exprs, ", ")
2664+
}
2665+
26152666
fn schema_name_from_exprs_inner(exprs: &[Expr], sep: &str) -> Result<String, fmt::Error> {
26162667
let mut s = String::new();
26172668
for (i, e) in exprs.iter().enumerate() {
@@ -2624,6 +2675,18 @@ fn schema_name_from_exprs_inner(exprs: &[Expr], sep: &str) -> Result<String, fmt
26242675
Ok(s)
26252676
}
26262677

2678+
fn sql_name_from_exprs_inner(exprs: &[Expr], sep: &str) -> Result<String, fmt::Error> {
2679+
let mut s = String::new();
2680+
for (i, e) in exprs.iter().enumerate() {
2681+
if i > 0 {
2682+
write!(&mut s, "{sep}")?;
2683+
}
2684+
write!(&mut s, "{}", SqlDisplay(e))?;
2685+
}
2686+
2687+
Ok(s)
2688+
}
2689+
26272690
pub fn schema_name_from_sorts(sorts: &[Sort]) -> Result<String, fmt::Error> {
26282691
let mut s = String::new();
26292692
for (i, e) in sorts.iter().enumerate() {

datafusion/expr/src/udaf.rs

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3131

3232
use crate::expr::{
3333
schema_name_from_exprs, schema_name_from_exprs_comma_separated_without_space,
34-
schema_name_from_sorts, AggregateFunction, AggregateFunctionParams,
35-
WindowFunctionParams,
34+
schema_name_from_sorts, sql_name_from_exprs_comma_separated_without_space,
35+
AggregateFunction, AggregateFunctionParams, WindowFunctionParams,
3636
};
3737
use crate::function::{
3838
AccumulatorArgs, AggregateFunctionSimplification, StateFieldsArgs,
@@ -175,6 +175,10 @@ impl AggregateUDF {
175175
self.inner.schema_name(params)
176176
}
177177

178+
pub fn sql_name(&self, params: &AggregateFunctionParams) -> Result<String> {
179+
self.inner.sql_name(params)
180+
}
181+
178182
pub fn window_function_schema_name(
179183
&self,
180184
params: &WindowFunctionParams,
@@ -452,6 +456,42 @@ pub trait AggregateUDFImpl: Debug + Send + Sync {
452456
Ok(schema_name)
453457
}
454458

459+
fn sql_name(&self, params: &AggregateFunctionParams) -> Result<String> {
460+
let AggregateFunctionParams {
461+
args,
462+
distinct,
463+
filter,
464+
order_by,
465+
null_treatment,
466+
} = params;
467+
468+
let mut schema_name = String::new();
469+
470+
schema_name.write_fmt(format_args!(
471+
"{}({}{})",
472+
self.name(),
473+
if *distinct { "DISTINCT " } else { "" },
474+
sql_name_from_exprs_comma_separated_without_space(args)?
475+
))?;
476+
477+
if let Some(null_treatment) = null_treatment {
478+
schema_name.write_fmt(format_args!(" {}", null_treatment))?;
479+
}
480+
481+
if let Some(filter) = filter {
482+
schema_name.write_fmt(format_args!(" FILTER (WHERE {filter})"))?;
483+
};
484+
485+
if let Some(order_by) = order_by {
486+
schema_name.write_fmt(format_args!(
487+
" ORDER BY [{}]",
488+
schema_name_from_sorts(order_by)?
489+
))?;
490+
};
491+
492+
Ok(schema_name)
493+
}
494+
455495
/// Returns the name of the column this expression would create
456496
///
457497
/// See [`Expr::schema_name`] for details

datafusion/physical-expr/src/aggregate.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ pub struct AggregateExprBuilder {
6565
/// Physical expressions of the aggregate function
6666
args: Vec<Arc<dyn PhysicalExpr>>,
6767
alias: Option<String>,
68+
sql_name: String,
6869
/// Arrow Schema for the aggregate function
6970
schema: SchemaRef,
7071
/// The physical order by expressions
@@ -83,6 +84,7 @@ impl AggregateExprBuilder {
8384
fun,
8485
args,
8586
alias: None,
87+
sql_name: String::default(),
8688
schema: Arc::new(Schema::empty()),
8789
ordering_req: LexOrdering::default(),
8890
ignore_nulls: false,
@@ -99,6 +101,7 @@ impl AggregateExprBuilder {
99101
fun,
100102
args,
101103
alias,
104+
sql_name,
102105
schema,
103106
ordering_req,
104107
ignore_nulls,
@@ -148,6 +151,7 @@ impl AggregateExprBuilder {
148151
args,
149152
data_type,
150153
name,
154+
sql_name,
151155
schema: Arc::unwrap_or_clone(schema),
152156
ordering_req,
153157
ignore_nulls,
@@ -164,6 +168,11 @@ impl AggregateExprBuilder {
164168
self
165169
}
166170

171+
pub fn sql_name(mut self, name: String) -> Self {
172+
self.sql_name = name;
173+
self
174+
}
175+
167176
pub fn schema(mut self, schema: SchemaRef) -> Self {
168177
self.schema = schema;
169178
self
@@ -215,6 +224,7 @@ pub struct AggregateFunctionExpr {
215224
/// Output / return type of this aggregate
216225
data_type: DataType,
217226
name: String,
227+
sql_name: String,
218228
schema: Schema,
219229
// The physical order by expressions
220230
ordering_req: LexOrdering,
@@ -245,6 +255,11 @@ impl AggregateFunctionExpr {
245255
&self.name
246256
}
247257

258+
/// Simplified name for `tree` explain.
259+
pub fn sql_name(&self) -> &str {
260+
&self.sql_name
261+
}
262+
248263
/// Return if the aggregation is distinct
249264
pub fn is_distinct(&self) -> bool {
250265
self.is_distinct

0 commit comments

Comments
 (0)