[Spark] Correctly handles protocol properties during repeat table creation (#3681)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR fixes a bug that could occur when a table is created a second time at the
same location but under a different name, with delta.minReaderVersion and
delta.minWriterVersion set explicitly as part of the table creation. Because these
properties are removed from the table metadata, they do not appear in the table
property comparison performed during the second table creation. Since the properties
are required to match, the second creation fails even though the specified properties
are identical to those of the first one. This PR removes these two special properties
from the comparison so that repeat table creation succeeds.
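
For context, a minimal sketch of the failing scenario (hypothetical table names and path; it mirrors the unit test added below): both statements request identical protocol versions, yet the second CREATE TABLE used to fail because those versions had already been stripped from the first table's stored metadata.

```scala
// Sketch of the repro, assuming a SparkSession `spark` with the Delta Lake extensions enabled.
val path = "/tmp/delta/shared-location"  // hypothetical shared location
Seq("t1", "t2").foreach { name =>
  spark.sql(
    s"""CREATE TABLE $name (id INT, val STRING)
       |USING DELTA
       |LOCATION '$path'
       |TBLPROPERTIES (
       |  'delta.minReaderVersion' = '2',
       |  'delta.minWriterVersion' = '5'
       |)""".stripMargin)
}
// Before this fix, the second CREATE TABLE failed even though its properties
// are identical to the first table's.
```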

## How was this patch tested?

Added a unit test asserting that repeat table creation succeeds even when
delta.minReaderVersion/minWriterVersion are specified explicitly.

## Does this PR introduce _any_ user-facing changes?

No
LukasRupprecht committed Sep 17, 2024
1 parent 8e97417 commit e1dd987
Showing 4 changed files with 41 additions and 4 deletions.
@@ -704,9 +704,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}

// We are done with protocol versions and features, time to remove related table properties.
-val configsWithoutProtocolProps = newMetadataTmp.configuration.filterNot {
-  case (k, _) => TableFeatureProtocolUtils.isTableProtocolProperty(k)
-}
+val configsWithoutProtocolProps =
+  Protocol.filterProtocolPropsFromTableProps(newMetadataTmp.configuration)
// Table features Part 3: add automatically-enabled features by looking at the new table
// metadata.
//
@@ -425,6 +425,11 @@ object Protocol {
(getReaderVersionFromTableConf(conf), getWriterVersionFromTableConf(conf))
}

+def filterProtocolPropsFromTableProps(properties: Map[String, String]): Map[String, String] =
+  properties.filterNot {
+    case (k, _) => TableFeatureProtocolUtils.isTableProtocolProperty(k)
+  }

/** Assert a table metadata contains no protocol-related table properties. */
def assertMetadataContainsNoProtocolProps(metadata: Metadata): Unit = {
assert(
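
As an aside, a small illustration of what the new helper is expected to do, using a made-up property map (the exact set of keys treated as protocol properties is determined by TableFeatureProtocolUtils.isTableProtocolProperty):

```scala
import org.apache.spark.sql.delta.actions.Protocol

// Hypothetical property map mixing protocol properties with a regular table property.
val props = Map(
  "delta.minReaderVersion" -> "2",
  "delta.minWriterVersion" -> "5",
  "delta.columnMapping.mode" -> "name")

// The protocol versions are dropped; the ordinary property is kept.
val filtered = Protocol.filterProtocolPropsFromTableProps(props)
assert(filtered == Map("delta.columnMapping.mode" -> "name"))
```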
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaColumnMapping.{dropColumnMappingMetadata, filterColumnMappingProperties}
-import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol}
+import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.actions.DomainMetadata
import org.apache.spark.sql.delta.commands.DMLUtils.TaggedCommitData
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
@@ -541,6 +541,10 @@ case class CreateDeltaTableCommand(
// internal column mapping properties for the sake of comparison.
var filteredTableProperties = filterColumnMappingProperties(
tableDesc.properties)
+// We also need to remove any protocol-related properties as we're filtering these
+// from the metadata so they won't be present in the table properties.
+filteredTableProperties =
+  Protocol.filterProtocolPropsFromTableProps(filteredTableProperties)
var filteredExistingProperties = filterColumnMappingProperties(
existingMetadata.configuration)
// Clustered table has internal table properties in Metadata configurations and they are
@@ -41,6 +41,35 @@ class DeltaDDLSuite extends DeltaDDLTestBase with SharedSparkSession
exception.getMessage.contains("Cannot change nullable column to non-nullable")
}

test("protocol-related properties are not considered during duplicate table creation") {
def createTable(tableName: String, location: String): Unit = {
sql(s"""
|CREATE TABLE $tableName (id INT, val STRING)
|USING DELTA
|LOCATION '$location'
|TBLPROPERTIES (
| 'delta.columnMapping.mode' = 'name',
| 'delta.minReaderVersion' = '2',
| 'delta.minWriterVersion' = '5'
|)""".stripMargin
)
}
withTempDir { dir =>
val table1 = "t1"
val table2 = "t2"
withTable(table1, table2) {
withSQLConf(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED.key -> "true") {
createTable(table1, dir.getCanonicalPath)
createTable(table2, dir.getCanonicalPath)
val catalogTable1 = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table1))
val catalogTable2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table2))
assert(catalogTable1.properties("delta.columnMapping.mode") == "name")
assert(catalogTable2.properties("delta.columnMapping.mode") == "name")
}
}
}
}

test("table creation with ambiguous paths only allowed with legacy flag") {
// ambiguous paths not allowed
withTempDir { foo =>
