Skip to content

Optionally create name of aggregate expression from expressions #11776

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ mod tests {
) -> Arc<dyn AggregateExpr> {
AggregateExprBuilder::new(count_udaf(), vec![expr])
.schema(Arc::new(schema.clone()))
.name(name)
.alias(name)
.build()
.unwrap()
}
Expand Down Expand Up @@ -364,7 +364,7 @@ mod tests {
vec![
AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
.schema(Arc::clone(&schema))
.name("Sum(b)")
.alias("Sum(b)")
.build()
.unwrap(),
];
Expand Down
272 changes: 5 additions & 267 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

use std::borrow::Cow;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;

use crate::datasource::file_format::file_type_to_format;
Expand Down Expand Up @@ -74,11 +73,9 @@ use datafusion_common::{
};
use datafusion_expr::dml::CopyTo;
use datafusion_expr::expr::{
self, AggregateFunction, Alias, Between, BinaryExpr, Cast, GroupingSet, InList, Like,
TryCast, WindowFunction,
self, physical_name, AggregateFunction, Alias, GroupingSet, WindowFunction,
};
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::expr_vec_fmt;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery, StringifiedPlan,
Expand All @@ -97,265 +94,6 @@ use log::{debug, trace};
use sqlparser::ast::NullTreatment;
use tokio::sync::Mutex;

fn create_function_physical_name(
fun: &str,
distinct: bool,
args: &[Expr],
order_by: Option<&Vec<Expr>>,
) -> Result<String> {
let names: Vec<String> = args
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<_>>()?;

let distinct_str = match distinct {
true => "DISTINCT ",
false => "",
};

let phys_name = format!("{}({}{})", fun, distinct_str, names.join(","));

Ok(order_by
.map(|order_by| format!("{} ORDER BY [{}]", phys_name, expr_vec_fmt!(order_by)))
.unwrap_or(phys_name))
}

fn physical_name(e: &Expr) -> Result<String> {
create_physical_name(e, true)
}

fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
match e {
Expr::Unnest(_) => {
internal_err!(
"Expr::Unnest should have been converted to LogicalPlan::Unnest"
)
}
Expr::Column(c) => {
if is_first_expr {
Ok(c.name.clone())
} else {
Ok(c.flat_name())
}
}
Expr::Alias(Alias { name, .. }) => Ok(name.clone()),
Expr::ScalarVariable(_, variable_names) => Ok(variable_names.join(".")),
Expr::Literal(value) => Ok(format!("{value:?}")),
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
let left = create_physical_name(left, false)?;
let right = create_physical_name(right, false)?;
Ok(format!("{left} {op} {right}"))
}
Expr::Case(case) => {
let mut name = "CASE ".to_string();
if let Some(e) = &case.expr {
let _ = write!(name, "{} ", create_physical_name(e, false)?);
}
for (w, t) in &case.when_then_expr {
let _ = write!(
name,
"WHEN {} THEN {} ",
create_physical_name(w, false)?,
create_physical_name(t, false)?
);
}
if let Some(e) = &case.else_expr {
let _ = write!(name, "ELSE {} ", create_physical_name(e, false)?);
}
name += "END";
Ok(name)
}
Expr::Cast(Cast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::TryCast(TryCast { expr, .. }) => {
// CAST does not change the expression name
create_physical_name(expr, false)
}
Expr::Not(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("NOT {expr}"))
}
Expr::Negative(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("(- {expr})"))
}
Expr::IsNull(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NULL"))
}
Expr::IsNotNull(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT NULL"))
}
Expr::IsTrue(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS TRUE"))
}
Expr::IsFalse(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS FALSE"))
}
Expr::IsUnknown(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS UNKNOWN"))
}
Expr::IsNotTrue(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT TRUE"))
}
Expr::IsNotFalse(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT FALSE"))
}
Expr::IsNotUnknown(expr) => {
let expr = create_physical_name(expr, false)?;
Ok(format!("{expr} IS NOT UNKNOWN"))
}
Expr::ScalarFunction(fun) => fun.func.display_name(&fun.args),
Expr::WindowFunction(WindowFunction {
fun,
args,
order_by,
..
}) => {
create_function_physical_name(&fun.to_string(), false, args, Some(order_by))
}
Expr::AggregateFunction(AggregateFunction {
func,
distinct,
args,
filter: _,
order_by,
null_treatment: _,
}) => {
create_function_physical_name(func.name(), *distinct, args, order_by.as_ref())
}
Expr::GroupingSet(grouping_set) => match grouping_set {
GroupingSet::Rollup(exprs) => Ok(format!(
"ROLLUP ({})",
exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ")
)),
GroupingSet::Cube(exprs) => Ok(format!(
"CUBE ({})",
exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ")
)),
GroupingSet::GroupingSets(lists_of_exprs) => {
let mut strings = vec![];
for exprs in lists_of_exprs {
let exprs_str = exprs
.iter()
.map(|e| create_physical_name(e, false))
.collect::<Result<Vec<_>>>()?
.join(", ");
strings.push(format!("({exprs_str})"));
}
Ok(format!("GROUPING SETS ({})", strings.join(", ")))
}
},

Expr::InList(InList {
expr,
list,
negated,
}) => {
let expr = create_physical_name(expr, false)?;
let list = list.iter().map(|expr| create_physical_name(expr, false));
if *negated {
Ok(format!("{expr} NOT IN ({list:?})"))
} else {
Ok(format!("{expr} IN ({list:?})"))
}
}
Expr::Exists { .. } => {
not_impl_err!("EXISTS is not yet supported in the physical plan")
}
Expr::InSubquery(_) => {
not_impl_err!("IN subquery is not yet supported in the physical plan")
}
Expr::ScalarSubquery(_) => {
not_impl_err!("Scalar subqueries are not yet supported in the physical plan")
}
Expr::Between(Between {
expr,
negated,
low,
high,
}) => {
let expr = create_physical_name(expr, false)?;
let low = create_physical_name(low, false)?;
let high = create_physical_name(high, false)?;
if *negated {
Ok(format!("{expr} NOT BETWEEN {low} AND {high}"))
} else {
Ok(format!("{expr} BETWEEN {low} AND {high}"))
}
}
Expr::Like(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let op_name = if *case_insensitive { "ILIKE" } else { "LIKE" };
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT {op_name} {pattern}{escape}"))
} else {
Ok(format!("{expr} {op_name} {pattern}{escape}"))
}
}
Expr::SimilarTo(Like {
negated,
expr,
pattern,
escape_char,
case_insensitive: _,
}) => {
let expr = create_physical_name(expr, false)?;
let pattern = create_physical_name(pattern, false)?;
let escape = if let Some(char) = escape_char {
format!("CHAR '{char}'")
} else {
"".to_string()
};
if *negated {
Ok(format!("{expr} NOT SIMILAR TO {pattern}{escape}"))
} else {
Ok(format!("{expr} SIMILAR TO {pattern}{escape}"))
}
}
Expr::Sort { .. } => {
internal_err!("Create physical name does not support sort expression")
}
Expr::Wildcard { .. } => {
internal_err!("Create physical name does not support wildcard")
}
Expr::Placeholder(_) => {
internal_err!("Create physical name does not support placeholder")
}
Expr::OuterReferenceColumn(_, _) => {
internal_err!("Create physical name does not support OuterReferenceColumn")
}
}
}

/// Physical query planner that converts a `LogicalPlan` to an
/// `ExecutionPlan` suitable for execution.
#[async_trait]
Expand Down Expand Up @@ -1807,7 +1545,7 @@ type AggregateExprWithOptionalArgs = (
/// Create an aggregate expression with a name from a logical expression
pub fn create_aggregate_expr_with_name_and_maybe_filter(
e: &Expr,
name: impl Into<String>,
name: Option<String>,
logical_input_schema: &DFSchema,
_physical_input_schema: &Schema,
execution_props: &ExecutionProps,
Expand Down Expand Up @@ -1881,9 +1619,9 @@ pub fn create_aggregate_expr_and_maybe_filter(
) -> Result<AggregateExprWithOptionalArgs> {
// unpack (nested) aliased logical expressions, e.g. "sum(col) as total"
let (name, e) = match e {
Expr::Alias(Alias { expr, name, .. }) => (name.clone(), expr.as_ref()),
Expr::AggregateFunction(_) => (e.display_name().unwrap_or(physical_name(e)?), e),
_ => (physical_name(e)?, e),
Expr::Alias(Alias { expr, name, .. }) => (Some(name.clone()), expr.as_ref()),
Expr::AggregateFunction(_) => (e.display_name().ok(), e),
_ => (None, e),
};

create_aggregate_expr_with_name_and_maybe_filter(
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl TestAggregate {
pub fn count_expr(&self, schema: &Schema) -> Arc<dyn AggregateExpr> {
AggregateExprBuilder::new(count_udaf(), vec![self.column()])
.schema(Arc::new(schema.clone()))
.name(self.column_name())
.alias(self.column_name())
.build()
.unwrap()
}
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
vec![
AggregateExprBuilder::new(sum_udaf(), vec![col("d", &schema).unwrap()])
.schema(Arc::clone(&schema))
.name("sum1")
.alias("sum1")
.build()
.unwrap(),
];
Expand Down
Loading