From e1dd98728bf79beeace976db5f3179a305335ae7 Mon Sep 17 00:00:00 2001
From: Lukas Rupprecht
Date: Tue, 17 Sep 2024 13:04:28 -0700
Subject: [PATCH] [Spark] Correctly handles protocol properties during repeat
 table creation (#3681)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR fixes a bug that could occur if a table is created a second time at the
same location but under a different name, with delta.minReaderVersion and
delta.minWriterVersion set explicitly as part of the table creation. Because
these properties are removed from the table metadata, they do not appear in the
table property comparison performed during the second table creation. Since the
properties are required to match, the second creation fails even though the
specified properties are identical to the first one. This PR removes these two
special properties from the comparison so that the repeated table creation
succeeds.

## How was this patch tested?

Added a unit test asserting that repeated table creation succeeds even if
delta.minReaderVersion and delta.minWriterVersion are specified.

## Does this PR introduce _any_ user-facing changes?

No
---
 .../sql/delta/OptimisticTransaction.scala       |  5 ++--
 .../spark/sql/delta/actions/actions.scala       |  5 ++++
 .../commands/CreateDeltaTableCommand.scala      |  6 +++-
 .../spark/sql/delta/DeltaDDLSuite.scala         | 29 +++++++++++++++++++
 4 files changed, 41 insertions(+), 4 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index fd10bd99a3..54ca25bb27 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -704,9 +704,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     }
 
     // We are done with protocol versions and features, time to remove related table properties.
-    val configsWithoutProtocolProps = newMetadataTmp.configuration.filterNot {
-      case (k, _) => TableFeatureProtocolUtils.isTableProtocolProperty(k)
-    }
+    val configsWithoutProtocolProps =
+      Protocol.filterProtocolPropsFromTableProps(newMetadataTmp.configuration)
     // Table features Part 3: add automatically-enabled features by looking at the new table
     // metadata.
     //
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
index a692ce2c41..a5faa22dac 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -425,6 +425,11 @@ object Protocol {
     (getReaderVersionFromTableConf(conf), getWriterVersionFromTableConf(conf))
   }
 
+  def filterProtocolPropsFromTableProps(properties: Map[String, String]): Map[String, String] =
+    properties.filterNot {
+      case (k, _) => TableFeatureProtocolUtils.isTableProtocolProperty(k)
+    }
+
   /** Assert a table metadata contains no protocol-related table properties. */
   def assertMetadataContainsNoProtocolProps(metadata: Metadata): Unit = {
     assert(
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
index dfef8de28b..797d7ba196 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.DeltaColumnMapping.{dropColumnMappingMetadata, filterColumnMappingProperties}
-import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol}
+import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol, TableFeatureProtocolUtils}
 import org.apache.spark.sql.delta.actions.DomainMetadata
 import org.apache.spark.sql.delta.commands.DMLUtils.TaggedCommitData
 import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
@@ -541,6 +541,10 @@ case class CreateDeltaTableCommand(
       // internal column mapping properties for the sake of comparison.
       var filteredTableProperties = filterColumnMappingProperties(
         tableDesc.properties)
+      // We also need to remove any protocol-related properties as we're filtering these
+      // from the metadata so they won't be present in the table properties.
+      filteredTableProperties =
+        Protocol.filterProtocolPropsFromTableProps(filteredTableProperties)
       var filteredExistingProperties = filterColumnMappingProperties(
         existingMetadata.configuration)
       // Clustered table has internal table properties in Metadata configurations and they are
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
index 2304da97c1..13f5c8f618 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
@@ -41,6 +41,35 @@ class DeltaDDLSuite extends DeltaDDLTestBase with SharedSparkSession
     exception.getMessage.contains("Cannot change nullable column to non-nullable")
   }
 
+  test("protocol-related properties are not considered during duplicate table creation") {
+    def createTable(tableName: String, location: String): Unit = {
+      sql(s"""
+         |CREATE TABLE $tableName (id INT, val STRING)
+         |USING DELTA
+         |LOCATION '$location'
+         |TBLPROPERTIES (
+         |  'delta.columnMapping.mode' = 'name',
+         |  'delta.minReaderVersion' = '2',
+         |  'delta.minWriterVersion' = '5'
+         |)""".stripMargin
+      )
+    }
+    withTempDir { dir =>
+      val table1 = "t1"
+      val table2 = "t2"
+      withTable(table1, table2) {
+        withSQLConf(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true") {
+          createTable(table1, dir.getCanonicalPath)
+          createTable(table2, dir.getCanonicalPath)
+          val catalogTable1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table1))
+          val catalogTable2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table2))
+          assert(catalogTable1.properties("delta.columnMapping.mode") == "name")
+          assert(catalogTable2.properties("delta.columnMapping.mode") == "name")
+        }
+      }
+    }
+  }
+
   test("table creation with ambiguous paths only allowed with legacy flag") {
     // ambiguous paths not allowed
     withTempDir { foo =>
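
For readers unfamiliar with the property filtering above, the following minimal, self-contained Scala sketch (not part of the patch) illustrates the idea behind `Protocol.filterProtocolPropsFromTableProps`: protocol-related keys are dropped from the requested table properties before they are compared against the existing table's metadata, so a repeated `CREATE TABLE` with identical `TBLPROPERTIES` no longer fails. The `isProtocolProperty` predicate is a simplified stand-in for `TableFeatureProtocolUtils.isTableProtocolProperty`, assuming it matches `delta.minReaderVersion`, `delta.minWriterVersion`, and `delta.feature.*` keys.

```scala
// Standalone sketch of the filtering idea; not Delta code and not part of the patch.
object FilterProtocolPropsSketch {

  // Simplified stand-in for TableFeatureProtocolUtils.isTableProtocolProperty:
  // assumed to match the two protocol version properties and explicit
  // table-feature overrides (delta.feature.*).
  private def isProtocolProperty(key: String): Boolean = {
    val k = key.toLowerCase
    k == "delta.minreaderversion" ||
      k == "delta.minwriterversion" ||
      k.startsWith("delta.feature.")
  }

  // Mirrors the shape of Protocol.filterProtocolPropsFromTableProps in the patch.
  def filterProtocolProps(properties: Map[String, String]): Map[String, String] =
    properties.filterNot { case (k, _) => isProtocolProperty(k) }

  def main(args: Array[String]): Unit = {
    // Properties as specified in the repeated CREATE TABLE of the repro scenario.
    val requested = Map(
      "delta.columnMapping.mode" -> "name",
      "delta.minReaderVersion" -> "2",
      "delta.minWriterVersion" -> "5")

    // Only non-protocol properties remain and take part in the comparison
    // against the existing table's metadata configuration.
    println(filterProtocolProps(requested)) // Map(delta.columnMapping.mode -> name)
  }
}
```

The patch applies the real helper on both sides of that comparison: to the metadata configuration in `OptimisticTransaction` and to the newly specified table properties in `CreateDeltaTableCommand`, so the two sides agree even when the protocol versions were set explicitly.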