Skip to content

Commit 63f113e

Browse files
berkaysynnadaccciudatu
authored andcommitted
Projection Expression - Input Field Inconsistencies during Projection (apache#10088)
* agg fixes * test updates * fixing count mismatch * Update aggregate_statistics.rs * catch different names * minor
1 parent 6b2e999 commit 63f113e

File tree

10 files changed

+145
-82
lines changed

10 files changed

+145
-82
lines changed

datafusion/core/src/physical_optimizer/aggregate_statistics.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@ use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
3535
#[derive(Default)]
3636
pub struct AggregateStatistics {}
3737

38-
/// The name of the column corresponding to [`COUNT_STAR_EXPANSION`]
39-
const COUNT_STAR_NAME: &str = "COUNT(*)";
40-
4138
impl AggregateStatistics {
4239
#[allow(missing_docs)]
4340
pub fn new() -> Self {
@@ -144,7 +141,7 @@ fn take_optimizable(node: &dyn ExecutionPlan) -> Option<Arc<dyn ExecutionPlan>>
144141
fn take_optimizable_table_count(
145142
agg_expr: &dyn AggregateExpr,
146143
stats: &Statistics,
147-
) -> Option<(ScalarValue, &'static str)> {
144+
) -> Option<(ScalarValue, String)> {
148145
if let (&Precision::Exact(num_rows), Some(casted_expr)) = (
149146
&stats.num_rows,
150147
agg_expr.as_any().downcast_ref::<expressions::Count>(),
@@ -158,7 +155,7 @@ fn take_optimizable_table_count(
158155
if lit_expr.value() == &COUNT_STAR_EXPANSION {
159156
return Some((
160157
ScalarValue::Int64(Some(num_rows as i64)),
161-
COUNT_STAR_NAME,
158+
casted_expr.name().to_owned(),
162159
));
163160
}
164161
}
@@ -427,7 +424,7 @@ pub(crate) mod tests {
427424
/// What name would this aggregate produce in a plan?
428425
fn column_name(&self) -> &'static str {
429426
match self {
430-
Self::CountStar => COUNT_STAR_NAME,
427+
Self::CountStar => "COUNT(*)",
431428
Self::ColumnA(_) => "COUNT(a)",
432429
}
433430
}

datafusion/core/src/physical_planner.rs

+23-8
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ use datafusion_expr::expr::{
8787
WindowFunction,
8888
};
8989
use datafusion_expr::expr_rewriter::unnormalize_cols;
90+
use datafusion_expr::expr_vec_fmt;
9091
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
9192
use datafusion_expr::{
9293
DescribeTable, DmlStatement, Extension, Filter, RecursiveQuery,
@@ -108,6 +109,7 @@ fn create_function_physical_name(
108109
fun: &str,
109110
distinct: bool,
110111
args: &[Expr],
112+
order_by: Option<&Vec<Expr>>,
111113
) -> Result<String> {
112114
let names: Vec<String> = args
113115
.iter()
@@ -118,7 +120,12 @@ fn create_function_physical_name(
118120
true => "DISTINCT ",
119121
false => "",
120122
};
121-
Ok(format!("{}({}{})", fun, distinct_str, names.join(",")))
123+
124+
let phys_name = format!("{}({}{})", fun, distinct_str, names.join(","));
125+
126+
Ok(order_by
127+
.map(|order_by| format!("{} ORDER BY [{}]", phys_name, expr_vec_fmt!(order_by)))
128+
.unwrap_or(phys_name))
122129
}
123130

124131
fn physical_name(e: &Expr) -> Result<String> {
@@ -238,22 +245,30 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
238245
return internal_err!("Function `Expr` with name should be resolved.");
239246
}
240247

241-
create_function_physical_name(fun.name(), false, &fun.args)
248+
create_function_physical_name(fun.name(), false, &fun.args, None)
242249
}
243-
Expr::WindowFunction(WindowFunction { fun, args, .. }) => {
244-
create_function_physical_name(&fun.to_string(), false, args)
250+
Expr::WindowFunction(WindowFunction {
251+
fun,
252+
args,
253+
order_by,
254+
..
255+
}) => {
256+
create_function_physical_name(&fun.to_string(), false, args, Some(order_by))
245257
}
246258
Expr::AggregateFunction(AggregateFunction {
247259
func_def,
248260
distinct,
249261
args,
250262
filter,
251-
order_by: _,
263+
order_by,
252264
null_treatment: _,
253265
}) => match func_def {
254-
AggregateFunctionDefinition::BuiltIn(..) => {
255-
create_function_physical_name(func_def.name(), *distinct, args)
256-
}
266+
AggregateFunctionDefinition::BuiltIn(..) => create_function_physical_name(
267+
func_def.name(),
268+
*distinct,
269+
args,
270+
order_by.as_ref(),
271+
),
257272
AggregateFunctionDefinition::UDF(fun) => {
258273
// TODO: Add support for filter by in AggregateUDF
259274
if filter.is_some() {

datafusion/functions-aggregate/src/first_last.rs

+31-10
Original file line numberDiff line numberDiff line change
@@ -415,11 +415,9 @@ impl FirstValuePhysicalExpr {
415415
}
416416

417417
pub fn convert_to_last(self) -> LastValuePhysicalExpr {
418-
let name = if self.name.starts_with("FIRST") {
419-
format!("LAST{}", &self.name[5..])
420-
} else {
421-
format!("LAST_VALUE({})", self.expr)
422-
};
418+
let mut name = format!("LAST{}", &self.name[5..]);
419+
replace_order_by_clause(&mut name);
420+
423421
let FirstValuePhysicalExpr {
424422
expr,
425423
input_data_type,
@@ -593,11 +591,9 @@ impl LastValuePhysicalExpr {
593591
}
594592

595593
pub fn convert_to_first(self) -> FirstValuePhysicalExpr {
596-
let name = if self.name.starts_with("LAST") {
597-
format!("FIRST{}", &self.name[4..])
598-
} else {
599-
format!("FIRST_VALUE({})", self.expr)
600-
};
594+
let mut name = format!("FIRST{}", &self.name[4..]);
595+
replace_order_by_clause(&mut name);
596+
601597
let LastValuePhysicalExpr {
602598
expr,
603599
input_data_type,
@@ -905,6 +901,31 @@ fn convert_to_sort_cols(
905901
.collect::<Vec<_>>()
906902
}
907903

904+
fn replace_order_by_clause(order_by: &mut String) {
905+
let suffixes = [
906+
(" DESC NULLS FIRST]", " ASC NULLS LAST]"),
907+
(" ASC NULLS FIRST]", " DESC NULLS LAST]"),
908+
(" DESC NULLS LAST]", " ASC NULLS FIRST]"),
909+
(" ASC NULLS LAST]", " DESC NULLS FIRST]"),
910+
];
911+
912+
if let Some(start) = order_by.find("ORDER BY [") {
913+
if let Some(end) = order_by[start..].find(']') {
914+
let order_by_start = start + 9;
915+
let order_by_end = start + end;
916+
917+
let column_order = &order_by[order_by_start..=order_by_end];
918+
for &(suffix, replacement) in &suffixes {
919+
if column_order.ends_with(suffix) {
920+
let new_order = column_order.replace(suffix, replacement);
921+
order_by.replace_range(order_by_start..=order_by_end, &new_order);
922+
break;
923+
}
924+
}
925+
}
926+
}
927+
}
928+
908929
#[cfg(test)]
909930
mod tests {
910931
use arrow::array::Int64Array;

datafusion/physical-expr/src/equivalence/projection.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use std::sync::Arc;
2020
use arrow::datatypes::SchemaRef;
2121

2222
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
23-
use datafusion_common::Result;
23+
use datafusion_common::{internal_err, Result};
2424

2525
use crate::expressions::Column;
2626
use crate::PhysicalExpr;
@@ -67,6 +67,10 @@ impl ProjectionMapping {
6767
// Conceptually, `source_expr` and `expression` should be the same.
6868
let idx = col.index();
6969
let matching_input_field = input_schema.field(idx);
70+
if col.name() != matching_input_field.name() {
71+
return internal_err!("Input field name {} does not match with the projection expression {}",
72+
matching_input_field.name(),col.name())
73+
}
7074
let matching_input_column =
7175
Column::new(matching_input_field.name(), idx);
7276
Ok(Transformed::yes(Arc::new(matching_input_column)))

datafusion/sqllogictest/test_files/agg_func_substitute.slt

+4-4
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ logical_plan
4444
03)----TableScan: multiple_ordered_table projection=[a, c]
4545
physical_plan
4646
01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
47-
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
47+
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
4848
03)----SortExec: expr=[a@0 ASC NULLS LAST]
4949
04)------CoalesceBatchesExec: target_batch_size=8192
5050
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
51-
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
51+
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
5252
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
5353
08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
5454

@@ -64,11 +64,11 @@ logical_plan
6464
03)----TableScan: multiple_ordered_table projection=[a, c]
6565
physical_plan
6666
01)ProjectionExec: expr=[a@0 as a, NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]@1 as result]
67-
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
67+
02)--AggregateExec: mode=FinalPartitioned, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
6868
03)----SortExec: expr=[a@0 ASC NULLS LAST]
6969
04)------CoalesceBatchesExec: target_batch_size=8192
7070
05)--------RepartitionExec: partitioning=Hash([a@0], 4), input_partitions=4
71-
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1))], ordering_mode=Sorted
71+
06)----------AggregateExec: mode=Partial, gby=[a@0 as a], aggr=[NTH_VALUE(multiple_ordered_table.c,Int64(1)) ORDER BY [multiple_ordered_table.c ASC NULLS LAST]], ordering_mode=Sorted
7272
07)------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
7373
08)--------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], has_header=true
7474

datafusion/sqllogictest/test_files/aggregate.slt

+6-6
Original file line numberDiff line numberDiff line change
@@ -132,9 +132,9 @@ logical_plan
132132
01)Aggregate: groupBy=[[]], aggr=[[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]]
133133
02)--TableScan: agg_order projection=[c1, c2, c3]
134134
physical_plan
135-
01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
135+
01)AggregateExec: mode=Final, gby=[], aggr=[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
136136
02)--CoalescePartitionsExec
137-
03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1)]
137+
03)----AggregateExec: mode=Partial, gby=[], aggr=[ARRAY_AGG(agg_order.c1) ORDER BY [agg_order.c2 DESC NULLS FIRST, agg_order.c3 ASC NULLS LAST]]
138138
04)------SortExec: expr=[c2@1 DESC,c3@2 ASC NULLS LAST]
139139
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
140140
06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/aggregate_agg_multi_order.csv]]}, projection=[c1, c2, c3], has_header=true
@@ -3520,9 +3520,9 @@ logical_plan
35203520
01)Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]]
35213521
02)--TableScan: convert_first_last_table projection=[c1, c3]
35223522
physical_plan
3523-
01)AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1)]
3523+
01)AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 DESC NULLS FIRST]]
35243524
02)--CoalescePartitionsExec
3525-
03)----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1)]
3525+
03)----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c3 ASC NULLS LAST]]
35263526
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
35273527
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c3], output_orderings=[[c1@0 ASC NULLS LAST], [c3@1 ASC NULLS LAST]], has_header=true
35283528

@@ -3534,8 +3534,8 @@ logical_plan
35343534
01)Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]]
35353535
02)--TableScan: convert_first_last_table projection=[c1, c2]
35363536
physical_plan
3537-
01)AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1)]
3537+
01)AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 ASC NULLS LAST]]
35383538
02)--CoalescePartitionsExec
3539-
03)----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1)]
3539+
03)----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(convert_first_last_table.c1) ORDER BY [convert_first_last_table.c2 DESC NULLS FIRST]]
35403540
04)------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
35413541
05)--------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/convert_first_last.csv]]}, projection=[c1, c2], output_orderings=[[c1@0 ASC NULLS LAST], [c2@1 DESC]], has_header=true

datafusion/sqllogictest/test_files/distinct_on.slt

+2-2
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,10 @@ physical_plan
9797
01)ProjectionExec: expr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@1 as c3, FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]@2 as c2]
9898
02)--SortPreservingMergeExec: [c1@0 ASC NULLS LAST]
9999
03)----SortExec: expr=[c1@0 ASC NULLS LAST]
100-
04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
100+
04)------AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]]
101101
05)--------CoalesceBatchesExec: target_batch_size=8192
102102
06)----------RepartitionExec: partitioning=Hash([c1@0], 4), input_partitions=4
103-
07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3), FIRST_VALUE(aggregate_test_100.c2)]
103+
07)------------AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[FIRST_VALUE(aggregate_test_100.c3) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST], FIRST_VALUE(aggregate_test_100.c2) ORDER BY [aggregate_test_100.c1 ASC NULLS LAST, aggregate_test_100.c3 ASC NULLS LAST]]
104104
08)--------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
105105
09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3], has_header=true
106106

0 commit comments

Comments
 (0)