diff --git a/spark/src/main/resources/error/delta-error-classes.json b/spark/src/main/resources/error/delta-error-classes.json
index 2fc9ddf763..847cfbcd2b 100644
--- a/spark/src/main/resources/error/delta-error-classes.json
+++ b/spark/src/main/resources/error/delta-error-classes.json
@@ -2832,6 +2832,14 @@
     ],
     "sqlState" : "0AKDC"
   },
+  "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW" : {
+    "message" : [
+      "This table can't be read by this version of Delta because an unsupported type change was applied. Field <fieldPath> was changed from <fromType> to <toType>.",
+      "Please upgrade to Delta 4.0 or higher to read this table, or drop the Type Widening table feature using a client that supports reading this table:",
+      "  ALTER TABLE tableName DROP FEATURE <typeWideningFeatureName>"
+    ],
+    "sqlState" : "0AKDC"
+  },
   "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_SCHEMA" : {
     "message" : [
       "Unable to operate on this table because an unsupported type change was applied. Field <fieldName> was changed from <fromType> to <toType>."
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index 58d7105bae..80f8180538 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -667,6 +667,20 @@ trait DeltaErrorsBase
     )
   }
 
+  def unsupportedTypeChangeInPreview(
+      fieldPath: Seq[String],
+      fromType: DataType,
+      toType: DataType,
+      feature: TypeWideningTableFeatureBase): Throwable =
+    new DeltaUnsupportedOperationException(
+      errorClass = "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW",
+      messageParameters = Array(
+        SchemaUtils.prettyFieldName(fieldPath),
+        fromType.sql,
+        toType.sql,
+        feature.name
+      ))
+
   def unsupportedTypeChangeInSchema(
       fieldPath: Seq[String],
       fromType: DataType,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
index eed170c4f8..90081c559e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala
@@ -89,6 +89,14 @@ object TypeWidening {
     TypeWideningMetadata.getAllTypeChanges(metadata.schema).foreach {
       case (_, TypeChange(_, from: AtomicType, to: AtomicType, _))
         if isTypeChangeSupported(from, to) =>
+      case (fieldPath, TypeChange(_, from: AtomicType, to: AtomicType, _))
+        if stableFeatureCanReadTypeChange(from, to) =>
+        val featureName = if (protocol.isFeatureSupported(TypeWideningPreviewTableFeature)) {
+          TypeWideningPreviewTableFeature
+        } else {
+          TypeWideningTableFeature
+        }
+        throw DeltaErrors.unsupportedTypeChangeInPreview(fieldPath, from, to, featureName)
       case (fieldPath, invalidChange) =>
         throw DeltaErrors.unsupportedTypeChangeInSchema(
           fieldPath ++ invalidChange.fieldPath,
@@ -97,4 +105,25 @@
       )
     }
   }
+
+  /**
+   * Whether the given type change is supported in the stable version of the feature. Used to
+   * provide a helpful error message during the preview phase if upgrading to Delta 4.0 would allow
+   * reading the table.
+   */
+  private def stableFeatureCanReadTypeChange(fromType: AtomicType, toType: AtomicType): Boolean =
+    (fromType, toType) match {
+      case (from, to) if from == to => true
+      case (from: IntegralType, to: IntegralType) => from.defaultSize <= to.defaultSize
+      case (FloatType, DoubleType) => true
+      case (DateType, TimestampNTZType) => true
+      case (ByteType | ShortType | IntegerType, DoubleType) => true
+      case (from: DecimalType, to: DecimalType) => to.isWiderThan(from)
+      // Byte, Short, Integer are all stored as INT32 in parquet. The parquet readers support
+      // converting INT32 to Decimal(10, 0) and wider.
+      case (ByteType | ShortType | IntegerType, d: DecimalType) => d.isWiderThan(IntegerType)
+      // The parquet readers support converting INT64 to Decimal(20, 0) and wider.
+      case (LongType, d: DecimalType) => d.isWiderThan(LongType)
+      case _ => false
+    }
 }
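Aside, not part of the patch: the two parquet comments above rest on how Spark sizes decimals for integers. Below is a minimal standalone sketch of that rule. Since `DecimalType.isWiderThan` is `private[sql]`, the check is re-derived by hand here; `DecimalWideningSketch` and `decimalIsWiderThan` are illustrative names, not Delta or Spark APIs.

import org.apache.spark.sql.types.DecimalType

object DecimalWideningSketch {
  // Decimal(p, s) can hold every value of Decimal(p2, s2) without loss iff it has
  // at least as many integral digits (p - s >= p2 - s2) and at least as many
  // fractional digits (s >= s2). This mirrors what DecimalType.isWiderThan checks.
  def decimalIsWiderThan(wide: DecimalType, narrow: DecimalType): Boolean =
    (wide.precision - wide.scale) >= (narrow.precision - narrow.scale) &&
      wide.scale >= narrow.scale

  def main(args: Array[String]): Unit = {
    // Int.MaxValue = 2147483647 has 10 digits, so INT32 is modeled as Decimal(10, 0):
    // Decimal(10, 0) and wider accept any int, Decimal(9, 0) does not.
    val int32 = DecimalType(10, 0)
    assert(decimalIsWiderThan(DecimalType(10, 0), int32))
    assert(!decimalIsWiderThan(DecimalType(9, 0), int32))
    // Scale consumes integral digits: Decimal(11, 2) leaves only 9 of them.
    assert(!decimalIsWiderThan(DecimalType(11, 2), int32))
    assert(decimalIsWiderThan(DecimalType(12, 2), int32))
    // INT64 is modeled as Decimal(20, 0), hence "Decimal(20, 0) and wider" above.
    val int64 = DecimalType(20, 0)
    assert(decimalIsWiderThan(DecimalType(20, 0), int64))
    assert(!decimalIsWiderThan(DecimalType(19, 0), int64))
  }
}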
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
index c94b4a884d..bdda79c1ed 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
@@ -1030,6 +1030,24 @@ trait DeltaErrorsSuiteBase
         |""".stripMargin
       ))
     }
+    {
+      checkError(
+        exception = intercept[DeltaUnsupportedOperationException] {
+          throw DeltaErrors.unsupportedTypeChangeInPreview(
+            fieldPath = Seq("origin", "country"),
+            fromType = IntegerType,
+            toType = LongType,
+            feature = TypeWideningPreviewTableFeature
+          )
+        },
+        "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW",
+        parameters = Map(
+          "fieldPath" -> "origin.country",
+          "fromType" -> "INT",
+          "toType" -> "BIGINT",
+          "typeWideningFeatureName" -> "typeWidening-preview"
+        ))
+    }
     {
       val e = intercept[DeltaIllegalStateException] {
         throw DeltaErrors.unsupportedTypeChangeInSchema(Seq("s", "a"), IntegerType, StringType)
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala
index 64805d2602..294dcd3733 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/typewidening/TypeWideningTableFeatureSuite.scala
@@ -45,7 +45,10 @@ class TypeWideningTableFeatureSuite
     with TypeWideningDropFeatureTestMixin
     with TypeWideningTableFeatureTests
 
-trait TypeWideningTableFeatureTests extends RowTrackingTestUtils with TypeWideningTestCases {
+trait TypeWideningTableFeatureTests
+    extends RowTrackingTestUtils
+    with DeltaExcludedBySparkVersionTestMixinShims
+    with TypeWideningTestCases {
   self: QueryTest
     with TypeWideningTestMixin
     with TypeWideningDropFeatureTestMixin =>
@@ -454,6 +457,43 @@ trait TypeWideningTableFeatureTests extends RowTrackingTestUtils with TypeWideni
     )
   }
 
+  testSparkLatestOnly(
+    "helpful error when reading type changes not supported yet during preview") {
+    sql(s"CREATE TABLE delta.`$tempDir` (a int) USING DELTA")
+    val metadata = new MetadataBuilder()
+      .putMetadataArray("delta.typeChanges", Array(
+        new MetadataBuilder()
+          .putString("toType", "long")
+          .putString("fromType", "int")
+          .putLong("tableVersion", 1)
+          .build()
+      )).build()
+
+    // Delta 3.2/3.3 doesn't support changing type from int -> long; we manually commit that
+    // type change to simulate what Delta 4.0 could do.
+    deltaLog.withNewTransaction { txn =>
+      txn.commit(
+        Seq(txn.snapshot.metadata.copy(
+          schemaString = new StructType()
+            .add("a", LongType, nullable = true, metadata).json
+        )),
+        ManualUpdate)
+    }
+
+    checkError(
+      exception = intercept[DeltaUnsupportedOperationException] {
+        readDeltaTable(tempPath).collect()
+      },
+      "DELTA_UNSUPPORTED_TYPE_CHANGE_IN_PREVIEW",
+      parameters = Map(
+        "fieldPath" -> "a",
+        "fromType" -> "INT",
+        "toType" -> "BIGINT",
+        "typeWideningFeatureName" -> "typeWidening-preview"
+      )
+    )
+  }
+
   test("type widening rewrite metrics") {
     sql(s"CREATE TABLE delta.`$tempDir` (a byte) USING DELTA")
     addSingleFile(Seq(1, 2, 3), ByteType)
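Aside, not part of the patch: to see the full text a Delta 3.2/3.3 reader would surface, one can substitute the parameters asserted in the suites above into the new message template. A minimal rendering sketch follows; the template is copied from the delta-error-classes.json entry above, `PreviewErrorMessageSketch` is a hypothetical name, and the substitution merely mimics the named `<placeholder>` replacement the error framework performs.

object PreviewErrorMessageSketch {
  // Message template as added to delta-error-classes.json in this change.
  private val template =
    "This table can't be read by this version of Delta because an unsupported type change was applied. Field <fieldPath> was changed from <fromType> to <toType>.\n" +
    "Please upgrade to Delta 4.0 or higher to read this table, or drop the Type Widening table feature using a client that supports reading this table:\n" +
    "  ALTER TABLE tableName DROP FEATURE <typeWideningFeatureName>"

  def main(args: Array[String]): Unit = {
    // Parameters taken from the DeltaErrorsSuite assertion above.
    val params = Map(
      "fieldPath" -> "origin.country",
      "fromType" -> "INT",
      "toType" -> "BIGINT",
      "typeWideningFeatureName" -> "typeWidening-preview")
    // Replace each <name> placeholder with its value.
    val rendered = params.foldLeft(template) { case (msg, (name, value)) =>
      msg.replace(s"<$name>", value)
    }
    println(rendered)
    // Ends with: "  ALTER TABLE tableName DROP FEATURE typeWidening-preview"
  }
}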