Skip to content

Commit 536f2b8

Browse files
Add primary key information to CreateMemoryTable LogicalPlan node (#5835)
Add primary key information to CreateMemoryTable LogicalPlan node (#5835)
1 parent 99c4767 commit 536f2b8

File tree

5 files changed

+157
-60
lines changed

5 files changed

+157
-60
lines changed

datafusion/core/src/execution/context.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,15 @@ impl SessionContext {
344344
input,
345345
if_not_exists,
346346
or_replace,
347+
primary_key,
347348
}) => {
349+
if !primary_key.is_empty() {
350+
Err(DataFusionError::Execution(
351+
"Primary keys on MemoryTables are not currently supported!"
352+
.to_string(),
353+
))?;
354+
}
355+
348356
let input = Arc::try_unwrap(input).unwrap_or_else(|e| e.as_ref().clone());
349357
let table = self.table(&name).await;
350358

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1046,9 +1046,17 @@ impl LogicalPlan {
10461046
write!(f, "CreateExternalTable: {name:?}")
10471047
}
10481048
LogicalPlan::CreateMemoryTable(CreateMemoryTable {
1049-
name, ..
1049+
name,
1050+
primary_key,
1051+
..
10501052
}) => {
1051-
write!(f, "CreateMemoryTable: {name:?}")
1053+
let pk: Vec<String> =
1054+
primary_key.iter().map(|c| c.name.to_string()).collect();
1055+
let mut pk = pk.join(", ");
1056+
if !pk.is_empty() {
1057+
pk = format!(" primary_key=[{pk}]");
1058+
}
1059+
write!(f, "CreateMemoryTable: {name:?}{pk}")
10521060
}
10531061
LogicalPlan::CreateView(CreateView { name, .. }) => {
10541062
write!(f, "CreateView: {name:?}")
@@ -1490,6 +1498,8 @@ pub struct Union {
14901498
pub struct CreateMemoryTable {
14911499
/// The table name
14921500
pub name: OwnedTableReference,
1501+
/// The ordered list of columns in the primary key, or an empty vector if none
1502+
pub primary_key: Vec<Column>,
14931503
/// The logical plan
14941504
pub input: Arc<LogicalPlan>,
14951505
/// Option to not error if table already exists

datafusion/expr/src/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -843,6 +843,7 @@ pub fn from_plan(
843843
..
844844
}) => Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
845845
input: Arc::new(inputs[0].clone()),
846+
primary_key: vec![],
846847
name: name.clone(),
847848
if_not_exists: *if_not_exists,
848849
or_replace: *or_replace,

datafusion/sql/src/statement.rs

Lines changed: 106 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ use datafusion_expr::{
4545
use sqlparser::ast;
4646
use sqlparser::ast::{
4747
Assignment, Expr as SQLExpr, Expr, Ident, ObjectName, ObjectType, OrderByExpr, Query,
48-
SchemaName, SetExpr, ShowCreateObject, ShowStatementFilter, Statement, TableFactor,
49-
TableWithJoins, TransactionMode, UnaryOperator, Value,
48+
SchemaName, SetExpr, ShowCreateObject, ShowStatementFilter, Statement,
49+
TableConstraint, TableFactor, TableWithJoins, TransactionMode, UnaryOperator, Value,
5050
};
5151

5252
use sqlparser::parser::ParserError::ParserError;
@@ -128,69 +128,67 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
128128
if_not_exists,
129129
or_replace,
130130
..
131-
} if constraints.is_empty()
132-
&& table_properties.is_empty()
133-
&& with_options.is_empty() =>
134-
{
135-
match query {
136-
Some(query) => {
137-
let plan = self.query_to_plan(*query, planner_context)?;
138-
let input_schema = plan.schema();
139-
140-
let plan = if !columns.is_empty() {
141-
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
142-
if schema.fields().len() != input_schema.fields().len() {
143-
return Err(DataFusionError::Plan(format!(
131+
} if table_properties.is_empty() && with_options.is_empty() => match query {
132+
Some(query) => {
133+
let primary_key = Self::primary_key_from_constraints(&constraints)?;
134+
135+
let plan = self.query_to_plan(*query, planner_context)?;
136+
let input_schema = plan.schema();
137+
138+
let plan = if !columns.is_empty() {
139+
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
140+
if schema.fields().len() != input_schema.fields().len() {
141+
return Err(DataFusionError::Plan(format!(
144142
"Mismatch: {} columns specified, but result has {} columns",
145143
schema.fields().len(),
146144
input_schema.fields().len()
147145
)));
148-
}
149-
let input_fields = input_schema.fields();
150-
let project_exprs = schema
151-
.fields()
152-
.iter()
153-
.zip(input_fields)
154-
.map(|(field, input_field)| {
155-
cast(
156-
col(input_field.name()),
157-
field.data_type().clone(),
158-
)
146+
}
147+
let input_fields = input_schema.fields();
148+
let project_exprs = schema
149+
.fields()
150+
.iter()
151+
.zip(input_fields)
152+
.map(|(field, input_field)| {
153+
cast(col(input_field.name()), field.data_type().clone())
159154
.alias(field.name())
160-
})
161-
.collect::<Vec<_>>();
162-
LogicalPlanBuilder::from(plan.clone())
163-
.project(project_exprs)?
164-
.build()?
165-
} else {
166-
plan
167-
};
168-
169-
Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
170-
name: self.object_name_to_table_reference(name)?,
171-
input: Arc::new(plan),
172-
if_not_exists,
173-
or_replace,
174-
}))
175-
}
155+
})
156+
.collect::<Vec<_>>();
157+
LogicalPlanBuilder::from(plan.clone())
158+
.project(project_exprs)?
159+
.build()?
160+
} else {
161+
plan
162+
};
163+
164+
Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
165+
name: self.object_name_to_table_reference(name)?,
166+
primary_key,
167+
input: Arc::new(plan),
168+
if_not_exists,
169+
or_replace,
170+
}))
171+
}
176172

177-
None => {
178-
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
179-
let plan = EmptyRelation {
180-
produce_one_row: false,
181-
schema,
182-
};
183-
let plan = LogicalPlan::EmptyRelation(plan);
184-
185-
Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
186-
name: self.object_name_to_table_reference(name)?,
187-
input: Arc::new(plan),
188-
if_not_exists,
189-
or_replace,
190-
}))
191-
}
173+
None => {
174+
let primary_key = Self::primary_key_from_constraints(&constraints)?;
175+
176+
let schema = self.build_schema(columns)?.to_dfschema_ref()?;
177+
let plan = EmptyRelation {
178+
produce_one_row: false,
179+
schema,
180+
};
181+
let plan = LogicalPlan::EmptyRelation(plan);
182+
183+
Ok(LogicalPlan::CreateMemoryTable(CreateMemoryTable {
184+
name: self.object_name_to_table_reference(name)?,
185+
primary_key,
186+
input: Arc::new(plan),
187+
if_not_exists,
188+
or_replace,
189+
}))
192190
}
193-
}
191+
},
194192

195193
Statement::CreateView {
196194
or_replace,
@@ -1076,4 +1074,54 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
10761074
.get_table_provider(tables_reference)
10771075
.is_ok()
10781076
}
1077+
1078+
fn primary_key_from_constraints(
1079+
constraints: &[TableConstraint],
1080+
) -> Result<Vec<Column>> {
1081+
let pk: Result<Vec<&Vec<Ident>>> = constraints
1082+
.iter()
1083+
.map(|c: &TableConstraint| match c {
1084+
TableConstraint::Unique {
1085+
columns,
1086+
is_primary,
1087+
..
1088+
} => match is_primary {
1089+
true => Ok(columns),
1090+
false => Err(DataFusionError::Plan(
1091+
"Non-primary unique constraints are not supported".to_string(),
1092+
)),
1093+
},
1094+
TableConstraint::ForeignKey { .. } => Err(DataFusionError::Plan(
1095+
"Foreign key constraints are not currently supported".to_string(),
1096+
)),
1097+
TableConstraint::Check { .. } => Err(DataFusionError::Plan(
1098+
"Check constraints are not currently supported".to_string(),
1099+
)),
1100+
TableConstraint::Index { .. } => Err(DataFusionError::Plan(
1101+
"Indexes are not currently supported".to_string(),
1102+
)),
1103+
TableConstraint::FulltextOrSpatial { .. } => Err(DataFusionError::Plan(
1104+
"Indexes are not currently supported".to_string(),
1105+
)),
1106+
})
1107+
.collect();
1108+
let pk = pk?;
1109+
let pk = match pk.as_slice() {
1110+
[] => return Ok(vec![]),
1111+
[pk] => pk,
1112+
_ => {
1113+
return Err(DataFusionError::Plan(
1114+
"Only one primary key is supported!".to_string(),
1115+
))?
1116+
}
1117+
};
1118+
let primary_key: Vec<Column> = pk
1119+
.iter()
1120+
.map(|c| Column {
1121+
relation: None,
1122+
name: c.value.clone(),
1123+
})
1124+
.collect();
1125+
Ok(primary_key)
1126+
}
10791127
}

datafusion/sql/tests/integration_test.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,36 @@ fn cast_to_invalid_decimal_type() {
199199
}
200200
}
201201

202+
#[test]
203+
fn plan_create_table_with_pk() {
204+
let sql = "create table person (id int, name string, primary key(id))";
205+
let plan = r#"
206+
CreateMemoryTable: Bare { table: "person" } primary_key=[id]
207+
EmptyRelation
208+
"#
209+
.trim();
210+
quick_test(sql, plan);
211+
}
212+
213+
#[test]
214+
fn plan_create_table_no_pk() {
215+
let sql = "create table person (id int, name string)";
216+
let plan = r#"
217+
CreateMemoryTable: Bare { table: "person" }
218+
EmptyRelation
219+
"#
220+
.trim();
221+
quick_test(sql, plan);
222+
}
223+
224+
#[test]
225+
#[should_panic(expected = "Non-primary unique constraints are not supported")]
226+
fn plan_create_table_check_constraint() {
227+
let sql = "create table person (id int, name string, unique(id))";
228+
let plan = "";
229+
quick_test(sql, plan);
230+
}
231+
202232
#[test]
203233
fn plan_start_transaction() {
204234
let sql = "start transaction";

0 commit comments

Comments
 (0)