From 97ee5259bae2bd119ecd9ef10c6f65a6d7ac0032 Mon Sep 17 00:00:00 2001 From: Yumingxuan Guo Date: Tue, 24 Sep 2024 15:40:01 -0700 Subject: [PATCH] Extend ClusteredTableDDLSuite with Coordinated Commits and Fix Issues --- .../spark/sql/delta/DeltaAnalysis.scala | 6 ++++ .../commands/CreateDeltaTableCommand.scala | 29 +++++++++++++++++ .../CoordinatedCommitsSuite.scala | 32 +++++++++++++++++++ .../CoordinatedCommitsTestUtils.scala | 26 +++++++++++---- .../skipping/ClusteredTableTestUtils.scala | 1 + .../clustering/ClusteredTableDDLSuite.scala | 13 ++++++-- 6 files changed, 99 insertions(+), 8 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala index 6ace005784..747400590a 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.catalog.IcebergTablePlaceHolder import org.apache.spark.sql.delta.commands._ import org.apache.spark.sql.delta.commands.cdc.CDCReader import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint} +import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsUtils import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex} import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaUtils @@ -149,11 +150,16 @@ class DeltaAnalysis(session: SparkSession) // used on the source delta table, then the corresponding fields would be set for the // sourceTable and needs to be removed from the targetTable's configuration. The fields // will then be set in the targetTable's configuration internally after. + // Coordinated commits configurations from the source delta table should also be left out, + // since CREATE LIKE is similar to CLONE, and we do not copy the commit coordinator from + // the source table. If users want a commit coordinator for the target table, they can + // specify the configurations in the CREATE LIKE command explicitly. val sourceMetadata = deltaLogSrc.initialSnapshot.metadata val config = sourceMetadata.configuration.-("delta.columnMapping.maxColumnId") .-(MaterializedRowId.MATERIALIZED_COLUMN_NAME_PROP) .-(MaterializedRowCommitVersion.MATERIALIZED_COLUMN_NAME_PROP) + .filterKeys(!CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS.contains(_)).toMap new CatalogTable( identifier = targetTableIdentifier, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala index 644edbe135..cc5f56f1ed 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/CreateDeltaTableCommand.scala @@ -496,6 +496,33 @@ case class CreateDeltaTableCommand( } } + + /** + * When creating an external table in a location where some table already existed, we make sure + * that the specified table properties match the existing table properties. Since Coordinated + * Commits is not designed to be overridden, we should not error out if the command omits these + * properties. If the existing table has Coordinated Commits enabled, we also do not error out if + * the command omits the ICT properties, which are the dependencies for Coordinated Commits. 
+ */ + private def filterCoordinatedCommitsProperties( + existingProperties: Map[String, String], + tableProperties: Map[String, String]): Map[String, String] = { + var filteredExistingProperties = existingProperties + val overridingCoordinatedCommitsConfs = + CoordinatedCommitsUtils.extractCoordinatedCommitsConfigurations(tableProperties) + val existingCoordinatedCommitsConfs = + CoordinatedCommitsUtils.extractCoordinatedCommitsConfigurations(existingProperties) + if (existingCoordinatedCommitsConfs.nonEmpty && overridingCoordinatedCommitsConfs.isEmpty) { + filteredExistingProperties --= CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS + val overridingICTConfs = CoordinatedCommitsUtils.extractICTConfigurations(tableProperties) + val existingICTConfs = CoordinatedCommitsUtils.extractICTConfigurations(existingProperties) + if (existingICTConfs.nonEmpty && overridingICTConfs.isEmpty) { + filteredExistingProperties --= CoordinatedCommitsUtils.ICT_TABLE_PROPERTY_KEYS + } + } + filteredExistingProperties + } + /** * Verify against our transaction metadata that the user specified the right metadata for the * table. @@ -565,6 +592,8 @@ case class CreateDeltaTableCommand( filteredTableProperties = ClusteredTableUtils.removeInternalTableProperties(filteredTableProperties) } + filteredExistingProperties = + filterCoordinatedCommitsProperties(filteredExistingProperties, filteredTableProperties) if (filteredTableProperties != filteredExistingProperties) { throw DeltaErrors.createTableWithDifferentPropertiesException( path, filteredTableProperties, filteredExistingProperties) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala index ae2643d205..7b75273318 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsSuite.scala @@ -1720,4 +1720,36 @@ class CoordinatedCommitsSuite } } } + + test("CREATE LIKE does not copy Coordinated Commits configurations from the source table.") { + CommitCoordinatorProvider.registerBuilder(TrackingInMemoryCommitCoordinatorBuilder(1)) + + val source = "sourcetable" + val target = "targettable" + sql(s"CREATE TABLE $source (id LONG) USING delta TBLPROPERTIES" + + s"('${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " + + s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '${JsonUtils.toJson(Map())}')") + sql(s"CREATE TABLE $target LIKE $source") + assert(DeltaLog.forTable(spark, target).snapshot.tableCommitCoordinatorClientOpt.isEmpty) + } + + test("CREATE an external table in a location with an existing table works correctly.") { + CommitCoordinatorProvider.registerBuilder(TrackingInMemoryCommitCoordinatorBuilder(1)) + + // When the existing table has a commit coordinator, omitting CC configurations in the command + // should not throw an exception, and the commit coordinator should be retained, so should ICT. 
+ withTempDir { dir => + val tableName = "testtable" + val tablePath = dir.getAbsolutePath + sql(s"CREATE TABLE delta.`${dir.getAbsolutePath}` (id LONG) USING delta TBLPROPERTIES " + + s"('foo' = 'bar', " + + s"'${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory', " + + s"'${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '${JsonUtils.toJson(Map())}')") + sql(s"CREATE TABLE $tableName (id LONG) USING delta TBLPROPERTIES " + + s"('foo' = 'bar') LOCATION '${dir.getAbsolutePath}'") + assert(DeltaLog.forTable(spark, tablePath).snapshot.tableCommitCoordinatorClientOpt.nonEmpty) + assert(DeltaLog.forTable(spark, tablePath).snapshot.metadata.configuration.contains( + IN_COMMIT_TIMESTAMPS_ENABLED.key)) + } + } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala index 05c08d9d33..ab2d3ccbbb 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.delta.coordinatedcommits import java.util.Optional import java.util.concurrent.atomic.AtomicInteger +import scala.collection.mutable import scala.util.control.NonFatal import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaTestUtilsBase} @@ -326,6 +327,9 @@ trait CoordinatedCommitsBaseSuite final def coordinatedCommitsEnabledInTests: Boolean = coordinatedCommitsBackfillBatchSize.nonEmpty + // Keeps track of the number of table names pointing to the location. + protected val locRefCount: mutable.Map[String, Int] = mutable.Map.empty + // In case some tests reuse the table path/name with DROP table, this method can be used to // clean the table data in the commit coordinator. Note that we should call this before // the table actually gets DROP. @@ -338,18 +342,28 @@ trait CoordinatedCommitsBaseSuite val location = try { spark.sql(s"describe detail $tableName") .select("location") - .first + .first() .getAs[String](0) } catch { case NonFatal(_) => // Ignore if the table does not exist/broken. return } - val logPath = location + "/_delta_log" - cc.asInstanceOf[TrackingCommitCoordinatorClient] - .delegatingCommitCoordinatorClient - .asInstanceOf[InMemoryCommitCoordinator] - .dropTable(new Path(logPath)) + val locKey = location.stripPrefix("file:") + if (locRefCount.contains(locKey)) { + locRefCount(locKey) -= 1 + } + // When we create an external table in a location where some table already existed, two table + // names could be pointing to the same location. We should only clean up the table data in the + // commit coordinator when the last table name pointing to the location is dropped. 
+ if (locRefCount.getOrElse(locKey, 0) == 0) { + val logPath = location + "/_delta_log" + cc.asInstanceOf[TrackingCommitCoordinatorClient] + .delegatingCommitCoordinatorClient + .asInstanceOf[InMemoryCommitCoordinator] + .dropTable(new Path(logPath)) + } + DeltaLog.clearCache() } override protected def sparkConf: SparkConf = { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala index 3757793e5d..11ec8b5bb0 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala @@ -221,6 +221,7 @@ trait ClusteredTableTestUtilsBase } spark.sql(s"$clause TABLE $table ($schema) USING delta CLUSTER BY ($clusterBy) " + s"$tablePropertiesClause $locationClause") + location.foreach { loc => locRefCount(loc) = locRefCount.getOrElse(loc, 0) + 1 } } protected def createOrReplaceAsSelectClusteredTable( diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala index a2fe9f5bd8..3601af2621 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/skipping/clustering/ClusteredTableDDLSuite.scala @@ -22,6 +22,7 @@ import com.databricks.spark.util.{Log4jUsageLogger, MetricDefinitions} import org.apache.spark.sql.delta.skipping.ClusteredTableTestUtils import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingEnableIdMode, DeltaColumnMappingEnableNameMode, DeltaConfigs, DeltaExcludedBySparkVersionTestMixinShims, DeltaLog, DeltaUnsupportedOperationException, NoMapping} import org.apache.spark.sql.delta.clustering.ClusteringMetadataDomain +import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite import org.apache.spark.sql.delta.hooks.UpdateCatalog import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.stats.SkippingEligibleDataType @@ -250,10 +251,11 @@ trait ClusteredTableCreateOrReplaceDDLSuiteBase extends QueryTest if (clause == "CREATE") { // Drop the table and delete the _delta_log directory to allow // external delta table creation. + deleteTableFromCommitCoordinatorIfNeeded("dstTbl") sql("DROP TABLE IF EXISTS dstTbl") Utils.deleteRecursively(new File(tmpDir, "_delta_log")) } - // Qualified data types and no exception is epxected. + // Qualified data types and no exception is expected. f() } else { val e = intercept[DeltaAnalysisException] { @@ -994,7 +996,9 @@ trait ClusteredTableDDLSuiteBase } } -trait ClusteredTableDDLSuite extends ClusteredTableDDLSuiteBase +trait ClusteredTableDDLSuite + extends ClusteredTableDDLSuiteBase + with CoordinatedCommitsBaseSuite trait ClusteredTableDDLWithNameColumnMapping extends ClusteredTableCreateOrReplaceDDLSuite with DeltaColumnMappingEnableNameMode @@ -1259,3 +1263,8 @@ class ClusteredTableDDLDataSourceV2NameColumnMappingSuite with ClusteredTableDDLWithV2 with ClusteredTableDDLWithColumnMappingV2 with ClusteredTableDDLSuite + +class ClusteredTableDDLDataSourceV2WithCoordinatedCommitsBatch100Suite + extends ClusteredTableDDLDataSourceV2Suite { + override val coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100) +}
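
A few illustrative notes follow the patch; none of the code below is part of the change itself.

First, the DeltaAnalysis change means CREATE TABLE ... LIKE no longer carries the source table's Coordinated Commits keys into the target configuration, so the target only gets a commit coordinator when the command asks for one explicitly. Below is a hedged sketch of that opt-in path, assuming a SparkSession named spark, an existing Delta table sourcetable, the 'tracking-in-memory' coordinator registered as in the suites above, and that the key constants live in DeltaConfigs; the target table names are hypothetical.

import org.apache.spark.sql.delta.DeltaConfigs.{COORDINATED_COMMITS_COORDINATOR_CONF, COORDINATED_COMMITS_COORDINATOR_NAME}

// Plain CREATE LIKE: the coordinator is intentionally NOT copied from sourcetable.
spark.sql("CREATE TABLE targettable LIKE sourcetable")

// Explicit opt-in: name the coordinator in the command itself.
spark.sql(
  s"""CREATE TABLE targettable_cc LIKE sourcetable TBLPROPERTIES (
     |  '${COORDINATED_COMMITS_COORDINATOR_NAME.key}' = 'tracking-in-memory',
     |  '${COORDINATED_COMMITS_COORDINATOR_CONF.key}' = '{}'
     |)""".stripMargin)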
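
Second, the comparison-relaxation idea behind filterCoordinatedCommitsProperties can be exercised in isolation. The object below is a self-contained sketch, not the patched method: the key sets are hard-coded stand-ins for CoordinatedCommitsUtils.TABLE_PROPERTY_KEYS and ICT_TABLE_PROPERTY_KEYS, and the exact property names may differ across Delta versions.

object FilterCoordinatedCommitsPropertiesSketch {
  // Assumed stand-ins for the real key sets defined in CoordinatedCommitsUtils.
  private val ccKeys = Set(
    "delta.coordinatedCommits.commitCoordinator-preview",
    "delta.coordinatedCommits.commitCoordinatorConf-preview",
    "delta.coordinatedCommits.tableConf-preview")
  private val ictKeys = Set(
    "delta.enableInCommitTimestamps",
    "delta.inCommitTimestampEnablementVersion",
    "delta.inCommitTimestampEnablementTimestamp")

  // Mirrors the patched logic: if the existing table has Coordinated Commits enabled but the
  // CREATE command omits those properties, drop them (and, if also omitted, the dependent ICT
  // properties) from the existing side before the property-equality check.
  def filter(existing: Map[String, String], specified: Map[String, String]): Map[String, String] = {
    val existingCC = existing.filter { case (k, _) => ccKeys.contains(k) }
    val specifiedCC = specified.filter { case (k, _) => ccKeys.contains(k) }
    if (existingCC.nonEmpty && specifiedCC.isEmpty) {
      val withoutCC = existing -- ccKeys
      val existingICT = existing.filter { case (k, _) => ictKeys.contains(k) }
      val specifiedICT = specified.filter { case (k, _) => ictKeys.contains(k) }
      if (existingICT.nonEmpty && specifiedICT.isEmpty) withoutCC -- ictKeys else withoutCC
    } else {
      existing
    }
  }

  def main(args: Array[String]): Unit = {
    val existing = Map(
      "foo" -> "bar",
      "delta.coordinatedCommits.commitCoordinator-preview" -> "tracking-in-memory",
      "delta.enableInCommitTimestamps" -> "true")
    // The CREATE command repeats only the user-facing property; CC and ICT keys are omitted.
    val specified = Map("foo" -> "bar")
    // Both sides now reduce to Map(foo -> bar), so no property mismatch is reported.
    assert(filter(existing, specified) == specified)
  }
}

Because the existing side is stripped of the omitted Coordinated Commits (and dependent ICT) keys before the comparison, the command in the new external-table test succeeds instead of hitting createTableWithDifferentPropertiesException.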
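
Finally, the reference counting added to the test utilities can be pictured on its own. In the sketch below, dropFromCoordinator is a hypothetical stand-in for the InMemoryCommitCoordinator.dropTable call made by deleteTableFromCommitCoordinatorIfNeeded, and register plays the role of the increment done by the clustered-table helper that issues the CREATE/REPLACE statement: coordinator state for a shared location is cleared only when the last table name pointing at it is dropped.

import scala.collection.mutable

class LocationRefCounter(dropFromCoordinator: String => Unit) {
  private val locRefCount = mutable.Map.empty[String, Int]

  // Record that another table name now points at `location` (called on CREATE with a LOCATION).
  def register(location: String): Unit =
    locRefCount(location) = locRefCount.getOrElse(location, 0) + 1

  // Called just before DROP TABLE; cleans up coordinator state only on the last reference.
  def unregister(location: String): Unit = {
    locRefCount.get(location).foreach(count => locRefCount(location) = count - 1)
    if (locRefCount.getOrElse(location, 0) == 0) {
      dropFromCoordinator(location + "/_delta_log")
    }
  }
}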