diff --git a/core/src/main/resources/error/delta-error-classes.json b/core/src/main/resources/error/delta-error-classes.json
index 2805530676c..22469c29fbe 100644
--- a/core/src/main/resources/error/delta-error-classes.json
+++ b/core/src/main/resources/error/delta-error-classes.json
@@ -64,7 +64,7 @@
   },
   "DELTA_BLOCK_CDF_COLUMN_MAPPING_READS" : {
     "message" : [
-      "Change data feed (CDF) reads are currently not supported on tables with column mapping enabled."
+      "Change Data Feed (CDF) reads are not supported on tables with column mapping schema changes (e.g. rename or drop). Read schema: . Incompatible schema: . "
     ],
     "sqlState" : "0A000"
   },
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
index 74a92d82742..fbc12b76653 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaColumnMapping.scala
@@ -21,6 +21,7 @@ import java.util.{Locale, UUID}
 import scala.collection.mutable
 
 import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
+import org.apache.spark.sql.delta.commands.cdc.CDCReader
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}
 
@@ -52,7 +53,11 @@ trait DeltaColumnMappingBase extends DeltaLogging {
    *
    * This list is case-insensitive.
    */
-  protected val DELTA_INTERNAL_COLUMNS: Set[String] = Set.empty
+  protected val DELTA_INTERNAL_COLUMNS: Set[String] =
+    (CDCReader.CDC_COLUMNS_IN_DATA ++ Seq(
+      CDCReader.CDC_COMMIT_VERSION,
+      CDCReader.CDC_COMMIT_TIMESTAMP)
+    ).map(_.toLowerCase(Locale.ROOT)).toSet
 
   val supportedModes: Set[DeltaColumnMappingMode] = Set(NoMapping, NameMapping)
 
@@ -504,6 +509,44 @@ trait DeltaColumnMappingBase extends DeltaLogging {
       newPhysicalToLogicalMap.get(physicalPath).exists(_.name != field.name)
     }
   }
+
+  /**
+   * Compare the old metadata's schema with the new metadata's schema for column mapping schema changes.
+   *
+   * newMetadata's snapshot version must be >= oldMetadata's snapshot version so we can reliably
+   * detect the difference between ADD COLUMN and DROP COLUMN.
+   *
+   * As of now, `newMetadata` is column mapping read compatible with `oldMetadata` if
+   * no column rename or column drop has happened in between.
+   */
+  def isColumnMappingReadCompatible(newMetadata: Metadata, oldMetadata: Metadata): Boolean = {
+    val (oldMode, newMode) = (oldMetadata.columnMappingMode, newMetadata.columnMappingMode)
+    if (oldMode != NoMapping && newMode != NoMapping) {
+      // Both metadata versions have column mapping enabled
+      !isRenameColumnOperation(newMetadata, oldMetadata) &&
+        !isDropColumnOperation(newMetadata, oldMetadata)
+    } else if (oldMode == NoMapping && newMode != NoMapping) {
+      // The old metadata does not have column mapping while the new metadata does, so we assume
+      // an upgrade has happened in between.
+      // We manually construct a post-upgrade schema for the old metadata and compare it with the
+      // new metadata; since the upgrade uses the logical name as the physical name, we can
+      // capture any difference in the schema using the same is{XXX}ColumnOperation utils.
+      var upgradedMetadata = assignColumnIdAndPhysicalName(
+        oldMetadata, oldMetadata, isChangingModeOnExistingTable = true
+      )
+      // need to change to a column mapping mode too so the utils below can recognize it
+      upgradedMetadata = upgradedMetadata.copy(
+        configuration = upgradedMetadata.configuration ++
+          Map(DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name)
+      )
+      // use the same check
+      !isRenameColumnOperation(newMetadata, upgradedMetadata) &&
+        !isDropColumnOperation(newMetadata, upgradedMetadata)
+    } else {
+      // Not a column mapping table, don't block
+      true
+    }
+  }
 }
 
 object DeltaColumnMapping extends DeltaColumnMappingBase
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index b83e9de898e..f240280d061 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -2265,9 +2265,22 @@ trait DeltaErrorsBase
     // scalastyle:on line.size.limit
   }
 
-  def blockCdfAndColumnMappingReads(): Throwable = {
+
+  val columnMappingCDFBatchBlockHint: String =
+    s"You may force enable batch CDF read at your own risk by turning on " +
+      s"${DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key}."
+
+  def blockCdfAndColumnMappingReads(
+      isStreaming: Boolean,
+      readSchema: Option[StructType] = None,
+      incompatibleSchema: Option[StructType] = None): Throwable = {
     new DeltaUnsupportedOperationException(
-      errorClass = "DELTA_BLOCK_CDF_COLUMN_MAPPING_READS"
+      errorClass = "DELTA_BLOCK_CDF_COLUMN_MAPPING_READS",
+      messageParameters = Array(
+        readSchema.map(_.json).getOrElse(""),
+        incompatibleSchema.map(_.json).getOrElse(""),
+        if (isStreaming) "" else columnMappingCDFBatchBlockHint
+      )
     )
   }
 
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
index 1efd74862c5..08c8b60164e 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
@@ -20,7 +20,7 @@ import java.sql.Timestamp
 
 import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.sql.delta.{DeltaConfigs, DeltaErrors, DeltaHistoryManager, DeltaLog, DeltaOperations, DeltaParquetFileFormat, DeltaTableUtils, DeltaTimeTravelSpec, NoMapping, Snapshot}
+import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, CommitInfo, FileAction, Metadata, RemoveFile}
 import org.apache.spark.sql.delta.files.{CdcAddFileIndex, TahoeChangeFileIndex, TahoeFileIndex, TahoeRemoveFileIndex}
 import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -264,8 +264,10 @@ object CDCReader extends DeltaLogging {
 
     // If the table has column mapping enabled, throw an error. With column mapping, certain schema
     // changes are possible (rename a column or drop a column) which don't work well with CDF.
-    if (snapshot.metadata.columnMappingMode != NoMapping) {
-      throw DeltaErrors.blockCdfAndColumnMappingReads()
+    // TODO: remove this after the proper blocking semantics are rolled out
+    // This only blocks streaming CDF; batch CDF is blocked separately below.
+    if (isStreaming && snapshot.metadata.columnMappingMode != NoMapping) {
+      throw DeltaErrors.blockCdfAndColumnMappingReads(isStreaming)
     }
 
     // A map from change version to associated commit timestamp.
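As context for reviewers: the compatibility check above relies on `isRenameColumnOperation` and `isDropColumnOperation`, which are not shown in this diff. A rough, standalone model of the rule (not the actual Delta helpers; the `Mapping` alias and names below are illustrative) is to compare the physical-name-to-logical-name mapping of the two schemas: a physical column that disappears is a drop, a physical column whose logical name changes is a rename, and a pure ADD COLUMN stays read compatible.

```scala
// Standalone sketch of the read-compatibility rule assumed above; column mapping is modelled
// here as a map from physical name -> logical name. Not the actual Delta implementation.
object ColumnMappingCompatSketch {
  type Mapping = Map[String, String] // physical name -> logical name

  // DROP: a physical column from the old schema no longer exists in the new schema.
  def isDropColumnOperation(newM: Mapping, oldM: Mapping): Boolean =
    oldM.keys.exists(physical => !newM.contains(physical))

  // RENAME: the same physical column is now exposed under a different logical name.
  def isRenameColumnOperation(newM: Mapping, oldM: Mapping): Boolean =
    oldM.exists { case (physical, logical) => newM.get(physical).exists(_ != logical) }

  def isReadCompatible(newM: Mapping, oldM: Mapping): Boolean =
    !isDropColumnOperation(newM, oldM) && !isRenameColumnOperation(newM, oldM)

  def main(args: Array[String]): Unit = {
    val v0 = Map("col-1" -> "id", "col-2" -> "value")
    assert(isReadCompatible(v0 + ("col-3" -> "score"), v0))   // ADD COLUMN: compatible
    assert(!isReadCompatible(v0 + ("col-2" -> "value2"), v0)) // RENAME: blocked
    assert(!isReadCompatible(v0 - "col-2", v0))               // DROP: blocked
  }
}
```

This also explains why the upgrade branch above synthesizes physical names for the pre-upgrade metadata first: without column mapping metadata on the old schema there would be no physical names to compare.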
@@ -275,10 +277,35 @@ object CDCReader extends DeltaLogging {
     val changeFiles = ListBuffer[CDCDataSpec[AddCDCFile]]()
     val addFiles = ListBuffer[CDCDataSpec[AddFile]]()
     val removeFiles = ListBuffer[CDCDataSpec[RemoveFile]]()
+
+    val startVersionSnapshot = deltaLog.getSnapshotAt(start)
     if (!isCDCEnabledOnTable(deltaLog.getSnapshotAt(start).metadata)) {
       throw DeltaErrors.changeDataNotRecordedException(start, start, end)
     }
 
+    /**
+     * TODO: Unblock this when we figure out the correct semantics.
+     * Currently batch CDC reads on column mapping tables with rename/drop schema changes are
+     * blocked due to unclear semantics.
+     * Streaming CDF reads are blocked on a separate code path in DeltaSource.
+     */
+    val shouldCheckToBlockBatchReadOnColumnMappingTable =
+      !isStreaming &&
+        snapshot.metadata.columnMappingMode != NoMapping &&
+        !spark.sessionState.conf.getConf(
+          DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES)
+
+    // Compare with the start snapshot's metadata schema to fail fast
+    if (shouldCheckToBlockBatchReadOnColumnMappingTable &&
+        !DeltaColumnMapping.isColumnMappingReadCompatible(
+          snapshot.metadata, startVersionSnapshot.metadata)) {
+      throw DeltaErrors.blockCdfAndColumnMappingReads(
+        isStreaming,
+        Some(snapshot.metadata.schema),
+        Some(startVersionSnapshot.metadata.schema)
+      )
+    }
+
     var totalBytes = 0L
     var totalFiles = 0L
 
@@ -296,6 +323,19 @@ object CDCReader extends DeltaLogging {
         throw DeltaErrors.changeDataNotRecordedException(v, start, end)
       }
 
+      // Check all intermediary metadata schema changes as well
+      if (shouldCheckToBlockBatchReadOnColumnMappingTable) {
+        actions.collect { case a: Metadata => a }.foreach { metadata =>
+          if (!DeltaColumnMapping.isColumnMappingReadCompatible(snapshot.metadata, metadata)) {
+            throw DeltaErrors.blockCdfAndColumnMappingReads(
+              isStreaming,
+              Some(snapshot.metadata.schema),
+              Some(metadata.schema)
+            )
+          }
+        }
+      }
+
       // Set up buffers for all action types to avoid multiple passes.
       val cdcActions = ListBuffer[AddCDCFile]()
       val addActions = ListBuffer[AddFile]()
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 05421420af9..337f49cde60 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -793,6 +793,17 @@ trait DeltaSQLConfBase
       .createWithDefault(false)
   }
 
+  val DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES =
+    buildConf("changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled")
+      .doc(
+        "Reading change data in batch (e.g. using `table_changes()`) on a Delta table with " +
+        "column mapping schema operations is currently blocked due to potential data loss and " +
+        "schema confusion. However, existing users may use this flag to force-unblock " +
+        "the read if they'd like to take the risk.")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   val DYNAMIC_PARTITION_OVERWRITE_ENABLED =
     buildConf("dynamicPartitionOverwrite.enabled")
       .doc("Whether to overwrite partitions dynamically when 'partitionOverwriteMode' is set to " +
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
index d4e5cad146a..4d44ada5762 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCStreamSuite.scala
@@ -826,8 +826,8 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
       val e = intercept[StreamingQueryException] {
         f
       }.getCause.getMessage
-      assert(e == "Change data feed (CDF) reads are currently not supported on tables " +
-        "with column mapping enabled.")
+      assert(e.contains("Change Data Feed (CDF) reads are not supported on tables with " +
+        "column mapping schema changes (e.g. rename or drop)"))
     }
 
     Seq(0, 1).foreach { startingVersion =>
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
index 2d8fe11ddc0..9e15f1f7005 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala
@@ -20,9 +20,12 @@ import java.io.File
 import java.text.SimpleDateFormat
 import java.util.Date
 
+import scala.collection.JavaConverters._
+
 // scalastyle:off import.ordering.noEmptyLine
 import org.apache.spark.sql.delta.commands.cdc.CDCReader._
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.delta.util.FileNames
 
@@ -32,7 +35,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.functions.{col, current_timestamp, floor, lit}
 import org.apache.spark.sql.streaming.StreamingQueryException
 import org.apache.spark.sql.test.SharedSparkSession
-import org.apache.spark.sql.types.{LongType, StructType}
+import org.apache.spark.sql.types.{LongType, StringType, StructType}
 
 abstract class DeltaCDCSuiteBase
   extends QueryTest
@@ -634,32 +637,6 @@ abstract class DeltaCDCSuiteBase
     }
   }
 
-  test("should block CDC reads when Column Mapping enabled - batch") {
-    withTable("t1") {
-      sql(
-        s"""
-           |CREATE TABLE t1 (id LONG) USING DELTA
-           |TBLPROPERTIES(
-           |  '${DeltaConfigs.COLUMN_MAPPING_MODE.key}'='name',
-           |  '${DeltaConfigs.CHANGE_DATA_FEED.key}'='true',
-           |  '${DeltaConfigs.MIN_READER_VERSION.key}'='2',
-           |  '${DeltaConfigs.MIN_WRITER_VERSION.key}'='5'
-           |)
-           |""".stripMargin)
-      spark.range(10).write.format("delta").mode("append").saveAsTable("t1")
-
-      // case 1: batch read
-      spark.read.format("delta").table("t1").show()
-
-      // case 2: batch CDC read
-      val e = intercept[DeltaUnsupportedOperationException] {
-        cdcRead(new TableName("t1"), StartingVersion("0"), EndingVersion("1")).show()
-      }.getMessage
-      assert(e == "Change data feed (CDF) reads are currently not supported on tables with " +
-        "column mapping enabled.")
-    }
-  }
-
   test("batch write: append, dynamic partition overwrite + CDF") {
     withSQLConf(
       DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true",
@@ -818,3 +795,184 @@ class DeltaCDCScalaSuite extends DeltaCDCSuiteBase {
 }
 
+abstract class DeltaCDCColumnMappingSuiteBase extends DeltaCDCScalaSuite
+  with DeltaColumnMappingTestUtils {
+
+  private def assertBlocked(f: => Unit): Unit = {
+    val e = intercept[DeltaUnsupportedOperationException] {
+      f
+    }
+    assert(e.getErrorClass == "DELTA_BLOCK_CDF_COLUMN_MAPPING_READS" &&
+      e.getMessage.contains(
+        DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key))
+  }
+
+  // Drop CDC fields because they are not useful for testing the blocking behavior
+  private def dropCDCFields(df: DataFrame): DataFrame =
+    df.drop(CDC_COMMIT_TIMESTAMP)
+      .drop(CDC_TYPE_COLUMN_NAME)
+      .drop(CDC_COMMIT_VERSION)
+
+  import testImplicits._
+
+  /**
+   * Write test delta data to test blocking column mapping for CDC batch queries. It takes a
+   * sequence of ints and writes each out as a row of strings, assuming the schema is all strings.
+   */
+  private def writeDeltaData(
+      data: Seq[Int],
+      deltaLog: DeltaLog,
+      userSpecifiedSchema: Option[StructType] = None): Unit = {
+    val schema = userSpecifiedSchema.getOrElse(deltaLog.update().schema)
+    data.foreach { i =>
+      val rows = Seq(Row(schema.map(_ => i.toString): _*))
+      spark.createDataFrame(rows.asJava, schema)
+        .write.format("delta").mode("append").save(deltaLog.dataPath.toString)
+    }
+  }
+
+  /**
+   * Set up initial table data, considering the current column mapping mode
+   */
+  protected def setupInitialDeltaTable(dir: File): Unit = {
+    require(columnMappingModeString != NoMapping.name)
+    val tablePath = dir.getCanonicalPath
+    val deltaLog = DeltaLog.forTable(spark, tablePath)
+
+    if (columnMappingModeString == NameMapping.name) {
+      // For name mode, we do an upgrade then write to test that behavior as well:
+      // init the table with 5 versions without column mapping
+      withColumnMappingConf("none") {
+        writeDeltaData((0 until 5), deltaLog, userSpecifiedSchema = Some(
+          new StructType().add("id", StringType, true).add("value", StringType, true)
+        ))
+      }
+      // upgrade to name mode
+      sql(
+        s"""
+           |ALTER TABLE delta.`${dir.getCanonicalPath}`
+           |SET TBLPROPERTIES (
+           |  ${DeltaConfigs.COLUMN_MAPPING_MODE.key} = "name",
+           |  ${DeltaConfigs.MIN_READER_VERSION.key} = "2",
+           |  ${DeltaConfigs.MIN_WRITER_VERSION.key} = "5")""".stripMargin)
+      // write more data
+      writeDeltaData((5 until 10), deltaLog)
+    }
+
+    checkAnswer(
+      dropCDCFields(
+        cdcRead(
+          new TablePath(dir.getCanonicalPath),
+          StartingVersion("0"),
+          EndingVersion(deltaLog.update().version.toString))),
+      (0 until 10).map(_.toString).toDF("id").withColumn("value", col("id")))
+  }
+
+  test(s"blocking batch cdc read") {
+    withTempDir { dir =>
+      // Set up an initial table with 10 records in schema <id, value>
+      setupInitialDeltaTable(dir)
+      val deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath)
+
+      // add column should not be blocked
+      sql(
+        s"""
+           |ALTER TABLE delta.`${dir.getCanonicalPath}`
+           |ADD COLUMN (name string)
+           |""".stripMargin)
+
+      // write more data
+      writeDeltaData((10 until 15), deltaLog)
+
+      checkAnswer(
+        dropCDCFields(
+          cdcRead(
+            new TablePath(dir.getCanonicalPath),
+            StartingVersion("0"),
+            EndingVersion(deltaLog.update().version.toString))),
+        (0 until 10).map(_.toString).toDF("id")
+          .withColumn("value", col("id"))
+          .withColumn("name", lit(null)) union
+        (10 until 15).map(_.toString).toDF("id")
+          .withColumn("value", col("id"))
+          .withColumn("name", col("id")))
+    }
+
+    withTempDir { dir =>
+      // Set up an initial table with 10 records in schema <id, value>
+      setupInitialDeltaTable(dir)
+      val deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath)
+
+      // drop column would cause the CDC read to be blocked
+      sql(
+        s"""
+           |ALTER TABLE delta.`${dir.getCanonicalPath}` DROP COLUMN value
+           |""".stripMargin)
+
+      assertBlocked {
+        cdcRead(
+          new TablePath(dir.getCanonicalPath),
+          StartingVersion("0"),
+          EndingVersion(deltaLog.update().version.toString)).collect()
+      }
+    }
+
+    withTempDir { dir =>
+      // Set up an initial table with 10 records in schema <id, value>
+      setupInitialDeltaTable(dir)
+      val deltaLog = DeltaLog.forTable(spark, dir.getCanonicalPath)
+
+      // rename column would cause the CDC read to be blocked
+      sql(
+        s"""
+           |ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id TO id2
+           |""".stripMargin)
+
+      assertBlocked {
+        cdcRead(
+          new TablePath(dir.getCanonicalPath),
+          StartingVersion("0"),
+          EndingVersion(deltaLog.update().version.toString)).collect()
+      }
+
+      // rename the column back
+      sql(
+        s"""
+           |ALTER TABLE delta.`${dir.getCanonicalPath}` RENAME COLUMN id2 TO id
+           |""".stripMargin)
+
+      // Case 1 - would still block because we detected an intermediary action with a conflicting
+      // schema (the first rename).
+      assertBlocked {
+        cdcRead(
+          new TablePath(dir.getCanonicalPath),
+          StartingVersion("0"),
+          EndingVersion(deltaLog.update().version.toString)).collect()
+      }
+
+      // Case 2 - would NOT block if we exclude the second rename back, because the data schemas
+      // before that are now consistent with the latest.
+      checkAnswer(
+        dropCDCFields(
+          cdcRead(
+            new TablePath(dir.getCanonicalPath),
+            StartingVersion("0"),
+            // -2 to get rid of the last 2 schema change commits
+            EndingVersion((deltaLog.update().version - 2).toString))),
+        (0 until 10).map(_.toString).toDF("id").withColumn("value", col("id")))
+    }
+  }
+}
+
+
+class DeltaCDCNameColumnMappingSuite extends DeltaCDCColumnMappingSuiteBase
+  with DeltaColumnMappingEnableNameMode with DeltaColumnMappingSelectedTestMixin {
+
+  override def runOnlyTests: Seq[String] = Seq(
+    "changes from table by name",
+    "changes from table by path",
+    "batch write: append, dynamic partition overwrite + CDF",
+    "blocking batch cdc read"
+  )
+
+}
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
index b8acb41b9eb..f72ee384f8c 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaColumnMappingSuite.scala
@@ -495,6 +495,106 @@ class DeltaColumnMappingSuite extends QueryTest
     assert(DeltaColumnMapping.findMaxColumnId(new StructType()) == 0)
   }
 
+  // TODO: repurpose this once we roll out the proper semantics for CM + streaming
+  testColumnMapping("isColumnMappingReadCompatible") { mode =>
+    // Set up the table based on mode and return the initial metadata actions for comparison
+    def setupInitialTable(deltaLog: DeltaLog): (MetadataAction, MetadataAction) = {
+      val tablePath = deltaLog.dataPath.toString
+      if (mode == NameMapping.name) {
+        Seq((1, "a"), (2, "b")).toDF("id", "name")
+          .write.mode("append").format("delta").save(tablePath)
+        // schema: <id, name>
+        val m0 = deltaLog.update().metadata
+
+        // add a column
+        sql(s"ALTER TABLE delta.`$tablePath` ADD COLUMN (score long)")
+        // schema: <id, name, score>
+        val m1 = deltaLog.update().metadata
+
+        // column mapping not enabled -> not blocked at all
+        assert(DeltaColumnMapping.isColumnMappingReadCompatible(m1, m0))
+
+        // upgrade to name mode
+        alterTableWithProps(s"delta.`$tablePath`", Map(
+          DeltaConfigs.COLUMN_MAPPING_MODE.key -> "name",
+          DeltaConfigs.MIN_READER_VERSION.key -> "2",
+          DeltaConfigs.MIN_WRITER_VERSION.key -> "5"))
+
+        (m0, m1)
+      } else {
+        // for id mode, just create the table
+        withSQLConf(DeltaConfigs.COLUMN_MAPPING_MODE.defaultTablePropertyKey -> "id") {
+          Seq((1, "a"), (2, "b")).toDF("id", "name")
+            .write.mode("append").format("delta").save(tablePath)
+        }
+        // schema: <id, name>
+        val m0 = deltaLog.update().metadata
+
+        // add a column
+        sql(s"ALTER TABLE delta.`$tablePath` ADD COLUMN (score long)")
+        // schema: <id, name, score>
+        val m1 = deltaLog.update().metadata
+
+        // add column shouldn't block
+        assert(DeltaColumnMapping.isColumnMappingReadCompatible(m1, m0))
+
+        (m0, m1)
+      }
+    }
+
+    withTempDir { dir =>
+      val tablePath = dir.getCanonicalPath
+      val deltaLog = DeltaLog.forTable(spark, tablePath)
+
+      val (m0, m1) = setupInitialTable(deltaLog)
+
+      // schema: <id, name, score>
+      val m2 = deltaLog.update().metadata
+
+      assert(DeltaColumnMapping.isColumnMappingReadCompatible(m2, m1))
+      assert(DeltaColumnMapping.isColumnMappingReadCompatible(m2, m0))
+
+      // rename column
+      sql(s"ALTER TABLE delta.`$tablePath` RENAME COLUMN score TO age")
+      // schema: <id, name, age>
+      val m3 = deltaLog.update().metadata
+
+      assert(!DeltaColumnMapping.isColumnMappingReadCompatible(m3, m2))
+      assert(!DeltaColumnMapping.isColumnMappingReadCompatible(m3, m1))
+      // But it IS read compatible with the initial schema, because the added column should not
+      // be blocked by this column mapping check.
+      assert(DeltaColumnMapping.isColumnMappingReadCompatible(m3, m0))
+
+      // drop a column
+      sql(s"ALTER TABLE delta.`$tablePath` DROP COLUMN age")
+      // schema: <id, name>
+      val m4 = deltaLog.update().metadata
+
+      assert(!DeltaColumnMapping.isColumnMappingReadCompatible(m4, m3))
+      assert(!DeltaColumnMapping.isColumnMappingReadCompatible(m4, m2))
+      assert(!DeltaColumnMapping.isColumnMappingReadCompatible(m4, m1))
+      // but it IS read compatible with the initial schema, because the added column is dropped
+      assert(DeltaColumnMapping.isColumnMappingReadCompatible(m4, m0))
+
+      // add back the same column
+      sql(s"ALTER TABLE delta.`$tablePath` ADD COLUMN (score long)")
+      // schema: <id, name, score>
+      val m5 = deltaLog.update().metadata
+
+      // It IS read compatible with the previous schema, because an added column should not be
+      // blocked by this column mapping check.
+      assert(DeltaColumnMapping.isColumnMappingReadCompatible(m5, m4))
+      assert(!DeltaColumnMapping.isColumnMappingReadCompatible(m5, m3))
+      assert(!DeltaColumnMapping.isColumnMappingReadCompatible(m5, m2))
+      // But since the newly added column has a different physical name from all previous columns,
+      // even though it has the same logical name as in, say, m1's schema, we will still block.
+      assert(!DeltaColumnMapping.isColumnMappingReadCompatible(m5, m1))
+      // But it IS read compatible with the initial schema, because the added column should not
+      // be blocked by this column mapping check.
+      assert(DeltaColumnMapping.isColumnMappingReadCompatible(m5, m0))
+    }
+  }
+
   test("create table under id mode should be blocked") {
     withTable("t1") {
       val mode = "id"
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
index 22647c60b7d..8d20e721417 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
@@ -2250,12 +2250,14 @@ trait DeltaErrorsSuiteBase
     }
     {
       val e = intercept[DeltaUnsupportedOperationException] {
-        throw DeltaErrors.blockCdfAndColumnMappingReads()
+        throw DeltaErrors.blockCdfAndColumnMappingReads(isStreaming = false)
       }
       assert(e.getErrorClass == "DELTA_BLOCK_CDF_COLUMN_MAPPING_READS")
       assert(e.getSqlState == "0A000")
-      assert(e.getMessage == "Change data feed (CDF) reads are currently not supported on tables " +
-        "with column mapping enabled.")
+      assert(e.getMessage.contains("Change Data Feed (CDF) reads are not supported on tables with" +
+        " column mapping schema changes (e.g. rename or drop)"))
+      assert(e.getMessage.contains(
+        DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key))
     }
     {
       val e = intercept[DeltaUnsupportedOperationException] {
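For readers who want to see the new escape hatch end to end, here is a hedged usage sketch of the unsafe batch CDF override added in this patch. It assumes the conf key is resolved with the standard `spark.databricks.delta.` prefix that `DeltaSQLConf.buildConf` applies, that CDF is enabled on the (hypothetical) table `t1`, and that the version range is made up; without the override, the same read fails with DELTA_BLOCK_CDF_COLUMN_MAPPING_READS once a rename or drop is detected in the range.

```scala
// Force-unblock a batch CDF read across a rename/drop schema change, accepting the data loss /
// schema confusion risk described in the conf's doc string. Table name, versions, and the
// fully qualified conf key (standard "spark.databricks.delta." prefix assumed) are illustrative.
spark.conf.set(
  "spark.databricks.delta.changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled",
  "true")

spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", "0")
  .option("endingVersion", "5")
  .table("t1")
  .show()
```

The SQL path referenced in the conf doc would be the equivalent `SELECT * FROM table_changes('t1', 0, 5)` after setting the same conf.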