Skip to content

Commit 9ea86ca

Browse files
committed
Support duplicate column aliases in queries
In SQL, selecting the same column multiple times is legal, and most modern databases support it. This commit adds that support to DataFusion as well.
1 parent 168c696 commit 9ea86ca

File tree

14 files changed

+272
-110
lines changed

14 files changed

+272
-110
lines changed

datafusion/common/src/dfschema.rs

Lines changed: 11 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
//! DFSchema is an extended schema struct that DataFusion uses to provide support for
1919
//! fields with optional relation names.
2020
21-
use std::collections::{BTreeSet, HashMap, HashSet};
21+
use std::collections::{HashMap, HashSet};
2222
use std::fmt::{Display, Formatter};
2323
use std::hash::Hash;
2424
use std::sync::Arc;
@@ -154,7 +154,6 @@ impl DFSchema {
154154
field_qualifiers: qualifiers,
155155
functional_dependencies: FunctionalDependencies::empty(),
156156
};
157-
dfschema.check_names()?;
158157
Ok(dfschema)
159158
}
160159

@@ -183,7 +182,6 @@ impl DFSchema {
183182
field_qualifiers: vec![None; field_count],
184183
functional_dependencies: FunctionalDependencies::empty(),
185184
};
186-
dfschema.check_names()?;
187185
Ok(dfschema)
188186
}
189187

@@ -201,7 +199,6 @@ impl DFSchema {
201199
field_qualifiers: vec![Some(qualifier); schema.fields.len()],
202200
functional_dependencies: FunctionalDependencies::empty(),
203201
};
204-
schema.check_names()?;
205202
Ok(schema)
206203
}
207204

@@ -215,40 +212,9 @@ impl DFSchema {
215212
field_qualifiers: qualifiers,
216213
functional_dependencies: FunctionalDependencies::empty(),
217214
};
218-
dfschema.check_names()?;
219215
Ok(dfschema)
220216
}
221217

222-
/// Check if the schema have some fields with the same name
223-
pub fn check_names(&self) -> Result<()> {
224-
let mut qualified_names = BTreeSet::new();
225-
let mut unqualified_names = BTreeSet::new();
226-
227-
for (field, qualifier) in self.inner.fields().iter().zip(&self.field_qualifiers) {
228-
if let Some(qualifier) = qualifier {
229-
if !qualified_names.insert((qualifier, field.name())) {
230-
return _schema_err!(SchemaError::DuplicateQualifiedField {
231-
qualifier: Box::new(qualifier.clone()),
232-
name: field.name().to_string(),
233-
});
234-
}
235-
} else if !unqualified_names.insert(field.name()) {
236-
return _schema_err!(SchemaError::DuplicateUnqualifiedField {
237-
name: field.name().to_string()
238-
});
239-
}
240-
}
241-
242-
for (qualifier, name) in qualified_names {
243-
if unqualified_names.contains(name) {
244-
return _schema_err!(SchemaError::AmbiguousReference {
245-
field: Column::new(Some(qualifier.clone()), name)
246-
});
247-
}
248-
}
249-
Ok(())
250-
}
251-
252218
/// Assigns functional dependencies.
253219
pub fn with_functional_dependencies(
254220
mut self,
@@ -285,7 +251,6 @@ impl DFSchema {
285251
field_qualifiers: new_qualifiers,
286252
functional_dependencies: FunctionalDependencies::empty(),
287253
};
288-
new_self.check_names()?;
289254
Ok(new_self)
290255
}
291256

@@ -1141,10 +1106,10 @@ mod tests {
11411106
fn join_qualified_duplicate() -> Result<()> {
11421107
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
11431108
let right = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
1144-
let join = left.join(&right);
1109+
let join = left.join(&right)?;
11451110
assert_eq!(
1146-
join.unwrap_err().strip_backtrace(),
1147-
"Schema error: Schema contains duplicate qualified field name t1.c0",
1111+
"fields:[t1.c0, t1.c1, t1.c0, t1.c1], metadata:{}",
1112+
join.to_string()
11481113
);
11491114
Ok(())
11501115
}
@@ -1153,11 +1118,8 @@ mod tests {
11531118
fn join_unqualified_duplicate() -> Result<()> {
11541119
let left = DFSchema::try_from(test_schema_1())?;
11551120
let right = DFSchema::try_from(test_schema_1())?;
1156-
let join = left.join(&right);
1157-
assert_eq!(
1158-
join.unwrap_err().strip_backtrace(),
1159-
"Schema error: Schema contains duplicate unqualified field name c0"
1160-
);
1121+
let join = left.join(&right)?;
1122+
assert_eq!("fields:[c0, c1, c0, c1], metadata:{}", join.to_string());
11611123
Ok(())
11621124
}
11631125

@@ -1190,10 +1152,11 @@ mod tests {
11901152
fn join_mixed_duplicate() -> Result<()> {
11911153
let left = DFSchema::try_from_qualified_schema("t1", &test_schema_1())?;
11921154
let right = DFSchema::try_from(test_schema_1())?;
1193-
let join = left.join(&right);
1194-
assert_contains!(join.unwrap_err().to_string(),
1195-
"Schema error: Schema contains qualified \
1196-
field name t1.c0 and unqualified field name c0 which would be ambiguous");
1155+
let join = left.join(&right)?;
1156+
assert_eq!(
1157+
"fields:[t1.c0, t1.c1, c0, c1], metadata:{}",
1158+
join.to_string()
1159+
);
11971160
Ok(())
11981161
}
11991162

datafusion/common/src/error.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,11 @@ pub enum SchemaError {
150150
qualifier: Box<TableReference>,
151151
name: String,
152152
},
153+
/// Schema contains qualified fields with duplicate unqualified names
154+
QualifiedFieldWithDuplicateName {
155+
qualifier: Box<TableReference>,
156+
name: String,
157+
},
153158
/// Schema contains duplicate unqualified field name
154159
DuplicateUnqualifiedField { name: String },
155160
/// No field with this name
@@ -188,6 +193,14 @@ impl Display for SchemaError {
188193
quote_identifier(name)
189194
)
190195
}
196+
Self::QualifiedFieldWithDuplicateName { qualifier, name } => {
197+
write!(
198+
f,
199+
"Schema contains qualified fields with duplicate unqualified names {}.{}",
200+
qualifier.to_quoted_string(),
201+
quote_identifier(name)
202+
)
203+
}
191204
Self::DuplicateUnqualifiedField { name } => {
192205
write!(
193206
f,

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1557,8 +1557,6 @@ pub fn project(
15571557
_ => projected_expr.push(columnize_expr(normalize_col(e, &plan)?, &plan)?),
15581558
}
15591559
}
1560-
validate_unique_names("Projections", projected_expr.iter())?;
1561-
15621560
Projection::try_new(projected_expr, Arc::new(plan)).map(LogicalPlan::Projection)
15631561
}
15641562

@@ -1966,7 +1964,7 @@ mod tests {
19661964
use crate::logical_plan::StringifiedPlan;
19671965
use crate::{col, expr, expr_fn::exists, in_subquery, lit, scalar_subquery};
19681966

1969-
use datafusion_common::{RecursionUnnestOption, SchemaError};
1967+
use datafusion_common::RecursionUnnestOption;
19701968

19711969
#[test]
19721970
fn plan_builder_simple() -> Result<()> {
@@ -2202,25 +2200,14 @@ mod tests {
22022200
Some(vec![0, 1]),
22032201
)?
22042202
// two columns with the same name => allowed, the projection keeps both
2205-
.project(vec![col("id"), col("first_name").alias("id")]);
2206-
2207-
match plan {
2208-
Err(DataFusionError::SchemaError(
2209-
SchemaError::AmbiguousReference {
2210-
field:
2211-
Column {
2212-
relation: Some(TableReference::Bare { table }),
2213-
name,
2214-
},
2215-
},
2216-
_,
2217-
)) => {
2218-
assert_eq!(*"employee_csv", *table);
2219-
assert_eq!("id", &name);
2220-
Ok(())
2221-
}
2222-
_ => plan_err!("Plan should have returned an DataFusionError::SchemaError"),
2223-
}
2203+
.project(vec![col("id"), col("first_name").alias("id")])?
2204+
.build()?;
2205+
2206+
let expected = "\
2207+
Projection: employee_csv.id, employee_csv.first_name AS id\
2208+
\n TableScan: employee_csv projection=[id, first_name]";
2209+
assert_eq!(expected, format!("{plan}"));
2210+
Ok(())
22242211
}
22252212

22262213
fn employee_schema() -> Schema {

datafusion/expr/src/logical_plan/ddl.rs

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
use crate::{Expr, LogicalPlan, SortExpr, Volatility};
1919
use std::cmp::Ordering;
20-
use std::collections::HashMap;
20+
use std::collections::{BTreeSet, HashMap, HashSet};
2121
use std::sync::Arc;
2222
use std::{
2323
fmt::{self, Display},
@@ -28,7 +28,8 @@ use crate::expr::Sort;
2828
use arrow::datatypes::DataType;
2929
use datafusion_common::tree_node::{Transformed, TreeNodeContainer, TreeNodeRecursion};
3030
use datafusion_common::{
31-
Constraints, DFSchemaRef, Result, SchemaReference, TableReference,
31+
schema_err, Column, Constraints, DFSchema, DFSchemaRef, DataFusionError, Result,
32+
SchemaError, SchemaReference, TableReference,
3233
};
3334
use sqlparser::ast::Ident;
3435

@@ -306,6 +307,7 @@ impl CreateExternalTable {
306307
constraints,
307308
column_defaults,
308309
} = fields;
310+
check_fields_unique(&schema)?;
309311
Ok(Self {
310312
name,
311313
schema,
@@ -544,6 +546,7 @@ impl CreateMemoryTable {
544546
column_defaults,
545547
temporary,
546548
} = fields;
549+
check_fields_unique(input.schema())?;
547550
Ok(Self {
548551
name,
549552
constraints,
@@ -698,6 +701,7 @@ impl CreateView {
698701
definition,
699702
temporary,
700703
} = fields;
704+
check_fields_unique(input.schema())?;
701705
Ok(Self {
702706
name,
703707
input,
@@ -800,6 +804,48 @@ impl CreateViewBuilder {
800804
})
801805
}
802806
}
807+
fn check_fields_unique(schema: &DFSchema) -> Result<()> {
808+
// Use tree set for deterministic error messages
809+
let mut qualified_names = BTreeSet::new();
810+
let mut unqualified_names = HashSet::new();
811+
let mut name_occurrences: HashMap<&String, usize> = HashMap::new();
812+
813+
for (qualifier, field) in schema.iter() {
814+
if let Some(qualifier) = qualifier {
815+
// Check for duplicate qualified field names
816+
if !qualified_names.insert((qualifier, field.name())) {
817+
return schema_err!(SchemaError::DuplicateQualifiedField {
818+
qualifier: Box::new(qualifier.clone()),
819+
name: field.name().to_string(),
820+
});
821+
}
822+
// Check for duplicate unqualified field names
823+
} else if !unqualified_names.insert(field.name()) {
824+
return schema_err!(SchemaError::DuplicateUnqualifiedField {
825+
name: field.name().to_string()
826+
});
827+
}
828+
*name_occurrences.entry(field.name()).or_default() += 1;
829+
}
830+
831+
for (qualifier, name) in qualified_names {
832+
// Check for duplicate between qualified and unqualified field names
833+
if unqualified_names.contains(name) {
834+
return schema_err!(SchemaError::AmbiguousReference {
835+
field: Column::new(Some(qualifier.clone()), name)
836+
});
837+
}
838+
// Check for duplicates between qualified names as the qualification will be stripped off
839+
if name_occurrences[name] > 1 {
840+
return schema_err!(SchemaError::QualifiedFieldWithDuplicateName {
841+
qualifier: Box::new(qualifier.clone()),
842+
name: name.to_owned(),
843+
});
844+
}
845+
}
846+
847+
Ok(())
848+
}
803849

804850
/// Creates a catalog (aka "Database").
805851
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
@@ -1085,7 +1131,9 @@ impl PartialOrd for CreateIndex {
10851131

10861132
#[cfg(test)]
10871133
mod test {
1134+
use super::*;
10881135
use crate::{CreateCatalog, DdlStatement, DropView};
1136+
use arrow::datatypes::{DataType, Field, Schema};
10891137
use datafusion_common::{DFSchema, DFSchemaRef, TableReference};
10901138
use std::cmp::Ordering;
10911139

@@ -1112,4 +1160,85 @@ mod test {
11121160

11131161
assert_eq!(drop_view.partial_cmp(&catalog), Some(Ordering::Greater));
11141162
}
1163+
1164+
#[test]
1165+
fn test_check_fields_unique() -> Result<()> {
1166+
// no duplicate fields, unqualified schema
1167+
check_fields_unique(&DFSchema::try_from(Schema::new(vec![
1168+
Field::new("c100", DataType::Boolean, true),
1169+
Field::new("c101", DataType::Boolean, true),
1170+
]))?)?;
1171+
1172+
// no duplicate fields, qualified schema
1173+
check_fields_unique(&DFSchema::try_from_qualified_schema(
1174+
"t1",
1175+
&Schema::new(vec![
1176+
Field::new("c100", DataType::Boolean, true),
1177+
Field::new("c101", DataType::Boolean, true),
1178+
]),
1179+
)?)?;
1180+
1181+
// duplicate unqualified field (schema has no qualifier)
1182+
assert_eq!(
1183+
check_fields_unique(&DFSchema::try_from(Schema::new(vec![
1184+
Field::new("c0", DataType::Boolean, true),
1185+
Field::new("c1", DataType::Boolean, true),
1186+
Field::new("c1", DataType::Boolean, true),
1187+
Field::new("c2", DataType::Boolean, true),
1188+
]))?)
1189+
.unwrap_err()
1190+
.strip_backtrace()
1191+
.to_string(),
1192+
"Schema error: Schema contains duplicate unqualified field name c1"
1193+
);
1194+
1195+
// duplicate qualified field with same qualifier
1196+
assert_eq!(
1197+
check_fields_unique(&DFSchema::try_from_qualified_schema(
1198+
"t1",
1199+
&Schema::new(vec![
1200+
Field::new("c1", DataType::Boolean, true),
1201+
Field::new("c1", DataType::Boolean, true),
1202+
]),
1203+
)?)
1204+
.unwrap_err()
1205+
.strip_backtrace()
1206+
.to_string(),
1207+
"Schema error: Schema contains duplicate qualified field name t1.c1"
1208+
);
1209+
1210+
// duplicate qualified and unqualified field
1211+
assert_eq!(
1212+
check_fields_unique(&DFSchema::from_field_specific_qualified_schema(
1213+
vec![
1214+
None,
1215+
Some(TableReference::from("t1")),
1216+
],
1217+
&Arc::new(Schema::new(vec![
1218+
Field::new("c1", DataType::Boolean, true),
1219+
Field::new("c1", DataType::Boolean, true),
1220+
]))
1221+
)?)
1222+
.unwrap_err().strip_backtrace().to_string(),
1223+
"Schema error: Schema contains qualified field name t1.c1 and unqualified field name c1 which would be ambiguous"
1224+
);
1225+
1226+
// qualified fields with duplicate unqualified names
1227+
assert_eq!(
1228+
check_fields_unique(&DFSchema::from_field_specific_qualified_schema(
1229+
vec![
1230+
Some(TableReference::from("t1")),
1231+
Some(TableReference::from("t2")),
1232+
],
1233+
&Arc::new(Schema::new(vec![
1234+
Field::new("c1", DataType::Boolean, true),
1235+
Field::new("c1", DataType::Boolean, true),
1236+
]))
1237+
)?)
1238+
.unwrap_err().strip_backtrace().to_string(),
1239+
"Schema error: Schema contains qualified fields with duplicate unqualified names t1.c1"
1240+
);
1241+
1242+
Ok(())
1243+
}
11151244
}

0 commit comments

Comments
 (0)