Skip to content

Commit 507b745

Browse files
rdblue authored and cloud-fan committed
[SPARK-28139][SQL] Add v2 ALTER TABLE implementation.
## What changes were proposed in this pull request?

Implement `ALTER TABLE` for v2 tables:

* Add `AlterTable` logical plan and `AlterTableExec` physical plan
* Convert `ALTER TABLE` parsed plans to `AlterTable` when a v2 catalog is responsible for an identifier
* Validate that columns to alter exist in analyzer checks
* Fix nested type handling in `CatalogV2Util`

## How was this patch tested?

* Add extensive tests in `DataSourceV2SQLSuite`

Closes apache#24937 from rdblue/SPARK-28139-add-v2-alter-table.

Lead-authored-by: Ryan Blue <[email protected]>
Co-authored-by: Ryan Blue <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 9eca58e commit 507b745

File tree

11 files changed

+1158
-33
lines changed

11 files changed

+1158
-33
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java

+14-5
Original file line number | Diff line number | Diff line change
@@ -227,14 +227,18 @@ public String property() {
227227
}
228228
}
229229

230+
interface ColumnChange extends TableChange {
231+
String[] fieldNames();
232+
}
233+
230234
/**
231235
* A TableChange to add a field.
232236
* <p>
233237
* If the field already exists, the change must result in an {@link IllegalArgumentException}.
234238
* If the new field is nested and its parent does not exist or is not a struct, the change must
235239
* result in an {@link IllegalArgumentException}.
236240
*/
237-
final class AddColumn implements TableChange {
241+
final class AddColumn implements ColumnChange {
238242
private final String[] fieldNames;
239243
private final DataType dataType;
240244
private final boolean isNullable;
@@ -247,6 +251,7 @@ private AddColumn(String[] fieldNames, DataType dataType, boolean isNullable, St
247251
this.comment = comment;
248252
}
249253

254+
@Override
250255
public String[] fieldNames() {
251256
return fieldNames;
252257
}
@@ -272,7 +277,7 @@ public String comment() {
272277
* <p>
273278
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
274279
*/
275-
final class RenameColumn implements TableChange {
280+
final class RenameColumn implements ColumnChange {
276281
private final String[] fieldNames;
277282
private final String newName;
278283

@@ -281,6 +286,7 @@ private RenameColumn(String[] fieldNames, String newName) {
281286
this.newName = newName;
282287
}
283288

289+
@Override
284290
public String[] fieldNames() {
285291
return fieldNames;
286292
}
@@ -297,7 +303,7 @@ public String newName() {
297303
* <p>
298304
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
299305
*/
300-
final class UpdateColumnType implements TableChange {
306+
final class UpdateColumnType implements ColumnChange {
301307
private final String[] fieldNames;
302308
private final DataType newDataType;
303309
private final boolean isNullable;
@@ -308,6 +314,7 @@ private UpdateColumnType(String[] fieldNames, DataType newDataType, boolean isNu
308314
this.isNullable = isNullable;
309315
}
310316

317+
@Override
311318
public String[] fieldNames() {
312319
return fieldNames;
313320
}
@@ -328,7 +335,7 @@ public boolean isNullable() {
328335
* <p>
329336
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
330337
*/
331-
final class UpdateColumnComment implements TableChange {
338+
final class UpdateColumnComment implements ColumnChange {
332339
private final String[] fieldNames;
333340
private final String newComment;
334341

@@ -337,6 +344,7 @@ private UpdateColumnComment(String[] fieldNames, String newComment) {
337344
this.newComment = newComment;
338345
}
339346

347+
@Override
340348
public String[] fieldNames() {
341349
return fieldNames;
342350
}
@@ -351,13 +359,14 @@ public String newComment() {
351359
* <p>
352360
* If the field does not exist, the change must result in an {@link IllegalArgumentException}.
353361
*/
354-
final class DeleteColumn implements TableChange {
362+
final class DeleteColumn implements ColumnChange {
355363
private final String[] fieldNames;
356364

357365
private DeleteColumn(String[] fieldNames) {
358366
this.fieldNames = fieldNames;
359367
}
360368

369+
@Override
361370
public String[] fieldNames() {
362371
return fieldNames;
363372
}

sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/utils/CatalogV2Util.scala

+40-11
Original file line number | Diff line number | Diff line change
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier, TableChange}
2626
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RemoveProperty, RenameColumn, SetProperty, UpdateColumnComment, UpdateColumnType}
2727
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
2828
import org.apache.spark.sql.sources.v2.Table
29-
import org.apache.spark.sql.types.{StructField, StructType}
29+
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
3030

3131
object CatalogV2Util {
3232
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
@@ -132,16 +132,45 @@ object CatalogV2Util {
132132
val pos = struct.getFieldIndex(fieldNames.head)
133133
.getOrElse(throw new IllegalArgumentException(s"Cannot find field: ${fieldNames.head}"))
134134
val field = struct.fields(pos)
135-
val replacement: Option[StructField] = if (fieldNames.tail.isEmpty) {
136-
update(field)
137-
} else {
138-
field.dataType match {
139-
case nestedStruct: StructType =>
140-
val updatedType: StructType = replace(nestedStruct, fieldNames.tail, update)
141-
Some(StructField(field.name, updatedType, field.nullable, field.metadata))
142-
case _ =>
143-
throw new IllegalArgumentException(s"Not a struct: ${fieldNames.head}")
144-
}
135+
val replacement: Option[StructField] = (fieldNames.tail, field.dataType) match {
136+
case (Seq(), _) =>
137+
update(field)
138+
139+
case (names, struct: StructType) =>
140+
val updatedType: StructType = replace(struct, names, update)
141+
Some(StructField(field.name, updatedType, field.nullable, field.metadata))
142+
143+
case (Seq("key"), map @ MapType(keyType, _, _)) =>
144+
val updated = update(StructField("key", keyType, nullable = false))
145+
.getOrElse(throw new IllegalArgumentException(s"Cannot delete map key"))
146+
Some(field.copy(dataType = map.copy(keyType = updated.dataType)))
147+
148+
case (Seq("key", names @ _*), map @ MapType(keyStruct: StructType, _, _)) =>
149+
Some(field.copy(dataType = map.copy(keyType = replace(keyStruct, names, update))))
150+
151+
case (Seq("value"), map @ MapType(_, mapValueType, isNullable)) =>
152+
val updated = update(StructField("value", mapValueType, nullable = isNullable))
153+
.getOrElse(throw new IllegalArgumentException(s"Cannot delete map value"))
154+
Some(field.copy(dataType = map.copy(
155+
valueType = updated.dataType,
156+
valueContainsNull = updated.nullable)))
157+
158+
case (Seq("value", names @ _*), map @ MapType(_, valueStruct: StructType, _)) =>
159+
Some(field.copy(dataType = map.copy(valueType = replace(valueStruct, names, update))))
160+
161+
case (Seq("element"), array @ ArrayType(elementType, isNullable)) =>
162+
val updated = update(StructField("element", elementType, nullable = isNullable))
163+
.getOrElse(throw new IllegalArgumentException(s"Cannot delete array element"))
164+
Some(field.copy(dataType = array.copy(
165+
elementType = updated.dataType,
166+
containsNull = updated.nullable)))
167+
168+
case (Seq("element", names @ _*), array @ ArrayType(elementStruct: StructType, _)) =>
169+
Some(field.copy(dataType = array.copy(elementType = replace(elementStruct, names, update))))
170+
171+
case (names, dataType) =>
172+
throw new IllegalArgumentException(
173+
s"Cannot find field: ${names.head} in ${dataType.simpleString}")
145174
}
146175

147176
val newFields = struct.fields.zipWithIndex.flatMap {

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

+83-1
Original file line number | Diff line number | Diff line change
@@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer
2424
import scala.util.Random
2525

2626
import org.apache.spark.sql.AnalysisException
27-
import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog}
27+
import org.apache.spark.sql.catalog.v2.{CatalogNotFoundException, CatalogPlugin, LookupCatalog, TableChange}
2828
import org.apache.spark.sql.catalyst._
2929
import org.apache.spark.sql.catalyst.catalog._
3030
import org.apache.spark.sql.catalyst.encoders.OuterScopes
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
3434
import org.apache.spark.sql.catalyst.expressions.objects._
3535
import org.apache.spark.sql.catalyst.plans._
3636
import org.apache.spark.sql.catalyst.plans.logical._
37+
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement}
3738
import org.apache.spark.sql.catalyst.rules._
3839
import org.apache.spark.sql.catalyst.trees.TreeNodeRef
3940
import org.apache.spark.sql.catalyst.util.toPrettySQL
@@ -165,6 +166,7 @@ class Analyzer(
165166
new SubstituteUnresolvedOrdinals(conf)),
166167
Batch("Resolution", fixedPoint,
167168
ResolveTableValuedFunctions ::
169+
ResolveAlterTable ::
168170
ResolveTables ::
169171
ResolveRelations ::
170172
ResolveReferences ::
@@ -787,6 +789,86 @@ class Analyzer(
787789
}
788790
}
789791

792+
/**
 * Rewrites parsed ALTER TABLE statements into v2 [[AlterTable]] plans.
 *
 * A statement is converted only when a v2 catalog is responsible for its table identifier:
 * either the identifier names a catalog explicitly (for example prod_catalog.db.table), or a
 * default v2 catalog is set and the identifier carries no catalog. Statements that do not match
 * are left untouched by this rule.
 *
 * Every produced [[AlterTable]] keeps an [[UnresolvedRelation]] for the original multi-part
 * name as its table, alongside the list of [[TableChange]]s derived from the statement.
 */
object ResolveAlterTable extends Rule[LogicalPlan] {
  import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._

  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    // ADD COLUMNS: one add-column change per column; `true` is passed as the nullability flag
    // and a missing comment is handed to the Java API as null.
    case stmt @ AlterTableAddColumnsStatement(
        CatalogObjectIdentifier(Some(plugin), tableIdent), newColumns) =>
      val additions = for (column <- newColumns) yield {
        TableChange.addColumn(
          column.name.toArray, column.dataType, true, column.comment.orNull)
      }
      AlterTable(
        plugin.asTableCatalog,
        tableIdent,
        UnresolvedRelation(stmt.tableName),
        additions)

    // ALTER COLUMN: the type and the comment are independent, optional changes; the type
    // change (if any) is emitted before the comment change.
    case stmt @ AlterTableAlterColumnStatement(
        CatalogObjectIdentifier(Some(plugin), tableIdent), column, newType, newComment) =>
      val typeUpdate = newType.map { requestedType =>
        TableChange.updateColumnType(column.toArray, requestedType, true)
      }
      val commentUpdate = newComment.map { text =>
        TableChange.updateColumnComment(column.toArray, text)
      }
      AlterTable(
        plugin.asTableCatalog,
        tableIdent,
        UnresolvedRelation(stmt.tableName),
        Seq(typeUpdate, commentUpdate).flatten)

    // RENAME COLUMN: a single rename change.
    case stmt @ AlterTableRenameColumnStatement(
        CatalogObjectIdentifier(Some(plugin), tableIdent), column, renamedTo) =>
      val rename = TableChange.renameColumn(column.toArray, renamedTo)
      AlterTable(
        plugin.asTableCatalog,
        tableIdent,
        UnresolvedRelation(stmt.tableName),
        Seq(rename))

    // DROP COLUMNS: one delete change per column.
    case stmt @ AlterTableDropColumnsStatement(
        CatalogObjectIdentifier(Some(plugin), tableIdent), droppedColumns) =>
      val deletions = for (column <- droppedColumns) yield {
        TableChange.deleteColumn(column.toArray)
      }
      AlterTable(
        plugin.asTableCatalog,
        tableIdent,
        UnresolvedRelation(stmt.tableName),
        deletions)

    // SET TBLPROPERTIES: one set-property change per key/value pair.
    case stmt @ AlterTableSetPropertiesStatement(
        CatalogObjectIdentifier(Some(plugin), tableIdent), properties) =>
      val assignments = properties.toSeq.map {
        case (name, setting) => TableChange.setProperty(name, setting)
      }
      AlterTable(
        plugin.asTableCatalog,
        tableIdent,
        UnresolvedRelation(stmt.tableName),
        assignments)

    // UNSET TBLPROPERTIES: one remove-property change per key; the statement's third field is
    // not used by this conversion.
    case stmt @ AlterTableUnsetPropertiesStatement(
        CatalogObjectIdentifier(Some(plugin), tableIdent), propertyKeys, _) =>
      val removals = for (name <- propertyKeys) yield TableChange.removeProperty(name)
      AlterTable(
        plugin.asTableCatalog,
        tableIdent,
        UnresolvedRelation(stmt.tableName),
        removals)

    // SET LOCATION: expressed as setting the "location" table property.
    case stmt @ AlterTableSetLocationStatement(
        CatalogObjectIdentifier(Some(plugin), tableIdent), location) =>
      val relocate = TableChange.setProperty("location", location)
      AlterTable(
        plugin.asTableCatalog,
        tableIdent,
        UnresolvedRelation(stmt.tableName),
        Seq(relocate))
  }
}
871+
790872
/**
791873
* Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from
792874
* a logical plan node's children.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala

+54
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
1919

2020
import org.apache.spark.api.python.PythonEvalType
2121
import org.apache.spark.sql.AnalysisException
22+
import org.apache.spark.sql.catalog.v2.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnType}
2223
import org.apache.spark.sql.catalyst.expressions._
2324
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
2425
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
@@ -353,6 +354,59 @@ trait CheckAnalysis extends PredicateHelper {
353354
case _ =>
354355
}
355356

357+
case alter: AlterTable if alter.childrenResolved =>
  // Validate every requested column change against the schema of the resolved table, so that
  // invalid changes fail analysis with a clear message instead of failing later in a catalog.
  val table = alter.table

  // Looks up a (possibly nested) field by its name path and fails analysis when it does not
  // exist. `operation` is only used to word the error message.
  def findField(operation: String, fieldName: Array[String]): StructField = {
    // include collections because structs nested in maps and arrays may be altered
    table.schema.findNestedField(fieldName, includeCollections = true).getOrElse {
      throw new AnalysisException(
        s"Cannot $operation missing field in ${table.name} schema: ${fieldName.quoted}")
    }
  }

  alter.changes.foreach {
    case add: AddColumn =>
      // Only the parent of the new field must already exist; top-level adds need no lookup.
      val parent = add.fieldNames.init
      if (parent.nonEmpty) {
        findField("add to", parent)
      }

    case update: UpdateColumnType =>
      val field = findField("update", update.fieldNames)
      val fieldName = update.fieldNames.quoted
      update.newDataType match {
        case _: StructType =>
          throw new AnalysisException(
            s"Cannot update ${table.name} field $fieldName type: " +
                s"update a struct by adding, deleting, or updating its fields")
        case _: MapType =>
          throw new AnalysisException(
            s"Cannot update ${table.name} field $fieldName type: " +
                s"update a map by updating $fieldName.key or $fieldName.value")
        case _: ArrayType =>
          throw new AnalysisException(
            s"Cannot update ${table.name} field $fieldName type: " +
                s"update the element by updating $fieldName.element")
        case _ =>
          // Any other target type is accepted here and left to the up-cast check below.
          // Matching only AtomicType made this match non-exhaustive, so a DataType that is
          // neither a collection nor atomic raised scala.MatchError instead of an
          // AnalysisException. NOTE(review): confirm which concrete types fall in this
          // bucket (e.g. null, interval, or user-defined types) and whether any of them
          // deserve a dedicated error message.
      }
      if (!Cast.canUpCast(field.dataType, update.newDataType)) {
        throw new AnalysisException(
          s"Cannot update ${table.name} field $fieldName: " +
              s"${field.dataType.simpleString} cannot be cast to " +
              s"${update.newDataType.simpleString}")
      }

    case rename: RenameColumn =>
      findField("rename", rename.fieldNames)

    case update: UpdateColumnComment =>
      findField("update", update.fieldNames)

    case delete: DeleteColumn =>
      findField("delete", delete.fieldNames)

    case _ =>
    // no validation needed for set and remove property
  }
409+
356410
case _ => // Fallbacks to the following checks
357411
}
358412

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala

+4-1
Original file line number | Diff line number | Diff line change
@@ -40,12 +40,15 @@ class UnresolvedException[TreeType <: TreeNode[_]](tree: TreeType, function: Str
4040
*
4141
* @param multipartIdentifier table name
4242
*/
43-
case class UnresolvedRelation(multipartIdentifier: Seq[String]) extends LeafNode {
43+
case class UnresolvedRelation(
44+
multipartIdentifier: Seq[String]) extends LeafNode with NamedRelation {
4445
import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
4546

4647
/** Returns a `.` separated name for this relation. */
4748
def tableName: String = multipartIdentifier.quoted
4849

50+
override def name: String = tableName
51+
4952
override def output: Seq[Attribute] = Nil
5053

5154
override lazy val resolved = false

0 commit comments

Comments
 (0)