Skip to content

Commit 784df33

Browse files
authored
Move back schema not matching check and workaround (#15580)
* Add expression & input schema name consistency check back * Workaround * Handle edge case where original name contains colon * Add unit test
1 parent 5a6db9f commit 784df33

File tree

2 files changed

+73
-10
lines changed

2 files changed

+73
-10
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 70 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ use datafusion_expr::{
8181
WindowFrameBound, WriteOp,
8282
};
8383
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
84-
use datafusion_physical_expr::expressions::Literal;
84+
use datafusion_physical_expr::expressions::{Column, Literal};
8585
use datafusion_physical_expr::LexOrdering;
8686
use datafusion_physical_optimizer::PhysicalOptimizerRule;
8787
use datafusion_physical_plan::execution_plan::InvariantLevel;
@@ -2000,7 +2000,8 @@ impl DefaultPhysicalPlanner {
20002000
input: &Arc<LogicalPlan>,
20012001
expr: &[Expr],
20022002
) -> Result<Arc<dyn ExecutionPlan>> {
2003-
let input_schema = input.as_ref().schema();
2003+
let input_logical_schema = input.as_ref().schema();
2004+
let input_physical_schema = input_exec.schema();
20042005
let physical_exprs = expr
20052006
.iter()
20062007
.map(|e| {
@@ -2019,7 +2020,7 @@ impl DefaultPhysicalPlanner {
20192020
// This depends on the invariant that logical schema field index MUST match
20202021
// with physical schema field index.
20212022
let physical_name = if let Expr::Column(col) = e {
2022-
match input_schema.index_of_column(col) {
2023+
match input_logical_schema.index_of_column(col) {
20232024
Ok(idx) => {
20242025
// index physical field using logical field index
20252026
Ok(input_exec.schema().field(idx).name().to_string())
@@ -2032,10 +2033,14 @@ impl DefaultPhysicalPlanner {
20322033
physical_name(e)
20332034
};
20342035

2035-
tuple_err((
2036-
self.create_physical_expr(e, input_schema, session_state),
2037-
physical_name,
2038-
))
2036+
let physical_expr =
2037+
self.create_physical_expr(e, input_logical_schema, session_state);
2038+
2039+
// Check for possible column name mismatches
2040+
let final_physical_expr =
2041+
maybe_fix_physical_column_name(physical_expr, &input_physical_schema);
2042+
2043+
tuple_err((final_physical_expr, physical_name))
20392044
})
20402045
.collect::<Result<Vec<_>>>()?;
20412046

@@ -2055,6 +2060,40 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
20552060
}
20562061
}
20572062

2063+
// Handle the case where the name of a physical column expression does not match the corresponding physical input fields names.
2064+
// Physical column names are derived from the physical schema, whereas physical column expressions are derived from the logical column names.
2065+
//
2066+
// This is a special case that applies only to column expressions. Logical plans may slightly modify column names by appending a suffix (e.g., using ':'),
2067+
// to avoid duplicates—since DFSchemas do not allow duplicate names. For example: `count(Int64(1)):1`.
2068+
fn maybe_fix_physical_column_name(
2069+
expr: Result<Arc<dyn PhysicalExpr>>,
2070+
input_physical_schema: &SchemaRef,
2071+
) -> Result<Arc<dyn PhysicalExpr>> {
2072+
if let Ok(e) = &expr {
2073+
if let Some(column) = e.as_any().downcast_ref::<Column>() {
2074+
let physical_field = input_physical_schema.field(column.index());
2075+
let expr_col_name = column.name();
2076+
let physical_name = physical_field.name();
2077+
2078+
if physical_name != expr_col_name {
2079+
// handle edge cases where the physical_name contains ':'.
2080+
let colon_count = physical_name.matches(':').count();
2081+
let mut splits = expr_col_name.match_indices(':');
2082+
let split_pos = splits.nth(colon_count);
2083+
2084+
if let Some((idx, _)) = split_pos {
2085+
let base_name = &expr_col_name[..idx];
2086+
if base_name == physical_name {
2087+
let updated_column = Column::new(physical_name, column.index());
2088+
return Ok(Arc::new(updated_column));
2089+
}
2090+
}
2091+
}
2092+
}
2093+
}
2094+
expr
2095+
}
2096+
20582097
struct OptimizationInvariantChecker<'a> {
20592098
rule: &'a Arc<dyn PhysicalOptimizerRule + Send + Sync>,
20602099
}
@@ -2650,6 +2689,30 @@ mod tests {
26502689
}
26512690
}
26522691

2692+
#[tokio::test]
2693+
async fn test_maybe_fix_colon_in_physical_name() {
2694+
// The physical schema has a field name with a colon
2695+
let schema = Schema::new(vec![Field::new("metric:avg", DataType::Int32, false)]);
2696+
let schema_ref: SchemaRef = Arc::new(schema);
2697+
2698+
// What might happen after deduplication
2699+
let logical_col_name = "metric:avg:1";
2700+
let expr_with_suffix =
2701+
Arc::new(Column::new(logical_col_name, 0)) as Arc<dyn PhysicalExpr>;
2702+
let expr_result = Ok(expr_with_suffix);
2703+
2704+
// Call function under test
2705+
let fixed_expr =
2706+
maybe_fix_physical_column_name(expr_result, &schema_ref).unwrap();
2707+
2708+
// Downcast back to Column so we can check the name
2709+
let col = fixed_expr
2710+
.as_any()
2711+
.downcast_ref::<Column>()
2712+
.expect("Column");
2713+
2714+
assert_eq!(col.name(), "metric:avg");
2715+
}
26532716
struct ErrorExtensionPlanner {}
26542717

26552718
#[async_trait]

datafusion/physical-expr/src/equivalence/projection.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use crate::PhysicalExpr;
2222

2323
use arrow::datatypes::SchemaRef;
2424
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
25-
use datafusion_common::Result;
25+
use datafusion_common::{internal_err, Result};
2626

2727
/// Stores the mapping between source expressions and target expressions for a
2828
/// projection.
@@ -66,8 +66,8 @@ impl ProjectionMapping {
6666
let idx = col.index();
6767
let matching_input_field = input_schema.field(idx);
6868
if col.name() != matching_input_field.name() {
69-
let fixed_col = Column::new(col.name(), idx);
70-
return Ok(Transformed::yes(Arc::new(fixed_col)));
69+
return internal_err!("Input field name {} does not match with the projection expression {}",
70+
matching_input_field.name(),col.name())
7171
}
7272
let matching_input_column =
7373
Column::new(matching_input_field.name(), idx);

0 commit comments

Comments
 (0)