Skip to content

Commit acb7772

Browse files
committed
fix: issue apache#9130 substitute redundant columns when doing cross join
1 parent 441a435 commit acb7772

File tree

5 files changed

+59
-7
lines changed

5 files changed

+59
-7
lines changed

datafusion-cli/src/exec.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ async fn exec_and_print(
219219

220220
let statements = DFParser::parse_sql_with_dialect(&sql, dialect.as_ref())?;
221221
for statement in statements {
222+
// println!("cur statement is {:?}", statement);
222223
let plan = create_plan(ctx, statement).await?;
223224

224225
// For plans like `Explain` ignore `MaxRows` option and always display all rows
@@ -228,7 +229,7 @@ async fn exec_and_print(
228229
| LogicalPlan::DescribeTable(_)
229230
| LogicalPlan::Analyze(_)
230231
);
231-
232+
// println!("the final logical plan is {:?}", plan);
232233
let df = ctx.execute_logical_plan(plan).await?;
233234
let physical_plan = df.create_physical_plan().await?;
234235

datafusion/core/src/datasource/memory.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ impl MemTable {
7070
pub fn try_new(schema: SchemaRef, partitions: Vec<Vec<RecordBatch>>) -> Result<Self> {
7171
for batches in partitions.iter().flatten() {
7272
let batches_schema = batches.schema();
73+
println!(
74+
"the new schema is {:?}, schema set is {:?}",
75+
batches_schema, schema
76+
);
7377
if !schema.contains(&batches_schema) {
7478
debug!(
7579
"mem table schema does not contain batches schema. \

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ use crate::{
4747
TableProviderFilterPushDown, TableSource, WriteOp,
4848
};
4949

50-
use arrow::datatypes::{DataType, Schema, SchemaRef};
50+
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
5151
use datafusion_common::display::ToStringifiedPlan;
5252
use datafusion_common::{
5353
get_target_functional_dependencies, plan_datafusion_err, plan_err, Column, DFField,
@@ -890,6 +890,7 @@ impl LogicalPlanBuilder {
890890
pub fn cross_join(self, right: LogicalPlan) -> Result<Self> {
891891
let join_schema =
892892
build_join_schema(self.plan.schema(), right.schema(), &JoinType::Inner)?;
893+
println!("left is {:?} \n right is {:?}", self.plan, right);
893894
Ok(Self::from(LogicalPlan::CrossJoin(CrossJoin {
894895
left: Arc::new(self.plan),
895896
right: Arc::new(right),
@@ -1124,7 +1125,28 @@ impl LogicalPlanBuilder {
11241125
)?))
11251126
}
11261127
}
1127-
1128+
pub fn change_redundant_column(fields: Vec<DFField>) -> Vec<DFField> {
1129+
let mut name_map = HashMap::new();
1130+
fields
1131+
.into_iter()
1132+
.map(|field| {
1133+
if !name_map.contains_key(field.name()) {
1134+
name_map.insert(field.name().to_string(), 0);
1135+
field
1136+
} else {
1137+
let cur_cnt = &name_map.get(field.name());
1138+
let name = field.name().to_string() + ":" + &cur_cnt.unwrap().to_string();
1139+
name_map.insert(field.name().to_string(), cur_cnt.unwrap() + 1);
1140+
DFField::new(
1141+
field.qualifier().cloned(),
1142+
&name,
1143+
field.data_type().clone(),
1144+
field.is_nullable(),
1145+
)
1146+
}
1147+
})
1148+
.collect()
1149+
}
11281150
/// Creates a schema for a join operation.
11291151
/// The fields from the left side are first
11301152
pub fn build_join_schema(
@@ -1184,13 +1206,16 @@ pub fn build_join_schema(
11841206
right_fields.clone()
11851207
}
11861208
};
1209+
//println!("total fields is {:?}", fields);
11871210
let func_dependencies = left.functional_dependencies().join(
11881211
right.functional_dependencies(),
11891212
join_type,
11901213
left_fields.len(),
11911214
);
1215+
// println!("func_dependencies is {:?}", func_dependencies);
11921216
let mut metadata = left.metadata().clone();
11931217
metadata.extend(right.metadata().clone());
1218+
// let schema = DFSchema::new_with_metadata(change_redundant_column(fields), metadata)?;
11941219
let schema = DFSchema::new_with_metadata(fields, metadata)?;
11951220
schema.with_functional_dependencies(func_dependencies)
11961221
}
@@ -1231,12 +1256,16 @@ fn add_group_by_exprs_from_dependencies(
12311256
}
12321257
Ok(group_expr)
12331258
}
1259+
/// Unimplemented stub: takes no arguments and panics through
/// `unimplemented!()` whenever it is called.
///
/// NOTE(review): the name suggests a table-aware counterpart of
/// `validate_unique_names` is planned — confirm the intent, or drop this
/// stub (together with its unused lifetime parameter `'a`) before merging.
pub(crate) fn validate_unique_names_with_table<'a>() {
1260+
unimplemented!()
1261+
}
12341262
/// Errors if one or more expressions have equal names.
12351263
pub(crate) fn validate_unique_names<'a>(
12361264
node_name: &str,
12371265
expressions: impl IntoIterator<Item = &'a Expr>,
12381266
) -> Result<()> {
12391267
let mut unique_names = HashMap::new();
1268+
12401269
expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
12411270
let name = expr.display_name()?;
12421271
match unique_names.get(&name) {
@@ -1245,6 +1274,7 @@ pub(crate) fn validate_unique_names<'a>(
12451274
Ok(())
12461275
},
12471276
Some((existing_position, existing_expr)) => {
1277+
//println!("node_name is {}, existing expr is {:?}", node_name, existing_expr);
12481278
plan_err!("{node_name} require unique expression names \
12491279
but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
12501280
at position {position} have the same name. Consider aliasing (\"AS\") one of them."
@@ -1360,6 +1390,7 @@ pub fn project(
13601390
let mut projected_expr = vec![];
13611391
for e in expr {
13621392
let e = e.into();
1393+
//println!("cur_expression is {:?}", e);
13631394
match e {
13641395
Expr::Wildcard { qualifier: None } => {
13651396
projected_expr.extend(expand_wildcard(input_schema, &plan, None)?)
@@ -1375,6 +1406,10 @@ pub fn project(
13751406
.push(columnize_expr(normalize_col(e, &plan)?, input_schema)),
13761407
}
13771408
}
1409+
// println!(
1410+
// "before validation the projection name is {:?} \n and the expression is {:?}",
1411+
// plan, projected_expr
1412+
// );
13781413
validate_unique_names("Projections", projected_expr.iter())?;
13791414

13801415
Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use std::sync::Arc;
2424

2525
use super::dml::CopyTo;
2626
use super::DdlStatement;
27+
use crate::builder::change_redundant_column;
2728
use crate::dml::CopyOptions;
2829
use crate::expr::{
2930
Alias, Exists, InSubquery, Placeholder, Sort as SortExpr, WindowFunction,
@@ -1891,7 +1892,11 @@ impl SubqueryAlias {
18911892
alias: impl Into<OwnedTableReference>,
18921893
) -> Result<Self> {
18931894
let alias = alias.into();
1894-
let schema: Schema = plan.schema().as_ref().clone().into();
1895+
let fields = change_redundant_column(plan.schema().fields().clone());
1896+
let meta_data = plan.schema().as_ref().metadata().clone();
1897+
let schema: Schema = DFSchema::new_with_metadata(fields, meta_data)
1898+
.unwrap()
1899+
.into();
18951900
// Since schema is the same, other than qualifier, we can use existing
18961901
// functional dependencies:
18971902
let func_dependencies = plan.schema().functional_dependencies().clone();
@@ -2181,6 +2186,10 @@ impl TableScan {
21812186
df_schema.with_functional_dependencies(func_dependencies)
21822187
})?;
21832188
let projected_schema = Arc::new(projected_schema);
2189+
// println!(
2190+
// "projected_schema is {:?} \n and projection is {:?}",
2191+
// projected_schema, projection,
2192+
// );
21842193
Ok(Self {
21852194
table_name,
21862195
source: table_source,

datafusion/sql/src/select.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
6666
if !select.sort_by.is_empty() {
6767
return not_impl_err!("SORT BY");
6868
}
69-
69+
// println!("select from is {:?}", select.from);
70+
// println!("current planner_context is {:?}", planner_context);
7071
// process `from` clause
7172
let plan = self.plan_from_tables(select.from, planner_context)?;
7273
let empty_from = matches!(plan, LogicalPlan::EmptyRelation(_));
@@ -77,15 +78,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
7778
// handle named windows before processing the projection expression
7879
check_conflicting_windows(&select.named_window)?;
7980
match_window_definitions(&mut select.projection, &select.named_window)?;
80-
8181
// process the SELECT expressions, with wildcards expanded.
8282
let select_exprs = self.prepare_select_exprs(
8383
&base_plan,
8484
select.projection,
8585
empty_from,
8686
planner_context,
8787
)?;
88-
88+
// println!(
89+
// "base plan is {:?} \n select expression is {:?} \n planner_context is {:?}",
90+
// base_plan, select_exprs, planner_context
91+
// );
8992
// having and group by clause may reference aliases defined in select projection
9093
let projected_plan = self.project(base_plan.clone(), select_exprs.clone())?;
9194
// Place the fields of the base plan at the front so that when there are references

0 commit comments

Comments
 (0)