Skip to content

Commit

Permalink
Enable batch CDF queries on column mapping enabled tables with fewer …
Browse files Browse the repository at this point in the history
…limitations

Resolves #1349

Because the behavior of streaming/CDC reads on column mapping tables is still unclear, we have decided to temporarily block batch CDC reads when the user has performed rename or drop column operations after enabling column mapping.
Note that the following is not blocked:
1. CDC Read from a column mapping table without rename or drop column operations.
2. Upgrade to column mapping tables.
3. Existing compatible schema change operations such as ADD COLUMN.

New unit tests.

Resolves #1350

GitOrigin-RevId: 9b83b570623f42ecd614990f8caf4b6febbbc724
(cherry picked from commit 2041c3b)
  • Loading branch information
jackierwzhang authored and allisonport-db committed Aug 25, 2022
1 parent 4ff8109 commit 05c3932
Show file tree
Hide file tree
Showing 9 changed files with 406 additions and 39 deletions.
2 changes: 1 addition & 1 deletion core/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
},
"DELTA_BLOCK_CDF_COLUMN_MAPPING_READS" : {
"message" : [
"Change data feed (CDF) reads are currently not supported on tables with column mapping enabled."
"Change Data Feed (CDF) reads are not supported on tables with column mapping schema changes (e.g. rename or drop). Read schema: <newSchema>. Incompatible schema: <oldSchema>. <hint>"
],
"sqlState" : "0A000"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.util.{Locale, UUID}
import scala.collection.mutable

import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils}

Expand Down Expand Up @@ -52,7 +53,11 @@ trait DeltaColumnMappingBase extends DeltaLogging {
*
* This list is case-insensitive.
*/
protected val DELTA_INTERNAL_COLUMNS: Set[String] = Set.empty
protected val DELTA_INTERNAL_COLUMNS: Set[String] = {
  // CDC columns stored in the data files plus the two CDC commit columns, lower-cased with the
  // ROOT locale so membership checks against this set can be case-insensitive.
  val cdcColumnNames = CDCReader.CDC_COLUMNS_IN_DATA ++ Seq(
    CDCReader.CDC_COMMIT_VERSION,
    CDCReader.CDC_COMMIT_TIMESTAMP)
  cdcColumnNames.map(_.toLowerCase(Locale.ROOT)).toSet
}

val supportedModes: Set[DeltaColumnMappingMode] =
Set(NoMapping, NameMapping)
Expand Down Expand Up @@ -504,6 +509,44 @@ trait DeltaColumnMappingBase extends DeltaLogging {
newPhysicalToLogicalMap.get(physicalPath).exists(_.name != field.name)
}
}

/**
 * Decides whether the schema in `newMetadata` is column mapping read compatible with the schema
 * in `oldMetadata`, i.e. whether no rename column or drop column happened between the two.
 *
 * The snapshot version of `newMetadata` must be >= the snapshot version of `oldMetadata`;
 * otherwise an ADD COLUMN cannot be reliably told apart from a DROP COLUMN.
 *
 * @param newMetadata metadata of the later snapshot (the one being read with)
 * @param oldMetadata metadata of the earlier snapshot to compare against
 * @return `true` when `newMetadata` has no column mapping, or when neither a rename nor a drop
 *         column operation is detected between the two metadata; `false` otherwise
 */
def isColumnMappingReadCompatible(newMetadata: Metadata, oldMetadata: Metadata): Boolean = {
  val oldHasMapping = oldMetadata.columnMappingMode != NoMapping
  val newHasMapping = newMetadata.columnMappingMode != NoMapping

  if (!newHasMapping) {
    // The new metadata is not on column mapping, so there is nothing to block.
    true
  } else {
    // Pick the metadata the rename/drop detectors should compare `newMetadata` against.
    val baseline =
      if (oldHasMapping) {
        // Both sides already have column mapping enabled: compare them directly.
        oldMetadata
      } else {
        // Only the new side has column mapping, so an upgrade is assumed to have happened in
        // between. Build the post-upgrade form of the old metadata (the upgrade would use the
        // logical name as the physical name) so the same detectors can be reused.
        val withIdsAndNames = assignColumnIdAndPhysicalName(
          oldMetadata, oldMetadata, isChangingModeOnExistingTable = true)
        // The detectors only recognize metadata that carries a column mapping mode.
        val modeEntry =
          DeltaConfigs.COLUMN_MAPPING_MODE.key -> newMetadata.columnMappingMode.name
        withIdsAndNames.copy(configuration = withIdsAndNames.configuration ++ Map(modeEntry))
      }

    // Rename is checked first; drop is only evaluated when no rename was found.
    !(isRenameColumnOperation(newMetadata, baseline) ||
      isDropColumnOperation(newMetadata, baseline))
  }
}
}

object DeltaColumnMapping extends DeltaColumnMappingBase
Expand Down
17 changes: 15 additions & 2 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2265,9 +2265,22 @@ trait DeltaErrorsBase
// scalastyle:on line.size.limit
}

def blockCdfAndColumnMappingReads(): Throwable = {

/** Hint text naming the SQL conf that force-enables batch CDF reads. */
val columnMappingCDFBatchBlockHint: String = {
  val overrideConfKey =
    DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES.key
  s"You may force enable batch CDF read at your own risk by turning on $overrideConfKey."
}

/**
 * Builds the error raised when a CDF read is blocked on a column mapping table.
 *
 * @param isStreaming whether the blocked read is a streaming read; when true the trailing hint
 *                    parameter is left empty, otherwise `columnMappingCDFBatchBlockHint` is used
 * @param readSchema schema used for the read, rendered as JSON; empty string when not provided
 * @param incompatibleSchema schema found to be incompatible with the read schema, rendered as
 *                           JSON; empty string when not provided
 * @return a `DeltaUnsupportedOperationException` with error class
 *         `DELTA_BLOCK_CDF_COLUMN_MAPPING_READS`
 */
def blockCdfAndColumnMappingReads(
isStreaming: Boolean,
readSchema: Option[StructType] = None,
incompatibleSchema: Option[StructType] = None): Throwable = {
new DeltaUnsupportedOperationException(
errorClass = "DELTA_BLOCK_CDF_COLUMN_MAPPING_READS"
errorClass = "DELTA_BLOCK_CDF_COLUMN_MAPPING_READS",
// Order matters: read schema, incompatible schema, then the hint. Presumably these fill the
// placeholders of the error class message in that order -- confirm against the error class file.
messageParameters = Array(
readSchema.map(_.json).getOrElse(""),
incompatibleSchema.map(_.json).getOrElse(""),
if (isStreaming) "" else columnMappingCDFBatchBlockHint
)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.sql.Timestamp

import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.delta.{DeltaConfigs, DeltaErrors, DeltaHistoryManager, DeltaLog, DeltaOperations, DeltaParquetFileFormat, DeltaTableUtils, DeltaTimeTravelSpec, NoMapping, Snapshot}
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.actions.{Action, AddCDCFile, AddFile, CommitInfo, FileAction, Metadata, RemoveFile}
import org.apache.spark.sql.delta.files.{CdcAddFileIndex, TahoeChangeFileIndex, TahoeFileIndex, TahoeRemoveFileIndex}
import org.apache.spark.sql.delta.metering.DeltaLogging
Expand Down Expand Up @@ -264,8 +264,10 @@ object CDCReader extends DeltaLogging {

// If the table has column mapping enabled, throw an error. With column mapping, certain schema
// changes are possible (rename a column or drop a column) which don't work well with CDF.
if (snapshot.metadata.columnMappingMode != NoMapping) {
throw DeltaErrors.blockCdfAndColumnMappingReads()
// TODO: remove this after the proper blocking semantics is rolled out
// This is only blocking streaming CDF, batch CDF will be blocked differently below.
if (isStreaming && snapshot.metadata.columnMappingMode != NoMapping) {
throw DeltaErrors.blockCdfAndColumnMappingReads(isStreaming)
}

// A map from change version to associated commit timestamp.
Expand All @@ -275,10 +277,35 @@ object CDCReader extends DeltaLogging {
val changeFiles = ListBuffer[CDCDataSpec[AddCDCFile]]()
val addFiles = ListBuffer[CDCDataSpec[AddFile]]()
val removeFiles = ListBuffer[CDCDataSpec[RemoveFile]]()

val startVersionSnapshot = deltaLog.getSnapshotAt(start)
if (!isCDCEnabledOnTable(deltaLog.getSnapshotAt(start).metadata)) {
throw DeltaErrors.changeDataNotRecordedException(start, start, end)
}

/**
* TODO: Unblock this when we figure out the correct semantics.
* Currently batch CDC read on column mapping tables with Rename/Drop is blocked due to
* unclear semantics.
* Streaming CDF read is blocked on a separate code path in DeltaSource.
*/
val shouldCheckToBlockBatchReadOnColumnMappingTable =
!isStreaming &&
snapshot.metadata.columnMappingMode != NoMapping &&
!spark.sessionState.conf.getConf(
DeltaSQLConf.DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES)

// Compare with start snapshot's metadata schema to fail fast
if (shouldCheckToBlockBatchReadOnColumnMappingTable &&
!DeltaColumnMapping.isColumnMappingReadCompatible(
snapshot.metadata, startVersionSnapshot.metadata)) {
throw DeltaErrors.blockCdfAndColumnMappingReads(
isStreaming,
Some(snapshot.metadata.schema),
Some(startVersionSnapshot.metadata.schema)
)
}

var totalBytes = 0L
var totalFiles = 0L

Expand All @@ -296,6 +323,19 @@ object CDCReader extends DeltaLogging {
throw DeltaErrors.changeDataNotRecordedException(v, start, end)
}

// Check all intermediary metadata schema changes as well
if (shouldCheckToBlockBatchReadOnColumnMappingTable) {
actions.collect { case a: Metadata => a }.foreach { metadata =>
if (!DeltaColumnMapping.isColumnMappingReadCompatible(snapshot.metadata, metadata)) {
throw DeltaErrors.blockCdfAndColumnMappingReads(
isStreaming,
Some(snapshot.metadata.schema),
Some(metadata.schema)
)
}
}
}

// Set up buffers for all action types to avoid multiple passes.
val cdcActions = ListBuffer[AddCDCFile]()
val addActions = ListBuffer[AddFile]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,17 @@ trait DeltaSQLConfBase {
.createWithDefault(false)
}

// Internal boolean conf, disabled by default. It is an escape hatch for batch change data reads
// on tables with column mapping schema operations: when turned on, the user accepts the risks
// described in the doc string below.
val DELTA_CDF_UNSAFE_BATCH_READ_ON_INCOMPATIBLE_SCHEMA_CHANGES =
buildConf("changeDataFeed.unsafeBatchReadOnIncompatibleSchemaChanges.enabled")
.doc(
"Reading change data in batch (e.g. using `table_changes()`) on Delta table with " +
"column mapping schema operations is currently blocked due to potential data loss and " +
"schema confusion. However, existing users may use this flag to force unblock " +
"if they'd like to take the risk.")
.internal()
.booleanConf
.createWithDefault(false)

val DYNAMIC_PARTITION_OVERWRITE_ENABLED =
buildConf("dynamicPartitionOverwrite.enabled")
.doc("Whether to overwrite partitions dynamically when 'partitionOverwriteMode' is set to " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -826,8 +826,8 @@ trait DeltaCDCStreamSuiteBase extends StreamTest with DeltaSQLCommandTest
val e = intercept[StreamingQueryException] {
f
}.getCause.getMessage
assert(e == "Change data feed (CDF) reads are currently not supported on tables " +
"with column mapping enabled.")
assert(e.contains("Change Data Feed (CDF) reads are not supported on tables with " +
"column mapping schema changes (e.g. rename or drop)"))
}

Seq(0, 1).foreach { startingVersion =>
Expand Down
Loading

0 comments on commit 05c3932

Please sign in to comment.