diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaViewHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaViewHelper.scala
index eaeeee38a8..7e27f3f2eb 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaViewHelper.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaViewHelper.scala
@@ -66,7 +66,7 @@ object DeltaViewHelper {
     // and ensuring that the project list exactly match the output of the scan.
     CollapseProject(EliminateSubqueryAliases(plan)) match {
       case View(desc, true, // isTempView
-        Project(outerList, scan: LogicalRelation))
+          Project(outerList, scan: LogicalRelation))
           if attributesMatch(outerList, scan.output) =>
         Some(desc, outerList, scan)
       case _ => None
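Note on the `DeltaViewHelper` hunk: the `attributesMatch` helper referenced by the guard is not part of this diff. Below is a minimal sketch of what such a check plausibly verifies, assuming it requires the collapsed project list to be exactly the scan output — same attributes, same order, nothing renamed or recomputed. The signature and body are illustrative, not the actual Delta helper:

```scala
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}

// Hypothetical sketch: stripping the view is only safe when the projection is
// a no-op over the scan, i.e. it forwards every scan attribute unchanged.
def attributesMatch(
    projectList: Seq[NamedExpression],
    scanOutput: Seq[Attribute]): Boolean = {
  projectList.length == scanOutput.length &&
    projectList.zip(scanOutput).forall {
      case (projected: Attribute, scanned) =>
        projected.exprId == scanned.exprId && projected.name == scanned.name
      case _ => false // aliases, casts or literals change the view's output
    }
}
```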
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoImplicitCastSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoImplicitCastSuite.scala
index b8457399dc..93697d2280 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoImplicitCastSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoImplicitCastSuite.scala
@@ -68,15 +68,16 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest {
     overwriteWhere = "a" -> 1,
     insertSchemaDDL = "a int, b long",
     insertJsonData = Seq("""{ "a": 1, "b": 4 }"""),
-    expectedResult = ExpectedResult.Failure(ex => {
-      checkError(
-        ex,
-        errorClass = "DELTA_FAILED_TO_MERGE_FIELDS",
-        parameters = Map(
-          "currentField" -> "a",
-          "updateField" -> "a"
+    expectedResult = ExpectedResult.Failure { ex => {
+      checkError(
+        ex,
+        condition = "DELTA_FAILED_TO_MERGE_FIELDS",
+        parameters = Map(
+          "currentField" -> "a",
+          "updateField" -> "a"
       ))
-    }),
+      }
+    },
     includeInserts = Seq(
       DFv1SaveAsTable(SaveMode.Append),
       DFv1SaveAsTable(SaveMode.Overwrite),
@@ -126,15 +127,16 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest {
     overwriteWhere = "key" -> 1,
     insertSchemaDDL = "key int, a array>",
     insertJsonData = Seq("""{ "key": 1, "a": [ { "x": 3, "y": 4 } ] }"""),
-    expectedResult = ExpectedResult.Failure(ex => {
-      checkError(
-        ex,
-        errorClass = "DELTA_FAILED_TO_MERGE_FIELDS",
-        parameters = Map(
-          "currentField" -> "a",
-          "updateField" -> "a"
+    expectedResult = ExpectedResult.Failure { ex => {
+      checkError(
+        ex,
+        condition = "DELTA_FAILED_TO_MERGE_FIELDS",
+        parameters = Map(
+          "currentField" -> "a",
+          "updateField" -> "a"
       ))
-    }),
+      }
+    },
     includeInserts = Seq(
       DFv1SaveAsTable(SaveMode.Append),
       DFv1SaveAsTable(SaveMode.Overwrite),
@@ -184,15 +186,16 @@ class DeltaInsertIntoImplicitCastSuite extends DeltaInsertIntoTest {
     overwriteWhere = "key" -> 1,
     insertSchemaDDL = "key int, m map>",
     insertJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }"""),
-    expectedResult = ExpectedResult.Failure(ex => {
-      checkError(
-        ex,
-        errorClass = "DELTA_FAILED_TO_MERGE_FIELDS",
-        parameters = Map(
-          "currentField" -> "m",
-          "updateField" -> "m"
+    expectedResult = ExpectedResult.Failure { ex => {
+      checkError(
+        ex,
+        condition = "DELTA_FAILED_TO_MERGE_FIELDS",
+        parameters = Map(
+          "currentField" -> "m",
+          "updateField" -> "m"
       ))
-    }),
+      }
+    },
     includeInserts = Seq(
       DFv1SaveAsTable(SaveMode.Append),
       DFv1SaveAsTable(SaveMode.Overwrite),
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala
index 2cc22f5544..47fe6c8632 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTest.scala
@@ -256,12 +256,12 @@ trait DeltaInsertIntoTest extends QueryTest with DeltaDMLTestUtils with DeltaSQL
     withSQLConf(confs: _*) {
       expectedResult match {
-        case Left(expectedSchema) =>
+        case ExpectedResult.Success(expectedSchema) =>
           runInsert()
           val target = spark.read.table("target")
           assert(target.schema === expectedSchema)
           checkAnswer(target, insert.expectedResult(initialDF, insertDF))
-        case Right(checkError) =>
+        case ExpectedResult.Failure(checkError) =>
           val ex = intercept[SparkThrowable] {
             runInsert()
           }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
index 067ba696bd..0f7463ca37 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
@@ -240,16 +240,16 @@ trait DeltaTestUtilsBase {
   }
 
   /**
-   * Helper type to define the expected result of a test case.
+   * Helper types to define the expected result of a test case.
    * Either:
    * - Success: include an expected value to check, e.g. expected schema or result as a DF or rows.
    * - Failure: an exception is thrown and the caller passes a function to check that it matches an
    *   expected error, typ. `checkError()` or `checkErrorMatchPVals()`.
    */
-  type ExpectedResult[T] = Either[T, SparkThrowable => Unit]
+  sealed trait ExpectedResult[T]
   object ExpectedResult {
-    def Success[T](expected: T): ExpectedResult[T] = Left(expected)
-    def Failure[T](checkError: SparkThrowable => Unit): ExpectedResult[T] = Right(checkError)
+    case class Success[T](expected: T) extends ExpectedResult[T]
+    case class Failure[T](checkError: SparkThrowable => Unit) extends ExpectedResult[T]
   }
 
   /** Utility method to check exception `e` is of type `E` or a cause of it is of type `E` */
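The `DeltaTestUtils` hunk above is the heart of the change: `ExpectedResult` stops being an alias for `Either` and becomes a small sealed ADT, which is what lets every call site match on `ExpectedResult.Success`/`ExpectedResult.Failure` instead of the anonymous `Left`/`Right`. A self-contained illustration of the pattern (simplified, not the Delta code):

```scala
import org.apache.spark.SparkThrowable

sealed trait ExpectedResult[T]
object ExpectedResult {
  case class Success[T](expected: T) extends ExpectedResult[T]
  case class Failure[T](checkError: SparkThrowable => Unit) extends ExpectedResult[T]
}

// Matching on named cases documents intent at the call site, and sealedness
// lets the compiler warn on non-exhaustive matches; the Either alias gave
// neither, since Left/Right say nothing about success or failure.
def describe[T](result: ExpectedResult[T]): String = result match {
  case ExpectedResult.Success(expected) => s"expects value: $expected"
  case ExpectedResult.Failure(_) => "expects an error to be thrown"
}
```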
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala
index 2d388fd2ec..2159d22590 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/MergeIntoSuiteBase.scala
@@ -3135,12 +3135,12 @@ abstract class MergeIntoSuiteBase
         insert = "(v.key, v.value) VALUES (src.a, src.b)")
 
       expectedResult match {
-        case Right(checkError) =>
+        case ExpectedResult.Failure(checkError) =>
           val ex = intercept[AnalysisException] {
             runMerge()
           }
           checkError(ex)
-        case Left(expectedRows) =>
+        case ExpectedResult.Success(expectedRows) =>
           if (checkViewStripped) {
             checkStripViewFromTarget(target = "v")
           }
@@ -3161,29 +3161,30 @@ abstract class MergeIntoSuiteBase
     testTempViews("basic - merge condition references subset of target cols")(
       text = "SELECT * FROM tab",
       mergeCondition = "src.a = v.key",
-      expectedResult = Left(Seq(Row(0, 3), Row(1, 3), Row(3, 4)))
+      expectedResult = ExpectedResult.Success(Seq(Row(0, 3), Row(1, 3), Row(3, 4)))
     )
 
     testTempViews("subset cols")(
       text = "SELECT key FROM tab",
       mergeCondition = "src.a = v.key AND src.b = v.value",
-      expectedResult = ExpectedResult.Failure(ex =>
+      expectedResult = ExpectedResult.Failure { ex =>
         assert(ex.getErrorClass === "UNRESOLVED_COLUMN.WITH_SUGGESTION")
-      )
+      }
     )
 
     testTempViews("superset cols")(
       text = "SELECT key, value, 1 FROM tab",
       mergeCondition = "src.a = v.key AND src.b = v.value",
       // The analyzer can't tell whether the table originally had the extra column or not.
-      expectedResult = ExpectedResult.Failure(ex =>
+      expectedResult = ExpectedResult.Failure { ex =>
         checkError(
           ex,
           errorClass = "DELTA_SCHEMA_CHANGE_SINCE_ANALYSIS",
           parameters = Map(
             "schemaDiff" -> "Latest schema is missing field(s): 1",
             "legacyFlagMessage" -> ""
-          )))
+          ))
+      }
     )
 
     testTempViews("nontrivial projection")(
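On the "superset cols" case: once the temp view is substituted into the plan, the extra literal column is indistinguishable from a real table column, which is why the failure surfaces at execution as a stale-schema error rather than a view-specific one. A rough sketch of the shape that triggers it (illustrative, not the suite's setup code):

```scala
// The view projects one more column than the underlying Delta table has:
spark.sql("CREATE TEMP VIEW v AS SELECT key, value, 1 FROM tab")
// A MERGE into `v` resolves against the three-column view schema, but the
// pre-commit check compares it with `tab`'s two-column schema, so the plan
// looks stale: DELTA_SCHEMA_CHANGE_SINCE_ANALYSIS,
// "Latest schema is missing field(s): 1".
```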
"key int, a array>", insertJsonData = Seq("""{ "key": 1, "a": [ { "x": 3, "y": 4 } ] }"""), - expectedResult = ExpectedResult.Success(expected = new StructType() + expectedResult = ExpectedResult.Success(new StructType() .add("key", IntegerType) .add("a", ArrayType(new StructType() .add("x", IntegerType) @@ -318,7 +318,7 @@ trait TypeWideningInsertSchemaEvolutionTests overwriteWhere = "key" -> 1, insertSchemaDDL = "key int, m map>", insertJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }"""), - expectedResult = ExpectedResult.Success(expected = new StructType() + expectedResult = ExpectedResult.Success(new StructType() .add("key", IntegerType) // Type evolution wasn't applied in the map. .add("m", MapType(StringType, new StructType() @@ -343,7 +343,7 @@ trait TypeWideningInsertSchemaEvolutionTests overwriteWhere = "key" -> 1, insertSchemaDDL = "key int, m map>", insertJsonData = Seq("""{ "key": 1, "m": { "a": { "x": 3, "y": 4 } } }"""), - expectedResult = ExpectedResult.Success(expected = new StructType() + expectedResult = ExpectedResult.Success(new StructType() .add("key", IntegerType) // Type evolution was applied in the map. .add("m", MapType(StringType, new StructType()