diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index fd10bd99a3..54ca25bb27 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -704,9 +704,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     }
 
     // We are done with protocol versions and features, time to remove related table properties.
-    val configsWithoutProtocolProps = newMetadataTmp.configuration.filterNot {
-      case (k, _) => TableFeatureProtocolUtils.isTableProtocolProperty(k)
-    }
+    val configsWithoutProtocolProps =
+      Protocol.filterProtocolPropsFromTableProps(newMetadataTmp.configuration)
     // Table features Part 3: add automatically-enabled features by looking at the new table
     // metadata.
     //
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
index a692ce2c41..a5faa22dac 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala
@@ -425,6 +425,11 @@ object Protocol {
     (getReaderVersionFromTableConf(conf), getWriterVersionFromTableConf(conf))
   }
 
+  def filterProtocolPropsFromTableProps(properties: Map[String, String]): Map[String, String] =
+    properties.filterNot {
+      case (k, _) => TableFeatureProtocolUtils.isTableProtocolProperty(k)
+    }
+
   /** Assert a table metadata contains no protocol-related table properties. */
   def assertMetadataContainsNoProtocolProps(metadata: Metadata): Unit = {
     assert(
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
index dfef8de28b..797d7ba196 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
 import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.DeltaColumnMapping.{dropColumnMappingMetadata, filterColumnMappingProperties}
-import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol}
+import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol, TableFeatureProtocolUtils}
 import org.apache.spark.sql.delta.actions.DomainMetadata
 import org.apache.spark.sql.delta.commands.DMLUtils.TaggedCommitData
 import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
@@ -541,6 +541,10 @@ case class CreateDeltaTableCommand(
       // internal column mapping properties for the sake of comparison.
       var filteredTableProperties = filterColumnMappingProperties(
         tableDesc.properties)
+      // We also need to remove any protocol-related properties as we're filtering these
+      // from the metadata so they won't be present in the table properties.
+      filteredTableProperties =
+        Protocol.filterProtocolPropsFromTableProps(filteredTableProperties)
       var filteredExistingProperties = filterColumnMappingProperties(
         existingMetadata.configuration)
       // Clustered table has internal table properties in Metadata configurations and they are
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
index 2304da97c1..13f5c8f618 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaDDLSuite.scala
@@ -41,6 +41,35 @@ class DeltaDDLSuite extends DeltaDDLTestBase with SharedSparkSession
     exception.getMessage.contains("Cannot change nullable column to non-nullable")
   }
 
+  test("protocol-related properties are not considered during duplicate table creation") {
+    def createTable(tableName: String, location: String): Unit = {
+      sql(s"""
+             |CREATE TABLE $tableName (id INT, val STRING)
+             |USING DELTA
+             |LOCATION '$location'
+             |TBLPROPERTIES (
+             |  'delta.columnMapping.mode' = 'name',
+             |  'delta.minReaderVersion' = '2',
+             |  'delta.minWriterVersion' = '5'
+             |)""".stripMargin
+      )
+    }
+    withTempDir { dir =>
+      val table1 = "t1"
+      val table2 = "t2"
+      withTable(table1, table2) {
+        withSQLConf(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true") {
+          createTable(table1, dir.getCanonicalPath)
+          createTable(table2, dir.getCanonicalPath)
+          val catalogTable1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table1))
+          val catalogTable2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table2))
+          assert(catalogTable1.properties("delta.columnMapping.mode") == "name")
+          assert(catalogTable2.properties("delta.columnMapping.mode") == "name")
+        }
+      }
+    }
+  }
+
   test("table creation with ambiguous paths only allowed with legacy flag") {
     // ambiguous paths not allowed
     withTempDir { foo =>
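
Note for reviewers: below is a minimal, self-contained sketch (not part of the diff) of the filtering behavior that the new Protocol.filterProtocolPropsFromTableProps helper centralizes. The isTableProtocolProperty stand-in here is a simplified assumption for illustration; the real TableFeatureProtocolUtils.isTableProtocolProperty covers the full set of protocol-related keys.

// Standalone sketch, runnable with plain scalac. The key set and the
// "delta.feature." prefix check are simplified stand-ins for
// TableFeatureProtocolUtils.isTableProtocolProperty.
object FilterProtocolPropsSketch {
  private val protocolPropKeys =
    Set("delta.minReaderVersion", "delta.minWriterVersion")

  // Assumption: protocol properties are the reader/writer version keys
  // plus per-feature override keys such as "delta.feature.<name>".
  private def isTableProtocolProperty(key: String): Boolean =
    protocolPropKeys.contains(key) || key.startsWith("delta.feature.")

  // Mirrors the shape of the helper added to object Protocol in this diff.
  def filterProtocolPropsFromTableProps(
      properties: Map[String, String]): Map[String, String] =
    properties.filterNot { case (k, _) => isTableProtocolProperty(k) }

  def main(args: Array[String]): Unit = {
    val props = Map(
      "delta.columnMapping.mode" -> "name",
      "delta.minReaderVersion" -> "2",
      "delta.minWriterVersion" -> "5")
    // Only the non-protocol property survives:
    // Map(delta.columnMapping.mode -> name)
    println(filterProtocolPropsFromTableProps(props))
  }
}

This is why the new DeltaDDLSuite test passes: when the second CREATE TABLE at the same location compares its requested properties against the existing table's metadata, both sides have had the protocol keys stripped, so only genuine property differences (none here) can trigger a mismatch.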