Nicer error when failing to read table due to type change not supported by the preview of type widening
johanl-db committed Sep 26, 2024
1 parent 6871379 commit 9d250cc
Showing 5 changed files with 110 additions and 1 deletion.
8 changes: 8 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
@@ -2832,6 +2832,14 @@
      ],
      "sqlState" : "0AKDC"
    },
    "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW": {
      "message": [
        "This table can't be read by this version of Delta because an unsupported type change was applied. Field <fieldPath> was changed from <fromType> to <toType>.",
        "Please upgrade to Delta 4.0 or higher to read this table, or drop the Type Widening table feature using a client that supports reading this table:",
        " ALTER TABLE tableName DROP FEATURE <typeWideningFeatureName>"
      ],
      "sqlState": "0AKDC"
    },
    "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA" : {
      "message" : [
        "Unable to operate on this table because an unsupported type change was applied. Field <fieldName> was changed from <fromType> to <toType>."
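To make the new message concrete, here is a rough rendering with hypothetical values substituted for the placeholders (field `a` widened from INT to BIGINT under the preview feature), followed by the remediation it points at. This is a hand-written sketch, not captured output; the table name and the string-literal quoting of the feature name are assumptions.

```scala
// Hypothetical rendering of DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW with illustrative
// values (field `a`, INT -> BIGINT, preview feature); not captured from a real run.
val renderedMessage =
  """This table can't be read by this version of Delta because an unsupported type change was applied. Field a was changed from INT to BIGINT.
    |Please upgrade to Delta 4.0 or higher to read this table, or drop the Type Widening table feature using a client that supports reading this table:
    | ALTER TABLE tableName DROP FEATURE typeWidening-preview""".stripMargin

// The remediation the message recommends, run from a client that can still read the table.
// `my_table` is a placeholder, and quoting the feature name as a string literal is an
// assumption about the SQL syntax, so the command is left commented out.
// spark.sql("ALTER TABLE my_table DROP FEATURE 'typeWidening-preview'")
```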
14 changes: 14 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -667,6 +667,20 @@ trait DeltaErrorsBase
    )
  }

  def unsupportedTypeChangeInPreview(
      fieldPath: Seq[String],
      fromType: DataType,
      toType: DataType,
      feature: TypeWideningTableFeatureBase): Throwable =
    new DeltaUnsupportedOperationException(
      errorClass = "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW",
      messageParameters = Array(
        SchemaUtils.prettyFieldName(fieldPath),
        fromType.sql,
        toType.sql,
        feature.name
      ))

  def unsupportedTypeChangeInSchema(
      fieldPath: Seq[String],
      fromType: DataType,
29 changes: 29 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -89,6 +89,14 @@ object TypeWidening {
    TypeWideningMetadata.getAllTypeChanges(metadata.schema).foreach {
      case (_, TypeChange(_, from: AtomicType, to: AtomicType, _))
        if isTypeChangeSupported(from, to) =>
      case (fieldPath, TypeChange(_, from: AtomicType, to: AtomicType, _))
        if stableFeatureCanReadTypeChange(from, to) =>
        val featureName = if (protocol.isFeatureSupported(TypeWideningPreviewTableFeature)) {
          TypeWideningPreviewTableFeature
        } else {
          TypeWideningTableFeature
        }
        throw DeltaErrors.unsupportedTypeChangeInPreview(fieldPath, from, to, featureName)
      case (fieldPath, invalidChange) =>
        throw DeltaErrors.unsupportedTypeChangeInSchema(
          fieldPath ++ invalidChange.fieldPath,
@@ -97,4 +105,25 @@
        )
    }
  }

  /**
   * Whether the given type change is supported in the stable version of the feature. Used to
   * provide a helpful error message during the preview phase if upgrading to Delta 4.0 would allow
   * reading the table.
   */
  private def stableFeatureCanReadTypeChange(fromType: AtomicType, toType: AtomicType): Boolean =
    (fromType, toType) match {
      case (from, to) if from == to => true
      case (from: IntegralType, to: IntegralType) => from.defaultSize <= to.defaultSize
      case (FloatType, DoubleType) => true
      case (DateType, TimestampNTZType) => true
      case (ByteType | ShortType | IntegerType, DoubleType) => true
      case (from: DecimalType, to: DecimalType) => to.isWiderThan(from)
      // Byte, Short, Integer are all stored as INT32 in parquet. The parquet readers support
      // converting INT32 to Decimal(10, 0) and wider.
      case (ByteType | ShortType | IntegerType, d: DecimalType) => d.isWiderThan(IntegerType)
      // The parquet readers support converting INT64 to Decimal(20, 0) and wider.
      case (LongType, d: DecimalType) => d.isWiderThan(LongType)
      case _ => false
    }
}
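To make the decision table above easier to scan, here is a small self-contained sketch (not part of the commit) listing representative type changes and whether the check above would treat them as readable by the stable feature. The object name `StableTypeChangeExamples` and the expected booleans are written by hand to mirror the cases in `stableFeatureCanReadTypeChange`; they are illustrative, not generated from the Delta code.

```scala
import org.apache.spark.sql.types._

// Hand-written sketch mirroring the cases in stableFeatureCanReadTypeChange above.
// It does not call into Delta; the expected booleans are filled in manually.
object StableTypeChangeExamples {
  // (fromType, toType, readable by the stable type widening feature)
  val examples: Seq[(DataType, DataType, Boolean)] = Seq(
    (ByteType, IntegerType, true),           // integral widening: INT8 -> INT32
    (IntegerType, LongType, true),           // integral widening: INT32 -> INT64
    (FloatType, DoubleType, true),
    (DateType, TimestampNTZType, true),
    (IntegerType, DoubleType, true),
    (IntegerType, DecimalType(10, 0), true), // Decimal(10, 0) can hold every INT32 value
    (IntegerType, DecimalType(9, 0), false), // too narrow for INT32
    (LongType, DecimalType(20, 0), true),    // Decimal(20, 0) can hold every INT64 value
    (LongType, IntegerType, false),          // narrowing is never supported
    (IntegerType, StringType, false)         // not a widening change at all
  )

  def main(args: Array[String]): Unit =
    examples.foreach { case (from, to, ok) =>
      println(f"${from.sql}%-12s -> ${to.sql}%-16s stable-readable: $ok")
    }
}
```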
@@ -1030,6 +1030,24 @@ trait DeltaErrorsSuiteBase
           |""".stripMargin
        ))
    }
    {
      checkError(
        exception = intercept[DeltaUnsupportedOperationException] {
          throw DeltaErrors.unsupportedTypeChangeInPreview(
            fieldPath = Seq("origin", "country"),
            fromType = IntegerType,
            toType = LongType,
            feature = TypeWideningPreviewTableFeature
          )
        },
        "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW",
        parameters = Map(
          "fieldPath" -> "origin.country",
          "fromType" -> "INT",
          "toType" -> "BIGINT",
          "typeWideningFeatureName" -> "typeWidening-preview"
        ))
    }
    {
      val e = intercept[DeltaIllegalStateException] {
        throw DeltaErrors.unsupportedTypeChangeInSchema(Seq("s", "a"), IntegerType, StringType)
@@ -45,7 +45,10 @@ class TypeWideningTableFeatureSuite
    with TypeWideningDropFeatureTestMixin
    with TypeWideningTableFeatureTests

trait TypeWideningTableFeatureTests extends RowTrackingTestUtils with TypeWideningTestCases {
trait TypeWideningTableFeatureTests
  extends RowTrackingTestUtils
    with DeltaExcludedBySparkVersionTestMixinShims
    with TypeWideningTestCases {
  self: QueryTest
    with TypeWideningTestMixin
    with TypeWideningDropFeatureTestMixin =>
@@ -454,6 +457,43 @@ trait TypeWideningTableFeatureTests extends RowTrackingTestUtils with TypeWideni
    )
  }

  testSparkLatestOnly(
    "helpful error when reading type changes not supported yet during preview") {
    sql(s"CREATE TABLE delta.`$tempDir` (a int) USING DELTA")
    val metadata = new MetadataBuilder()
      .putMetadataArray("delta.typeChanges", Array(
        new MetadataBuilder()
          .putString("toType", "long")
          .putString("fromType", "int")
          .putLong("tableVersion", 1)
          .build()
      )).build()

    // Delta 3.2/3.3 doesn't support changing the type from int to long, so we manually commit
    // that type change to simulate what Delta 4.0 could do.
    deltaLog.withNewTransaction { txn =>
      txn.commit(
        Seq(txn.snapshot.metadata.copy(
          schemaString = new StructType()
            .add("a", LongType, nullable = true, metadata).json
        )),
        ManualUpdate)
    }

    checkError(
      exception = intercept[DeltaUnsupportedOperationException] {
        readDeltaTable(tempPath).collect()
      },
      "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW",
      parameters = Map(
        "fieldPath" -> "a",
        "fromType" -> "INT",
        "toType" -> "BIGINT",
        "typeWideningFeatureName" -> "typeWidening-preview"
      )
    )
  }
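As a side note on what the manual commit above actually writes, column `a` ends up carrying field metadata of roughly the following shape. This is inferred from the MetadataBuilder calls in the test, not copied from a real table, and key order may differ.

```scala
// Rough JSON shape of the field metadata the test attaches to column `a`
// (inferred from the MetadataBuilder calls above; key order may differ).
val typeChangesFieldMetadata =
  """{"delta.typeChanges":[{"toType":"long","fromType":"int","tableVersion":1}]}"""
```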

  test("type widening rewrite metrics") {
    sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA")
    addSingleFile(Seq(1, 2, 3), ByteType)
