Skip to content

fix: issue #9130 substitute redundant columns when doing cross join #9154

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Feb 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion datafusion-cli/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,6 @@ async fn exec_and_print(
| LogicalPlan::DescribeTable(_)
| LogicalPlan::Analyze(_)
);

let df = ctx.execute_logical_plan(plan).await?;
let physical_plan = df.create_physical_plan().await?;

Expand Down
47 changes: 46 additions & 1 deletion datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,27 @@ impl LogicalPlanBuilder {
)?))
}
}

/// Renames repeated field names so that later occurrences of a name differ
/// from the earlier ones.
///
/// Fields are visited in input order. The first field with a given name is
/// returned unchanged; the n-th later field with that name is rebuilt with the
/// name `<name>:<n>` (for example `a`, `a:1`, `a:2`), reusing its qualifier,
/// data type and nullability.
///
/// NOTE(review): only `field.name()` is used as the counting key, so
/// qualifiers are ignored, and a generated name such as `a:1` is not checked
/// against names already present in `fields` - confirm callers cannot hit
/// such a collision.
pub fn change_redundant_column(fields: Vec<DFField>) -> Vec<DFField> {
// Number of fields seen so far for each name.
let mut name_map = HashMap::new();
fields
.into_iter()
.map(|field| {
let counter = name_map.entry(field.name().to_string()).or_insert(0);
*counter += 1;
// Second and later occurrences get a `:<index>` suffix, starting at 1.
if *counter > 1 {
let new_name = format!("{}:{}", field.name(), *counter - 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This effectively renames some of the columns to something else (like a:1) I think

How do we:

  1. know that name is unique? (couldn't there be another table in the query that has a column named a:1?
  2. know we should resolve the namespace conflict with the first column with the name?

DFField::new(
field.qualifier().cloned(),
&new_name,
field.data_type().clone(),
field.is_nullable(),
)
} else {
field
}
})
.collect()
}
/// Creates a schema for a join operation.
/// The fields from the left side are first
pub fn build_join_schema(
Expand Down Expand Up @@ -1237,6 +1257,7 @@ pub(crate) fn validate_unique_names<'a>(
expressions: impl IntoIterator<Item = &'a Expr>,
) -> Result<()> {
let mut unique_names = HashMap::new();

expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
let name = expr.display_name()?;
match unique_names.get(&name) {
Expand Down Expand Up @@ -1375,6 +1396,7 @@ pub fn project(
.push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
}
}

validate_unique_names("Projections", projected_expr.iter())?;

Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
Expand Down Expand Up @@ -2076,4 +2098,27 @@ mod tests {

Ok(())
}
// Checks the suffixing scheme of `change_redundant_column`: three fields named
// `a` and two named `b` (all unqualified, non-nullable Int32) are expected to
// come back as `a`, `a:1`, `b`, `b:1`, `a:2`, in input order.
#[test]
fn test_change_redundant_column() -> Result<()> {
let t1_field_1 = DFField::new_unqualified("a", DataType::Int32, false);
let t2_field_1 = DFField::new_unqualified("a", DataType::Int32, false);
let t2_field_3 = DFField::new_unqualified("a", DataType::Int32, false);
let t1_field_2 = DFField::new_unqualified("b", DataType::Int32, false);
let t2_field_2 = DFField::new_unqualified("b", DataType::Int32, false);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to add one more field named "a" here (to show a counter other than :1)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I can do that

let field_vec = vec![t1_field_1, t2_field_1, t1_field_2, t2_field_2, t2_field_3];
let remove_redundant = change_redundant_column(field_vec);

assert_eq!(
remove_redundant,
vec![
DFField::new_unqualified("a", DataType::Int32, false),
DFField::new_unqualified("a:1", DataType::Int32, false),
DFField::new_unqualified("b", DataType::Int32, false),
DFField::new_unqualified("b:1", DataType::Int32, false),
DFField::new_unqualified("a:2", DataType::Int32, false),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

]
);
Ok(())
}
}
6 changes: 5 additions & 1 deletion datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::Arc;

use super::dml::CopyTo;
use super::DdlStatement;
use crate::builder::change_redundant_column;
use crate::dml::CopyOptions;
use crate::expr::{
Alias, Exists, InSubquery, Placeholder, Sort as SortExpr, WindowFunction,
Expand Down Expand Up @@ -1891,7 +1892,9 @@ impl SubqueryAlias {
alias: impl Into<OwnedTableReference>,
) -> Result<Self> {
let alias = alias.into();
let schema: Schema = plan.schema().as_ref().clone().into();
let fields = change_redundant_column(plan.schema().fields().clone());
let meta_data = plan.schema().as_ref().metadata().clone();
let schema: Schema = DFSchema::new_with_metadata(fields, meta_data)?.into();
// Since schema is the same, other than qualifier, we can use existing
// functional dependencies:
let func_dependencies = plan.schema().functional_dependencies().clone();
Expand Down Expand Up @@ -2181,6 +2184,7 @@ impl TableScan {
df_schema.with_functional_dependencies(func_dependencies)
})?;
let projected_schema = Arc::new(projected_schema);

Ok(Self {
table_name,
source: table_source,
Expand Down
1 change: 0 additions & 1 deletion datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// handle named windows before processing the projection expression
check_conflicting_windows(&select.named_window)?;
match_window_definitions(&mut select.projection, &select.named_window)?;

// process the SELECT expressions, with wildcards expanded.
let select_exprs = self.prepare_select_exprs(
&base_plan,
Expand Down
64 changes: 64 additions & 0 deletions datafusion/sqllogictest/test_files/same_column_name_cross_join.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at

# http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


# prepare the tables
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ran this test without the code in this PR and it fails like this, so I think the test covers the changes in the PR

External error: query failed: DataFusion error: Error during planning: Projections require unique expression names but the expression "t.a" at position 0 and "t.a" at position 2 have the same name. Consider aliasing ("AS") one of them.
[SQL] select * from (t1 cross join t2) as t cross join t3;
-------
at test_files/./join.slt:733
at test_files/join_disable_repartition_joins.slt:26

Error: Execution("1 failures")
error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests`

Caused by:
  process didn't exit successfully: `/Users/andrewlamb/Software/arrow-datafusion/target/debug/deps/sqllogictests-6e501f6e871dacfc joins` (exit status: 1)


# Three tables that share the same column names (a, b).
statement ok
create table t1 (a int, b int);

statement ok
create table t2 (a int, b int);

statement ok
create table t3 (a int, b int);

# One row per table, with distinct values so the output order is visible.
statement ok
insert into t1 values (1, 2);

statement ok
insert into t2 values (3, 4);

statement ok
insert into t3 values (5, 6);

# Aliased cross join (t covers a and b from both t1 and t2) joined with t3.
query IIIIII
select * from (t1 cross join t2) as t cross join t3;
-------
----
1 2 3 4 5 6



# Aliased cross join joined with an unaliased cross join.
query IIIIIIII
select * from (t1 cross join t2) as t cross join (t2 cross join t3)
-------
----
1 2 3 4 3 4 5 6


# Adds a second aliased cross join (tt).
query IIIIIIIIIIII
select * from (t1 cross join t2) as t cross join (t2 cross join t3) cross join (t1 cross join t3) as tt
--------
----
1 2 3 4 3 4 5 6 1 2 5 6

# Adds a third aliased cross join (ttt).
query IIIIIIIIIIIIIIII
select * from (t1 cross join t2) as t cross join (t2 cross join t3) cross join (t1 cross join t3) as tt cross join (t2 cross join t3) as ttt;
--------
----
1 2 3 4 3 4 5 6 1 2 5 6 3 4 5 6