Extend ClusteredTableDDLSuite with Coordinated Commits and Fix Issues
yumingxuanguo-db committed Sep 24, 2024
1 parent a8cc4b4 commit 97ee525
Showing 6 changed files with 99 additions and 8 deletions.
@@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.catalog.IcebergTablePlaceHolder
import org.apache.spark.sql.delta.commands._
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils
import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
@@ -149,11 +150,16 @@ class DeltaAnalysis(session: SparkSession)
// used on the source delta table, then the corresponding fields would be set for the
// sourceTable and needs to be removed from the targetTable's configuration. The fields
// will then be set in the targetTable's configuration internally after.
// Coordinated commits configurations from the source delta table should also be left out,
// since CREATE LIKE is similar to CLONE, and we do not copy the commit coordinator from
// the source table. If users want a commit coordinator for the target table, they can
// specify the configurations in the CREATE LIKE command explicitly.
val sourceMetadata = deltaLogSrc.initialSnapshot.metadata
val config =
sourceMetadata.configuration.-("delta.columnMapping.maxColumnId")
.-(MaterializedRowId.MATERIALIZED_COLUMN_NAME_PROP)
.-(MaterializedRowCommitVersion.MATERIALIZED_COLUMN_NAME_PROP)
.filterKeys(!CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS.contains(_)).toMap

new CatalogTable(
identifier = targetTableIdentifier,
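
As the comment above notes, CREATE TABLE ... LIKE no longer carries the source table's commit coordinator over; a commit coordinator for the target has to be requested explicitly. A minimal, hypothetical sketch in the style of the tests below (table names are illustrative, and it assumes the 'tracking-in-memory' coordinator is registered):

sql("CREATE TABLE target LIKE source")  // target starts without a commit coordinator
// Opting the target into Coordinated Commits explicitly instead:
sql(s"CREATE TABLE target2 LIKE source TBLPROPERTIES (" +
  s"'${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " +
  s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '${JsonUtils.toJson(Map())}')")
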
@@ -496,6 +496,33 @@ case class CreateDeltaTableCommand(
}
}


/**
* When creating an external table in a location where some table already existed, we make sure
* that the specified table properties match the existing table properties. Since Coordinated
* Commits is not designed to be overridden, we should not error out if the command omits these
* properties. If the existing table has Coordinated Commits enabled, we also do not error out if
* the command omits the ICT properties, which are the dependencies for Coordinated Commits.
*/
private def filterCoordinatedCommitsProperties(
existingProperties: Map[String, String],
tableProperties: Map[String, String]): Map[String, String] = {
var filteredExistingProperties = existingProperties
val overridingCoordinatedCommitsConfs =
CoordinatedCommitsUtils.extractCoordinatedCommitsConfigurations(tableProperties)
val existingCoordinatedCommitsConfs =
CoordinatedCommitsUtils.extractCoordinatedCommitsConfigurations(existingProperties)
if (existingCoordinatedCommitsConfs.nonEmpty && overridingCoordinatedCommitsConfs.isEmpty) {
filteredExistingProperties --= CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS
val overridingICTConfs = CoordinatedCommitsUtils.extractICTConfigurations(tableProperties)
val existingICTConfs = CoordinatedCommitsUtils.extractICTConfigurations(existingProperties)
if (existingICTConfs.nonEmpty && overridingICTConfs.isEmpty) {
filteredExistingProperties --= CoordinatedCommitsUtils.ICT_TABLE_PROPERTY_KEYS
}
}
filteredExistingProperties
}

/**
* Verify against our transaction metadata that the user specified the right metadata for the
* table.
@@ -565,6 +592,8 @@ case class CreateDeltaTableCommand(
filteredTableProperties =
ClusteredTableUtils.removeInternalTableProperties(filteredTableProperties)
}
filteredExistingProperties =
filterCoordinatedCommitsProperties(filteredExistingProperties, filteredTableProperties)
if (filteredTableProperties != filteredExistingProperties) {
throw DeltaErrors.createTableWithDifferentPropertiesException(
path, filteredTableProperties, filteredExistingProperties)
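
A rough worked example of the new filter (values illustrative, keys referenced through the existing config constants): when the existing table carries Coordinated Commits and ICT properties but the CREATE command omits them, both groups are stripped before the equality check above, so creating an external table over the existing location succeeds.

val existingProperties = Map(
  "foo" -> "bar",
  COORDINATED_COMMITS_COORDINATOR_NAME.key -> "tracking-in-memory",
  COORDINATED_COMMITS_COORDINATOR_CONF.key -> "{}",
  IN_COMMIT_TIMESTAMPS_ENABLED.key -> "true")
val tableProperties = Map("foo" -> "bar")
// filterCoordinatedCommitsProperties(existingProperties, tableProperties) is expected to return
// Map("foo" -> "bar"): the coordinated-commits keys are dropped because the command omits them,
// and the ICT key is dropped because the existing table has Coordinated Commits enabled.
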
@@ -1720,4 +1720,36 @@ class CoordinatedCommitsSuite
}
}
}

test("CREATE LIKE does not copy Coordinated Commits configurations from the source table.") {
CommitCoordinatorProvider.registerBuilder(TrackingInMemoryCommitCoordinatorBuilder(1))

val source = "sourcetable"
val target = "targettable"
sql(s"CREATE TABLE $source (id LONG) USING delta TBLPROPERTIES" +
s"('${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " +
s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '${JsonUtils.toJson(Map())}')")
sql(s"CREATE TABLE $target LIKE $source")
assert(DeltaLog.forTable(spark, target).snapshot.tableCommitCoordinatorClientOpt.isEmpty)
}

test("CREATE an external table in a location with an existing table works correctly.") {
CommitCoordinatorProvider.registerBuilder(TrackingInMemoryCommitCoordinatorBuilder(1))

// When the existing table has a commit coordinator, omitting CC configurations in the command
// should not throw an exception; the commit coordinator should be retained, and so should ICT.
withTempDir { dir =>
val tableName = "testtable"
val tablePath = dir.getAbsolutePath
sql(s"CREATE TABLE delta.`${dir.getAbsolutePath}` (id LONG) USING delta TBLPROPERTIES " +
s"('foo' = 'bar', " +
s"'${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " +
s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '${JsonUtils.toJson(Map())}')")
sql(s"CREATE TABLE $tableName (id LONG) USING delta TBLPROPERTIES " +
s"('foo' = 'bar') LOCATION '${dir.getAbsolutePath}'")
assert(DeltaLog.forTable(spark, tablePath).snapshot.tableCommitCoordinatorClientOpt.nonEmpty)
assert(DeltaLog.forTable(spark, tablePath).snapshot.metadata.configuration.contains(
IN_COMMIT_TIMESTAMPS_ENABLED.key))
}
}
}
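
For contrast, a hedged sketch of what is still expected to fail (illustrative table name, re-using the temp directory from the test above; not part of this commit): a genuinely conflicting property that is not a Coordinated Commits key still trips the existing property check.

// Expected to fail with createTableWithDifferentPropertiesException, because 'foo' = 'baz'
// conflicts with the existing table's 'foo' = 'bar' and is not filtered out.
intercept[Exception] {
  sql(s"CREATE TABLE anothertable (id LONG) USING delta TBLPROPERTIES " +
    s"('foo' = 'baz') LOCATION '${dir.getAbsolutePath}'")
}
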
@@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.coordinatedcommits
import java.util.Optional
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.mutable
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaTestUtilsBase}
@@ -326,6 +327,9 @@ trait CoordinatedCommitsBaseSuite

final def coordinatedCommitsEnabledInTests: Boolean = coordinatedCommitsBackfillBatchSize.nonEmpty

// Keeps track of the number of table names pointing to the location.
protected val locRefCount: mutable.Map[String, Int] = mutable.Map.empty

// In case some tests reuse the table path/name with DROP TABLE, this method can be used to
// clean up the table data in the commit coordinator. Note that we should call this before
// the table actually gets dropped.
@@ -338,18 +342,28 @@
val location = try {
spark.sql(s"describe detail $tableName")
.select("location")
.first
.first()
.getAs[String](0)
} catch {
case NonFatal(_) =>
// Ignore if the table does not exist or is broken.
return
}
val logPath = location + "/_delta_log"
cc.asInstanceOf[TrackingCommitCoordinatorClient]
.delegatingCommitCoordinatorClient
.asInstanceOf[InMemoryCommitCoordinator]
.dropTable(new Path(logPath))
val locKey = location.stripPrefix("file:")
if (locRefCount.contains(locKey)) {
locRefCount(locKey) -= 1
}
// When we create an external table in a location where some table already existed, two table
// names could be pointing to the same location. We should only clean up the table data in the
// commit coordinator when the last table name pointing to the location is dropped.
if (locRefCount.getOrElse(locKey, 0) == 0) {
val logPath = location + "/_delta_log"
cc.asInstanceOf[TrackingCommitCoordinatorClient]
.delegatingCommitCoordinatorClient
.asInstanceOf[InMemoryCommitCoordinator]
.dropTable(new Path(logPath))
}
DeltaLog.clearCache()
}

override protected def sparkConf: SparkConf = {
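
A schematic walk-through of the ref-counted cleanup (path and table names illustrative; it assumes both names were registered through the clustered-table helper in the next file, which increments locRefCount for the LOCATION it uses):

// Two table names created over the same external location, so locRefCount("/tmp/shared") == 2.
deleteTableFromCommitCoordinatorIfNeeded("t1")  // count 2 -> 1: coordinator data is kept
deleteTableFromCommitCoordinatorIfNeeded("t2")  // count 1 -> 0: dropTable("/tmp/shared/_delta_log") runs
// A table whose location was never registered keeps the old behavior: getOrElse(locKey, 0) == 0,
// so dropTable is still called immediately.
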
@@ -221,6 +221,7 @@ trait ClusteredTableTestUtilsBase
}
spark.sql(s"$clause TABLE $table ($schema) USING delta CLUSTER BY ($clusterBy) " +
s"$tablePropertiesClause $locationClause")
location.foreach { loc => locRefCount(loc) = locRefCount.getOrElse(loc, 0) + 1 }
}

protected def createOrReplaceAsSelectClusteredTable(
@@ -22,6 +22,7 @@ import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions}
import org.apache.spark.sql.delta.skipping.ClusteredTableTestUtils
import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingEnableIdMode, DeltaColumnMappingEnableNameMode, DeltaConfigs, DeltaExcludedBySparkVersionTestMixinShims, DeltaLog, DeltaUnsupportedOperationException, NoMapping}
import org.apache.spark.sql.delta.clustering.ClusteringMetadataDomain
import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
import org.apache.spark.sql.delta.hooks.UpdateCatalog
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.stats.SkippingEligibleDataType
@@ -250,10 +251,11 @@ trait ClusteredTableCreateOrReplaceDDLSuiteBase extends QueryTest
if (clause == "CREATE") {
// Drop the table and delete the _delta_log directory to allow
// external delta table creation.
deleteTableFromCommitCoordinatorIfNeeded("dstTbl")
sql("DROP TABLE IF EXISTS dstTbl")
Utils.deleteRecursively(new File(tmpDir, "_delta_log"))
}
// Qualified data types and no exception is epxected.
// Qualified data types and no exception is expected.
f()
} else {
val e = intercept[DeltaAnalysisException] {
@@ -994,7 +996,9 @@ trait ClusteredTableDDLSuiteBase
}
}

trait ClusteredTableDDLSuite extends ClusteredTableDDLSuiteBase
trait ClusteredTableDDLSuite
extends ClusteredTableDDLSuiteBase
with CoordinatedCommitsBaseSuite

trait ClusteredTableDDLWithNameColumnMapping
extends ClusteredTableCreateOrReplaceDDLSuite with DeltaColumnMappingEnableNameMode
@@ -1259,3 +1263,8 @@ class ClusteredTableDDLDataSourceV2NameColumnMappingSuite
with ClusteredTableDDLWithV2
with ClusteredTableDDLWithColumnMappingV2
with ClusteredTableDDLSuite

class ClusteredTableDDLDataSourceV2WithCoordinatedCommitsBatch100Suite
extends ClusteredTableDDLDataSourceV2Suite {
override val coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100)
}
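
The same pattern should extend to other DDL suites; a hypothetical sketch (suite name and batch size are illustrative) of opting another suite into Coordinated Commits:

class ClusteredTableDDLDataSourceV2WithCoordinatedCommitsBatch2Suite
  extends ClusteredTableDDLDataSourceV2Suite {
  // Any non-empty value makes coordinatedCommitsEnabledInTests true; the value controls how many
  // commits the test commit coordinator accumulates before backfilling them to the _delta_log.
  override val coordinatedCommitsBackfillBatchSize: Option[Int] = Some(2)
}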
