Skip to content

Commit 1356934

Browse files
authored
ExprBuilder for Physical Aggregate Expr (#11617)
* aggregate expr builder Signed-off-by: jayzhan211 <[email protected]>
* replace parts of test Signed-off-by: jayzhan211 <[email protected]>
* continue Signed-off-by: jayzhan211 <[email protected]>
* cleanup all Signed-off-by: jayzhan211 <[email protected]>
* clippy Signed-off-by: jayzhan211 <[email protected]>
* add sort Signed-off-by: jayzhan211 <[email protected]>
* rm field Signed-off-by: jayzhan211 <[email protected]>
* address comment Signed-off-by: jayzhan211 <[email protected]>
* fix import path Signed-off-by: jayzhan211 <[email protected]>
--------- Signed-off-by: jayzhan211 <[email protected]>
1 parent e90b3ac commit 1356934

File tree

9 files changed

+369
-367
lines changed

9 files changed

+369
-367
lines changed

datafusion/core/src/lib.rs

+5
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,11 @@ pub mod optimizer {
545545
pub use datafusion_optimizer::*;
546546
}
547547

548+
/// re-export of [`datafusion_physical_expr_common`] crate
549+
pub mod physical_expr_common {
550+
pub use datafusion_physical_expr_common::*;
551+
}
552+
548553
/// re-export of [`datafusion_physical_expr`] crate
549554
pub mod physical_expr {
550555
pub use datafusion_physical_expr::*;

datafusion/core/src/physical_optimizer/aggregate_statistics.rs

+6-14
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ pub(crate) mod tests {
326326
use datafusion_functions_aggregate::count::count_udaf;
327327
use datafusion_physical_expr::expressions::cast;
328328
use datafusion_physical_expr::PhysicalExpr;
329-
use datafusion_physical_expr_common::aggregate::create_aggregate_expr;
329+
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;
330330
use datafusion_physical_plan::aggregates::AggregateMode;
331331

332332
/// Mock data using a MemoryExec which has an exact count statistic
@@ -419,19 +419,11 @@ pub(crate) mod tests {
419419

420420
// Return the appropriate expr depending on whether COUNT is for a column or the whole table (*)
421421
pub(crate) fn count_expr(&self, schema: &Schema) -> Arc<dyn AggregateExpr> {
422-
create_aggregate_expr(
423-
&count_udaf(),
424-
&[self.column()],
425-
&[],
426-
&[],
427-
&[],
428-
schema,
429-
self.column_name(),
430-
false,
431-
false,
432-
false,
433-
)
434-
.unwrap()
422+
AggregateExprBuilder::new(count_udaf(), vec![self.column()])
423+
.schema(Arc::new(schema.clone()))
424+
.name(self.column_name())
425+
.build()
426+
.unwrap()
435427
}
436428

437429
/// what argument would this aggregate need in the plan?

datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs

+14-27
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ mod tests {
177177
use datafusion_functions_aggregate::count::count_udaf;
178178
use datafusion_functions_aggregate::sum::sum_udaf;
179179
use datafusion_physical_expr::expressions::col;
180-
use datafusion_physical_plan::udaf::create_aggregate_expr;
180+
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;
181181

182182
/// Runs the CombinePartialFinalAggregate optimizer and asserts the plan against the expected
183183
macro_rules! assert_optimized {
@@ -278,19 +278,11 @@ mod tests {
278278
name: &str,
279279
schema: &Schema,
280280
) -> Arc<dyn AggregateExpr> {
281-
create_aggregate_expr(
282-
&count_udaf(),
283-
&[expr],
284-
&[],
285-
&[],
286-
&[],
287-
schema,
288-
name,
289-
false,
290-
false,
291-
false,
292-
)
293-
.unwrap()
281+
AggregateExprBuilder::new(count_udaf(), vec![expr])
282+
.schema(Arc::new(schema.clone()))
283+
.name(name)
284+
.build()
285+
.unwrap()
294286
}
295287

296288
#[test]
@@ -368,19 +360,14 @@ mod tests {
368360
#[test]
369361
fn aggregations_with_group_combined() -> Result<()> {
370362
let schema = schema();
371-
372-
let aggr_expr = vec![create_aggregate_expr(
373-
&sum_udaf(),
374-
&[col("b", &schema)?],
375-
&[],
376-
&[],
377-
&[],
378-
&schema,
379-
"Sum(b)",
380-
false,
381-
false,
382-
false,
383-
)?];
363+
let aggr_expr =
364+
vec![
365+
AggregateExprBuilder::new(sum_udaf(), vec![col("b", &schema)?])
366+
.schema(Arc::clone(&schema))
367+
.name("Sum(b)")
368+
.build()
369+
.unwrap(),
370+
];
384371
let groups: Vec<(Arc<dyn PhysicalExpr>, String)> =
385372
vec![(col("c", &schema)?, "c".to_string())];
386373

datafusion/core/tests/fuzz_cases/aggregate_fuzz.rs

+9-14
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}
3535
use datafusion_functions_aggregate::sum::sum_udaf;
3636
use datafusion_physical_expr::expressions::col;
3737
use datafusion_physical_expr::PhysicalSortExpr;
38-
use datafusion_physical_plan::udaf::create_aggregate_expr;
38+
use datafusion_physical_expr_common::aggregate::AggregateExprBuilder;
3939
use datafusion_physical_plan::InputOrderMode;
4040
use test_utils::{add_empty_batches, StringBatchGenerator};
4141

@@ -103,19 +103,14 @@ async fn run_aggregate_test(input1: Vec<RecordBatch>, group_by_columns: Vec<&str
103103
.with_sort_information(vec![sort_keys]),
104104
);
105105

106-
let aggregate_expr = vec![create_aggregate_expr(
107-
&sum_udaf(),
108-
&[col("d", &schema).unwrap()],
109-
&[],
110-
&[],
111-
&[],
112-
&schema,
113-
"sum1",
114-
false,
115-
false,
116-
false,
117-
)
118-
.unwrap()];
106+
let aggregate_expr =
107+
vec![
108+
AggregateExprBuilder::new(sum_udaf(), vec![col("d", &schema).unwrap()])
109+
.schema(Arc::clone(&schema))
110+
.name("sum1")
111+
.build()
112+
.unwrap(),
113+
];
119114
let expr = group_by_columns
120115
.iter()
121116
.map(|elem| (col(elem, &schema).unwrap(), elem.to_string()))

0 commit comments

Comments
 (0)