diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala index a8d1def58c..4f441ec50d 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/ResolveDeltaMergeInto.scala @@ -234,7 +234,7 @@ object ResolveDeltaMergeInto { // schema before merging it with the target schema. We don't consider NOT MATCHED BY SOURCE // clauses since these can't by definition reference source columns and thus can't introduce // new columns in the target schema. - val actions = (matchedClauses ++ notMatchedClauses).flatMap(_.actions) + val actions = (resolvedMatchedClauses ++ resolvedNotMatchedClauses).flatMap(_.actions) val assignments = actions.collect { case a: DeltaMergeAction => a.targetColNameParts } val containsStarAction = actions.exists { case _: UnresolvedStar => true @@ -278,6 +278,12 @@ object ResolveDeltaMergeInto { }) val migrationSchema = filterSchema(source.schema, Seq.empty) + val allowTypeWidening = target.exists { + case DeltaTable(fileIndex) => + TypeWidening.isEnabled(fileIndex.protocol, fileIndex.metadata) + case _ => false + } + // The implicit conversions flag allows any type to be merged from source to target if Spark // SQL considers the source type implicitly castable to the target. Normally, mergeSchemas // enforces Parquet-level write compatibility, which would mean an INT source can't be merged @@ -285,7 +291,9 @@ object ResolveDeltaMergeInto { SchemaMergingUtils.mergeSchemas( target.schema, migrationSchema, - allowImplicitConversions = true) + allowImplicitConversions = true, + allowTypeWidening = allowTypeWidening + ) } else { target.schema } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala index 2efbdf11e4..7a56323264 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/TypeWidening.scala @@ -47,6 +47,20 @@ object TypeWidening { isEnabled } + /** + * Checks that the type widening table property wasn't disabled or enabled between the two given + * states, throws an errors if it was. + */ + def ensureFeatureConsistentlyEnabled( + protocol: Protocol, + metadata: Metadata, + otherProtocol: Protocol, + otherMetadata: Metadata): Unit = { + if (isEnabled(protocol, metadata) != isEnabled(otherProtocol, otherMetadata)) { + throw DeltaErrors.metadataChangedException(None) + } + } + /** * Returns whether the given type change is eligible for widening. This only checks atomic types. * It is the responsibility of the caller to recurse into structs, maps and arrays. @@ -62,6 +76,19 @@ object TypeWidening { case _ => false } + /** + * Returns whether the given type change can be applied during schema evolution. Only a + * subset of supported type changes are considered for schema evolution. + */ + def isTypeChangeSupportedForSchemaEvolution(fromType: AtomicType, toType: AtomicType): Boolean = + (fromType, toType) match { + case (from, to) if from == to => true + case (from, to) if !isTypeChangeSupported(from, to) => false + case (ByteType, ShortType) => true + case (ByteType | ShortType, IntegerType) => true + case _ => false + } + /** * Filter the given list of files to only keep files that were written before the latest type * change, if any. 
These older files contain a column or field with a type that is different than diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala index d2b4b1b283..2723f8b013 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/MergeIntoCommand.scala @@ -91,6 +91,15 @@ case class MergeIntoCommand( atAnalysis = target.schema, latestSchema = deltaTxn.metadata.schema) } + // Check that type widening wasn't enabled/disabled between analysis and the start of the + // transaction. + TypeWidening.ensureFeatureConsistentlyEnabled( + protocol = targetFileIndex.protocol, + metadata = targetFileIndex.metadata, + otherProtocol = deltaTxn.protocol, + otherMetadata = deltaTxn.metadata + ) + if (canMergeSchema) { updateMetadata( spark, deltaTxn, migratedSchema.getOrElse(target.schema), diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala index 9b108f9715..20320440e1 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/ImplicitMetadataOperation.scala @@ -122,7 +122,14 @@ trait ImplicitMetadataOperation extends DeltaLogging { if (rearrangeOnly) { throw DeltaErrors.unexpectedDataChangeException("Change the Delta table schema") } - txn.updateMetadata(txn.metadata.copy(schemaString = mergedSchema.json + + val schemaWithTypeWideningMetadata = TypeWideningMetadata.addTypeWideningMetadata( + txn, + schema = mergedSchema, + oldSchema = txn.metadata.schema + ) + + txn.updateMetadata(txn.metadata.copy(schemaString = schemaWithTypeWideningMetadata.json )) } else if (isNewSchema || isNewPartitioning ) { @@ -201,7 +208,8 @@ object ImplicitMetadataOperation { SchemaMergingUtils.mergeSchemas( txn.metadata.schema, dataSchema, - fixedTypeColumns = fixedTypeColumns) + fixedTypeColumns = fixedTypeColumns, + allowTypeWidening = TypeWidening.isEnabled(txn.protocol, txn.metadata)) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala index e57f181e73..a34a351d45 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/schema/SchemaMergingUtils.scala @@ -20,14 +20,13 @@ import java.util.Locale import scala.util.control.NonFatal -import org.apache.spark.sql.delta.DeltaAnalysisException +import org.apache.spark.sql.delta.{DeltaAnalysisException, TypeWidening} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions.Literal -import org.apache.spark.sql.catalyst.plans.logical.DeltaMergeInto import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap -import org.apache.spark.sql.types.{ArrayType, ByteType, DataType, DecimalType, IntegerType, MapType, NullType, ShortType, StructField, StructType} +import org.apache.spark.sql.types._ /** * Utils to merge table schema with data schema. 
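Aside, to make the new allowTypeWidening flag concrete: a minimal sketch, not part of the patch, of how the widened merge is expected to behave once the flag reaches SchemaMergingUtils.mergeSchemas (package and object names as in the files above; the comment on the default path is an inference from the surrounding merge rules):

    import org.apache.spark.sql.types._
    import org.apache.spark.sql.delta.TypeWidening
    import org.apache.spark.sql.delta.schema.SchemaMergingUtils

    // Only byte -> short -> int changes are eligible for automatic widening; everything else
    // falls through to the pre-existing merge rules.
    TypeWidening.isTypeChangeSupportedForSchemaEvolution(ByteType, ShortType)     // true
    TypeWidening.isTypeChangeSupportedForSchemaEvolution(ShortType, IntegerType)  // true
    TypeWidening.isTypeChangeSupportedForSchemaEvolution(IntegerType, ShortType)  // false: narrowing

    val tableSchema = new StructType().add("value", ShortType)
    val dataSchema  = new StructType().add("value", IntegerType)

    // With widening allowed, the merged schema adopts the wider incoming type for "value" (INT).
    val widened = SchemaMergingUtils.mergeSchemas(tableSchema, dataSchema, allowTypeWidening = true)
    // With the default allowTypeWidening = false the new case is skipped and, unless another flag
    // such as allowImplicitConversions or keepExistingType applies, merging SHORT with INT is
    // expected to fail as it does today.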
@@ -168,6 +167,7 @@ object SchemaMergingUtils { dataSchema: StructType, allowImplicitConversions: Boolean = false, keepExistingType: Boolean = false, + allowTypeWidening: Boolean = false, fixedTypeColumns: Set[String] = Set.empty, caseSensitive: Boolean = false): StructType = { checkColumnNameDuplication(dataSchema, "in the data to save", caseSensitive) @@ -232,6 +232,9 @@ object SchemaMergingUtils { // Simply keeps the existing type for primitive types case (current, update) if keepExistingType => current + case (current: AtomicType, update: AtomicType) if allowTypeWidening && + TypeWidening.isTypeChangeSupportedForSchemaEvolution(current, update) => update + // If implicit conversions are allowed, that means we can use any valid implicit cast to // perform the merge. case (current, update) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala index b7fbad3035..22b52bbf44 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala @@ -1432,7 +1432,12 @@ trait DeltaErrorsSuiteBase val e = intercept[DeltaAnalysisException] { val s1 = StructType(Seq(StructField("c0", IntegerType, true))) val s2 = StructType(Seq(StructField("c0", StringType, false))) - SchemaMergingUtils.mergeSchemas(s1, s2, false, false, Set("c0")) + SchemaMergingUtils.mergeSchemas(s1, s2, + allowImplicitConversions = false, + keepExistingType = false, + allowTypeWidening = false, + Set("c0") + ) } checkErrorMessage(e, Some("DELTA_GENERATED_COLUMNS_DATA_TYPE_MISMATCH"), Some("42K09"), Some("Column c0 is a generated column or a column used by a generated " + diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala index 5c2c28e1bb..7d0f54ba47 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala @@ -42,10 +42,11 @@ import org.apache.spark.scheduler.{JobFailed, SparkListener, SparkListenerJobEnd import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.util.quietly +import org.apache.spark.sql.catalyst.util.{quietly, FailFastMode} import org.apache.spark.sql.execution.{FileSourceScanExec, QueryExecution, RDDScanExec, SparkPlan, WholeStageCodegenExec} import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.util.Utils @@ -475,6 +476,8 @@ trait DeltaDMLTestUtils with BeforeAndAfterEach { self: SharedSparkSession => + import testImplicits._ + protected var tempDir: File = _ protected var deltaLog: DeltaLog = _ @@ -523,6 +526,23 @@ trait DeltaDMLTestUtils } } + /** + * Parse the input JSON data into a dataframe, one row per input element. + * Throws an exception on malformed inputs or records that don't comply with the provided schema. 
+ */ + protected def readFromJSON(data: Seq[String], schema: StructType = null): DataFrame = { + if (schema != null) { + spark.read + .schema(schema) + .option("mode", FailFastMode.name) + .json(data.toDS) + } else { + spark.read + .option("mode", FailFastMode.name) + .json(data.toDS) + } + } + protected def readDeltaTable(path: String): DataFrame = { spark.read.format("delta").load(path) } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala new file mode 100644 index 0000000000..bf5317952c --- /dev/null +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSchemaEvolutionSuite.scala @@ -0,0 +1,359 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest + +import org.apache.spark.SparkConf +import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy +import org.apache.spark.sql.types._ + + +/** + * Suite covering widening columns and fields type as part of automatic schema evolution when the + * type widening table feature is supported. + */ +class DeltaTypeWideningSchemaEvolutionSuite + extends QueryTest + with DeltaDMLTestUtils + with DeltaSQLCommandTest + with DeltaTypeWideningTestMixin + with DeltaMergeIntoTypeWideningSchemaEvolutionTests { + + protected override def sparkConf: SparkConf = { + super.sparkConf + .set(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "true") + } +} + +/** + * Tests covering type widening during schema evolution in MERGE INTO. + */ +trait DeltaMergeIntoTypeWideningSchemaEvolutionTests + extends MergeIntoSQLTestUtils + with MergeIntoSchemaEvolutionMixin + with DeltaTypeWideningTestCases { + self: QueryTest with DeltaTypeWideningTestMixin with DeltaDMLTestUtils => + + import testImplicits._ + + for { + testCase <- supportedTestCases + } { + test(s"MERGE - automatic type widening ${testCase.fromType.sql} -> ${testCase.toType.sql}") { + withTable("source") { + testCase.additionalValuesDF.write.format("delta").saveAsTable("source") + append(testCase.initialValuesDF) + + // We mainly want to ensure type widening is correctly applied to the schema. We use a + // trivial insert only merge to make it easier to validate results. 
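+        // The helper call below is assumed to expand to roughly:
+        //   MERGE INTO delta.`<tempPath>` t USING source ON 0 = 1
+        //   WHEN NOT MATCHED THEN INSERT *
+        // The ON 0 = 1 condition never matches, so every source row takes the NOT MATCHED INSERT
+        // path and the only observable effect besides the inserted rows is the widened schema.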
+ executeMerge( + tgt = s"delta.`$tempPath` t", + src = "source", + cond = "0 = 1", + clauses = insert("*")) + + assert(readDeltaTable(tempPath).schema("value").dataType === testCase.toType) + checkAnswer( + readDeltaTable(tempPath).select("value").sort("value"), + testCase.expectedResult.select($"value".cast(testCase.toType)).sort("value")) + } + } + } + + for { + testCase <- unsupportedTestCases + } { + test(s"MERGE - unsupported automatic type widening " + + s"${testCase.fromType.sql} -> ${testCase.toType.sql}") { + withTable("source") { + testCase.additionalValuesDF.write.format("delta").saveAsTable("source") + append(testCase.initialValuesDF) + + // Test cases for some of the unsupported type changes may overflow while others only have + // values that can be implicitly cast to the narrower type - e.g. double ->float. + // We set storeAssignmentPolicy to LEGACY to ignore overflows, this test only ensures + // that the table schema didn't evolve. + withSQLConf(SQLConf.STORE_ASSIGNMENT_POLICY.key -> StoreAssignmentPolicy.LEGACY.toString) { + executeMerge( + tgt = s"delta.`$tempPath` t", + src = "source", + cond = "0 = 1", + clauses = insert("*")) + assert(readDeltaTable(tempPath).schema("value").dataType === testCase.fromType) + } + } + } + } + + test("MERGE - type widening isn't applied when it's disabled") { + withTable("source") { + sql(s"CREATE TABLE delta.`$tempPath` (a short) USING DELTA") + sql("CREATE TABLE source (a int) USING DELTA") + sql("INSERT INTO source VALUES (1), (2)") + enableTypeWidening(tempPath, enabled = false) + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") { + // Merge integer values. This should succeed and downcast the values to short. + executeMerge( + tgt = s"delta.`$tempPath` t", + src = "source", + cond = "0 = 1", + clauses = insert("*") + ) + assert(readDeltaTable(tempPath).schema("a").dataType === ShortType) + checkAnswer(readDeltaTable(tempPath), + Seq(1, 2).toDF("a").select($"a".cast(ShortType))) + } + } + } + + test("MERGE - type widening isn't applied when schema evolution is disabled") { + withTable("source") { + sql(s"CREATE TABLE delta.`$tempPath` (a short) USING DELTA") + sql("CREATE TABLE source (a int) USING DELTA") + sql("INSERT INTO source VALUES (1), (2)") + + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "false") { + // Merge integer values. This should succeed and downcast the values to short. + executeMerge( + tgt = s"delta.`$tempPath` t", + src = "source", + cond = "0 = 1", + clauses = insert("*") + ) + assert(readDeltaTable(tempPath).schema("a").dataType === ShortType) + checkAnswer(readDeltaTable(tempPath), + Seq(1, 2).toDF("a").select($"a".cast(ShortType))) + } + + // Check that we would actually widen if schema evolution was enabled. + withSQLConf(DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key -> "true") { + executeMerge( + tgt = s"delta.`$tempPath` t", + src = "source", + cond = "0 = 1", + clauses = insert("*") + ) + assert(readDeltaTable(tempPath).schema("a").dataType === IntegerType) + checkAnswer(readDeltaTable(tempPath), Seq(1, 2, 1, 2).toDF("a")) + } + } + } + + /** + * Wrapper around testNestedStructsEvolution that constrains the result with and without schema + * evolution to be the same: the schema is different but the values should be the same. 
+ */ + protected def testTypeEvolution(name: String)( + target: Seq[String], + source: Seq[String], + targetSchema: StructType, + sourceSchema: StructType, + cond: String = "t.key = s.key", + clauses: Seq[MergeClause] = Seq.empty, + result: Seq[String], + resultSchema: StructType): Unit = + testNestedStructsEvolution(s"MERGE - $name")( + target, + source, + targetSchema, + sourceSchema, + cond, + clauses, + result, + resultWithoutEvolution = result, + resultSchema = resultSchema) + + + testTypeEvolution("change top-level column short -> int with update")( + target = Seq("""{ "a": 0 }""", """{ "a": 10 }"""), + source = Seq("""{ "a": 0 }""", """{ "a": 20 }"""), + targetSchema = new StructType().add("a", ShortType), + sourceSchema = new StructType().add("a", IntegerType), + cond = "t.a = s.a", + clauses = update("a = s.a + 1") :: Nil, + result = Seq("""{ "a": 1 }""", """{ "a": 10 }"""), + resultSchema = new StructType() + .add("a", IntegerType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)) + ) + + testTypeEvolution("change top-level column short -> int with insert")( + target = Seq("""{ "a": 0 }""", """{ "a": 10 }"""), + source = Seq("""{ "a": 0 }""", """{ "a": 20 }"""), + targetSchema = new StructType().add("a", ShortType), + sourceSchema = new StructType().add("a", IntegerType), + cond = "t.a = s.a", + clauses = insert("(a) VALUES (s.a)") :: Nil, + result = Seq("""{ "a": 0 }""", """{ "a": 10 }""", """{ "a": 20 }"""), + resultSchema = new StructType() + .add("a", IntegerType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)) + ) + + testTypeEvolution("updating using narrower value doesn't evolve schema")( + target = Seq("""{ "a": 0 }""", """{ "a": 10 }"""), + source = Seq("""{ "a": 0 }""", """{ "a": 20 }"""), + targetSchema = new StructType().add("a", IntegerType), + sourceSchema = new StructType().add("a", ShortType), + cond = "t.a = s.a", + clauses = update("a = s.a + 1") :: Nil, + result = Seq("""{ "a": 1 }""", """{ "a": 10 }"""), + resultSchema = new StructType().add("a", IntegerType) + ) + + testTypeEvolution("only columns in assignments are widened")( + target = Seq("""{ "a": 0, "b": 5 }""", """{ "a": 10, "b": 15 }"""), + source = Seq("""{ "a": 0, "b": 6 }""", """{ "a": 20, "b": 16 }"""), + targetSchema = new StructType() + .add("a", ShortType) + .add("b", ShortType), + sourceSchema = new StructType() + .add("a", IntegerType) + .add("b", IntegerType), + cond = "t.a = s.a", + clauses = update("a = s.a + 1") :: Nil, + result = Seq( + """{ "a": 1, "b": 5 }""", """{ "a": 10, "b": 15 }"""), + resultSchema = new StructType() + .add("a", IntegerType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)) + .add("b", ShortType) + ) + + testTypeEvolution("automatic widening of struct field with struct assignment")( + target = Seq("""{ "s": { "a": 1 } }""", """{ "s": { "a": 10 } }"""), + source = Seq("""{ "s": { "a": 1 } }""", """{ "s": { "a": 20 } }"""), + targetSchema = new StructType() + .add("s", new StructType() + .add("a", ShortType)), + sourceSchema = new StructType() + .add("s", new StructType() + .add("a", IntegerType)), + cond = "t.s.a = s.s.a", + clauses = update("t.s.a = s.s.a + 1") :: Nil, + result = Seq("""{ "s": { "a": 2 } }""", """{ "s": { "a": 10 } }"""), + resultSchema = new StructType() + .add("s", new StructType() + .add("a", IntegerType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ShortType, to = 
IntegerType))) + ) + + testTypeEvolution("automatic widening of struct field with field assignment")( + target = Seq("""{ "s": { "a": 1 } }""", """{ "s": { "a": 10 } }"""), + source = Seq("""{ "s": { "a": 1 } }""", """{ "s": { "a": 20 } }"""), + targetSchema = new StructType() + .add("s", new StructType() + .add("a", ShortType)), + sourceSchema = new StructType() + .add("s", new StructType() + .add("a", IntegerType)), + cond = "t.s.a = s.s.a", + clauses = update("t.s.a = s.s.a + 1") :: Nil, + result = Seq("""{ "s": { "a": 2 } }""", """{ "s": { "a": 10 } }"""), + resultSchema = new StructType() + .add("s", new StructType() + .add("a", IntegerType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType))) + ) + + testTypeEvolution("automatic widening of map value")( + target = Seq("""{ "m": { "a": 1 } }"""), + source = Seq("""{ "m": { "a": 2 } }"""), + targetSchema = new StructType() + .add("m", MapType(StringType, ShortType)), + sourceSchema = new StructType() + .add("m", MapType(StringType, IntegerType)), + // Can't compare maps + cond = "1 = 1", + clauses = update("t.m = s.m") :: Nil, + result = Seq("""{ "m": { "a": 2 } }"""), + resultSchema = new StructType() + .add("m", + MapType(StringType, IntegerType), + nullable = true, + metadata = typeWideningMetadata( + version = 1, + from = ShortType, + to = IntegerType, + path = Seq("value"))) + ) + + testTypeEvolution("automatic widening of array element")( + target = Seq("""{ "a": [1, 2] }"""), + source = Seq("""{ "a": [3, 4] }"""), + targetSchema = new StructType() + .add("a", ArrayType(ShortType)), + sourceSchema = new StructType() + .add("a", ArrayType(IntegerType)), + cond = "t.a != s.a", + clauses = update("t.a = s.a") :: Nil, + result = Seq("""{ "a": [3, 4] }"""), + resultSchema = new StructType() + .add("a", + ArrayType(IntegerType), + nullable = true, + metadata = typeWideningMetadata( + version = 1, + from = ShortType, + to = IntegerType, + path = Seq("element"))) + ) + + testTypeEvolution("multiple automatic widening")( + target = Seq("""{ "a": 1, "b": 2 }"""), + source = Seq("""{ "a": 1, "b": 4 }""", """{ "a": 5, "b": 6 }"""), + targetSchema = new StructType() + .add("a", ByteType) + .add("b", ShortType), + sourceSchema = new StructType() + .add("a", ShortType) + .add("b", IntegerType), + cond = "t.a = s.a", + clauses = update("*") :: insert("*") :: Nil, + result = Seq("""{ "a": 1, "b": 4 }""", """{ "a": 5, "b": 6 }"""), + resultSchema = new StructType() + .add("a", ShortType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ByteType, to = ShortType)) + .add("b", IntegerType, nullable = true, + metadata = typeWideningMetadata(version = 1, from = ShortType, to = IntegerType)) + ) + + for (enabled <- BOOLEAN_DOMAIN) + test(s"MERGE - fail if type widening gets ${if (enabled) "enabled" else "disabled"} by a " + + "concurrent transaction") { + sql(s"CREATE TABLE delta.`$tempPath` (a short) USING DELTA") + enableTypeWidening(tempPath, !enabled) + val target = io.delta.tables.DeltaTable.forPath(tempPath) + import testImplicits._ + val merge = target.as("target") + .merge( + source = Seq(1L).toDF("a").as("source"), + condition = "target.a = source.a") + .whenNotMatched().insertAll() + + // The MERGE operation was created with type widening enabled, which will apply during analysis. + // Disable type widening so that the actual execution runs with type widening disabled. 
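+      // Under the hood, MergeIntoCommand now compares the widening state captured at analysis
+      // time (targetFileIndex.protocol / targetFileIndex.metadata) with the transaction's state
+      // through TypeWidening.ensureFeatureConsistentlyEnabled; the mismatch introduced by
+      // flipping the table property is what surfaces as the MetadataChangedException
+      // intercepted below.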
+ enableTypeWidening(tempPath, enabled) + intercept[MetadataChangedException] { + merge.execute() + } + } +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala index 9fb52146ee..a586fc2e90 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTypeWideningSuite.scala @@ -67,15 +67,23 @@ trait DeltaTypeWideningTestMixin extends SharedSparkSession { protected def enableTypeWidening(tablePath: String, enabled: Boolean = true): Unit = sql(s"ALTER TABLE delta.`$tablePath` " + s"SET TBLPROPERTIES('${DeltaConfigs.ENABLE_TYPE_WIDENING.key}' = '${enabled.toString}')") + + /** Short-hand to create type widening metadata for struct fields. */ + protected def typeWideningMetadata( + version: Long, + from: AtomicType, + to: AtomicType, + path: Seq[String] = Seq.empty): Metadata = + new MetadataBuilder() + .putMetadataArray( + "delta.typeChanges", Array(TypeChange(version, from, to, path).toMetadata)) + .build() } /** - * Trait collecting a subset of tests providing core coverage for type widening using ALTER TABLE - * CHANGE COLUMN TYPE. + * Trait collecting supported and unsupported type change test cases. */ -trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { - self: QueryTest with ParquetTest with DeltaTypeWideningTestMixin with DeltaDMLTestUtils => - +trait DeltaTypeWideningTestCases { self: SharedSparkSession => import testImplicits._ /** @@ -119,7 +127,7 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { // Type changes that are supported by all Parquet readers. Byte, Short, Int are all stored as // INT32 in parquet so these changes are guaranteed to be supported. - private val supportedTestCases: Seq[TypeEvolutionTestCase] = Seq( + protected val supportedTestCases: Seq[TypeEvolutionTestCase] = Seq( SupportedTypeEvolutionTestCase(ByteType, ShortType, Seq(1, -1, Byte.MinValue, Byte.MaxValue, null.asInstanceOf[Byte]), Seq(4, -4, Short.MinValue, Short.MaxValue, null.asInstanceOf[Short])), @@ -131,35 +139,6 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { Seq(4, -4, Int.MinValue, Int.MaxValue, null.asInstanceOf[Int])) ) - for { - testCase <- supportedTestCases - partitioned <- BOOLEAN_DOMAIN - } { - test(s"type widening ${testCase.fromType.sql} -> ${testCase.toType.sql}, " + - s"partitioned=$partitioned") { - def writeData(df: DataFrame): Unit = if (partitioned) { - // The table needs to have at least 1 non-partition column, use a dummy one. - append(df.withColumn("dummy", lit(1)), partitionBy = Seq("value")) - } else { - append(df) - } - - writeData(testCase.initialValuesDF) - sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN value TYPE ${testCase.toType.sql}") - withAllParquetReaders { - assert(readDeltaTable(tempPath).schema("value").dataType === testCase.toType) - checkAnswer(readDeltaTable(tempPath).select("value").sort("value"), - testCase.initialValuesDF.select($"value".cast(testCase.toType)).sort("value")) - } - writeData(testCase.additionalValuesDF) - withAllParquetReaders { - checkAnswer( - readDeltaTable(tempPath).select("value").sort("value"), - testCase.expectedResult.sort("value")) - } - } - } - /** * Represents the input of an unsupported type change test. Handles converting the test values * from scala types to a dataframe. 
Additional values to insert are always empty since the type @@ -183,7 +162,7 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { } // Test type changes that aren't supported. - private val unsupportedTestCases: Seq[TypeEvolutionTestCase] = Seq( + protected val unsupportedTestCases: Seq[TypeEvolutionTestCase] = Seq( UnsupportedTypeEvolutionTestCase(IntegerType, ByteType, Seq(1, 2, Int.MinValue)), UnsupportedTypeEvolutionTestCase(LongType, IntegerType, @@ -192,6 +171,10 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { Seq(987654321.987654321d, Double.NaN, Double.NegativeInfinity, Double.PositiveInfinity, Double.MinPositiveValue, Double.MinValue, Double.MaxValue)), + UnsupportedTypeEvolutionTestCase(ByteType, DecimalType(2, 0), + Seq(1, -1, Byte.MinValue)), + UnsupportedTypeEvolutionTestCase(ShortType, DecimalType(4, 0), + Seq(1, -1, Short.MinValue)), UnsupportedTypeEvolutionTestCase(IntegerType, DecimalType(9, 0), Seq(1, -1, Int.MinValue)), UnsupportedTypeEvolutionTestCase(LongType, DecimalType(19, 0), @@ -219,6 +202,45 @@ trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase { DecimalType(Decimal.MAX_INT_DIGITS + 3, 1), Seq(BigDecimal("-67.89"), BigDecimal("9" * (Decimal.MAX_LONG_DIGITS - 2) + ".99"))) ) +} + +/** + * Trait collecting a subset of tests providing core coverage for type widening using ALTER TABLE + * CHANGE COLUMN TYPE. + */ +trait DeltaTypeWideningAlterTableTests extends QueryErrorsBase with DeltaTypeWideningTestCases { + self: QueryTest with ParquetTest with DeltaTypeWideningTestMixin with DeltaDMLTestUtils => + + import testImplicits._ + + for { + testCase <- supportedTestCases + partitioned <- BOOLEAN_DOMAIN + } { + test(s"type widening ${testCase.fromType.sql} -> ${testCase.toType.sql}, " + + s"partitioned=$partitioned") { + def writeData(df: DataFrame): Unit = if (partitioned) { + // The table needs to have at least 1 non-partition column, use a dummy one. 
+ append(df.withColumn("dummy", lit(1)), partitionBy = Seq("value")) + } else { + append(df) + } + + writeData(testCase.initialValuesDF) + sql(s"ALTER TABLE delta.`$tempPath` CHANGE COLUMN value TYPE ${testCase.toType.sql}") + withAllParquetReaders { + assert(readDeltaTable(tempPath).schema("value").dataType === testCase.toType) + checkAnswer(readDeltaTable(tempPath).select("value").sort("value"), + testCase.initialValuesDF.select($"value".cast(testCase.toType)).sort("value")) + } + writeData(testCase.additionalValuesDF) + withAllParquetReaders { + checkAnswer( + readDeltaTable(tempPath).select("value").sort("value"), + testCase.expectedResult.sort("value")) + } + } + } for { testCase <- unsupportedTestCases diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala index f582b0a408..4768370a0e 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSchemaEvolutionSuite.scala @@ -83,7 +83,7 @@ trait MergeIntoSchemaEvolutionMixin { } test(s"schema evolution - $name - with evolution disabled") { - withSQLConf(confs: _*) { + withSQLConf(confs :+ (DeltaSQLConf.DELTA_SCHEMA_AUTO_MIGRATE.key, "false"): _*) { executeMergeAndAssert(expectedWithoutEvolution, expectErrorWithoutEvolutionContains) } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoTestUtils.scala index 6f2839d7dc..50798e863f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoTestUtils.scala @@ -20,10 +20,8 @@ import org.apache.spark.sql.delta.test.DeltaSQLTestUtils import io.delta.tables._ import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.catalyst.util.FailFastMode import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.StructType /** * Base trait collecting helper methods to run MERGE tests. Merge test suite will want to mix in @@ -33,8 +31,6 @@ import org.apache.spark.sql.types.StructType trait MergeIntoTestUtils extends DeltaDMLTestUtils with MergeHelpers { self: SharedSparkSession => - import testImplicits._ - protected def executeMerge( target: String, source: String, @@ -57,23 +53,6 @@ trait MergeIntoTestUtils extends DeltaDMLTestUtils with MergeHelpers { protected def withCrossJoinEnabled(body: => Unit): Unit = { withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "true") { body } } - - /** - * Parse the input JSON data into a dataframe, one row per input element. - * Throws an exception on malformed inputs or records that don't comply with the provided schema. - */ - protected def readFromJSON(data: Seq[String], schema: StructType = null): DataFrame = { - if (schema != null) { - spark.read - .schema(schema) - .option("mode", FailFastMode.name) - .json(data.toDS) - } else { - spark.read - .option("mode", FailFastMode.name) - .json(data.toDS) - } - } } trait MergeIntoSQLTestUtils extends DeltaSQLTestUtils with MergeIntoTestUtils {
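A short usage note on the readFromJSON helper that moved into DeltaDMLTestUtils above — a minimal, illustrative call (the column name and rows are made up for the example):

    import org.apache.spark.sql.types._

    val schema = new StructType().add("a", ShortType)
    // FAILFAST mode makes malformed records, or records that don't fit the schema, throw instead
    // of silently producing null rows, which keeps the JSON-based test fixtures honest.
    val df = readFromJSON(Seq("""{ "a": 1 }""", """{ "a": 2 }"""), schema)
    // df is a DataFrame with a single SHORT column "a" holding the two rows above.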