Skip to content

ExprBuilder for Physical Aggregate Expr #11617

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,11 @@ pub mod optimizer {
pub use datafusion_optimizer::*;
}

/// re-export of [`datafusion_physical_expr`] crate
pub mod physical_expr_common {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems other crate import *, follow them

pub use datafusion_physical_expr_common::*;
}

/// re-export of [`datafusion_physical_expr`] crate
pub mod physical_expr {
pub use datafusion_physical_expr::*;
Expand Down
20 changes: 6 additions & 14 deletions datafusion/core/src/physical_optimizer/aggregate_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ pub(crate) mod tests {
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_physical_expr::expressions::cast;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;
use datafusion_physical_plan::aggregates::AggregateMode;

/// Mock data using a MemoryExec which has an exact count statistic
Expand Down Expand Up @@ -419,19 +419,11 @@ pub(crate) mod tests {

// Return appropriate expr depending if COUNT is for col or table (*)
pub(crate) fn count_expr(&self, schema: &Schema) -> Arc<dyn AggregateExpr> {
create_aggregate_expr(
&count_udaf(),
&[self.column()],
&[],
&[],
&[],
schema,
self.column_name(),
false,
false,
false,
)
.unwrap()
AggregateExprBuilder::new(count_udaf(), vec![self.column()])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that is much easier to understand 💯

.schema(Arc::new(schema.clone()))
.name(self.column_name())
.build()
.unwrap()
}

/// what argument would this aggregate need in the plan?
Expand Down
41 changes: 14 additions & 27 deletions datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ mod tests {
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_plan::udaf::create_aggregate_expr;
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;

/// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected
macro_rules! assert_optimized {
Expand Down Expand Up @@ -278,19 +278,11 @@ mod tests {
name: &str,
schema: &Schema,
) -> Arc<dyn AggregateExpr> {
create_aggregate_expr(
&count_udaf(),
&[expr],
&[],
&[],
&[],
schema,
name,
false,
false,
false,
)
.unwrap()
AggregateExprBuilder::new(count_udaf(), vec![expr])
.schema(Arc::new(schema.clone()))
.name(name)
.build()
.unwrap()
}

#[test]
Expand Down Expand Up @@ -368,19 +360,14 @@ mod tests {
#[test]
fn aggregations_with_group_combined() -> Result<()> {
let schema = schema();

let aggr_expr = vec![create_aggregate_expr(
&sum_udaf(),
&[col("b", &schema)?],
&[],
&[],
&[],
&schema,
"Sum(b)",
false,
false,
false,
)?];
let aggr_expr =
vec![
AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
.schema(Arc::clone(&schema))
.name("Sum(b)")
.build()
.unwrap(),
];
let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
vec![(col("c", &schema)?, "c".to_string())];

Expand Down
23 changes: 9 additions & 14 deletions datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}
use datafusion_functions_aggregate::sum::sum_udaf;
use datafusion_physical_expr::expressions::col;
use datafusion_physical_expr::PhysicalSortExpr;
use datafusion_physical_plan::udaf::create_aggregate_expr;
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;
use datafusion_physical_plan::InputOrderMode;
use test_utils::{add_empty_batches, StringBatchGenerator};

Expand Down Expand Up @@ -103,19 +103,14 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
.with_sort_information(vec![sort_keys]),
);

let aggregate_expr = vec![create_aggregate_expr(
&sum_udaf(),
&[col("d", &schema).unwrap()],
&[],
&[],
&[],
&schema,
"sum1",
false,
false,
false,
)
.unwrap()];
let aggregate_expr =
vec![
AggregateExprBuilder::new(sum_udaf(), vec![col("d", &schema).unwrap()])
.schema(Arc::clone(&schema))
.name("sum1")
.build()
.unwrap(),
];
let expr = group_by_columns
.iter()
.map(|elem| (col(elem, &schema).unwrap(), elem.to_string()))
Expand Down
Loading