From a455845c6dc0bb824e0630f0cbafd8cc52c7dc14 Mon Sep 17 00:00:00 2001 From: Dhruv Arya Date: Mon, 23 Sep 2024 16:00:06 -0700 Subject: [PATCH] add support for ict in cdc --- .../spark/sql/delta/DeltaHistoryManager.scala | 10 +- .../sql/delta/commands/cdc/CDCReader.scala | 38 ++++-- .../spark/sql/delta/sources/DeltaSource.scala | 14 ++- .../delta/sources/DeltaSourceCDCSupport.scala | 92 +++++++++------ .../spark/sql/delta/CheckCDCAnswer.scala | 14 +-- .../spark/sql/delta/DeltaCDCSuite.scala | 26 ++++- .../sql/delta/DeltaTimeTravelSuite.scala | 2 +- .../sql/delta/InCommitTimestampSuite.scala | 110 +++++++++++++++++- 8 files changed, 241 insertions(+), 65 deletions(-) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala index e9a31e0a05..0a8d4e4372 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala @@ -191,7 +191,7 @@ class DeltaHistoryManager( if (end - start > 2 * maxKeysPerList) { parallelSearch(time, start, end) } else { - val commits = getCommits( + val commits = getCommitsWithNonICTTimestamps( deltaLog.store, deltaLog.logPath, start, @@ -599,9 +599,11 @@ object DeltaHistoryManager extends DeltaLogging { * Returns the commit version and timestamps of all commits in `[start, end)`. If `end` is not * specified, will return all commits that exist after `start`. Will guarantee that the commits * returned will have both monotonically increasing versions as well as timestamps. - * Exposed for tests. + * Note that this function will return non-ICT timestamps even for commits where + * InCommitTimestamps are enabled. The caller is responsible for ensuring that the appropriate + * timestamps are used. */ - private[delta] def getCommits( + private[delta] def getCommitsWithNonICTTimestamps( logStore: LogStore, logPath: Path, start: Long, @@ -688,7 +690,7 @@ object DeltaHistoryManager extends DeltaLogging { val logStore = LogStore(SparkEnv.get.conf, conf.value) val basePath = new Path(logPath) startVersions.map { startVersion => - val commits = getCommits( + val commits = getCommitsWithNonICTTimestamps( logStore, basePath, startVersion, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala index ae2d9a61f5..29f5411e06 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala @@ -469,6 +469,8 @@ trait CDCReaderImpl extends DeltaLogging { * @param start - startingVersion of the changes * @param end - endingVersion of the changes * @param changes - changes is an iterator of all FileActions for a particular commit version. + * Note that for log files where InCommitTimestamps are enabled, the iterator + * must also contain the [[CommitInfo]] action. * @param spark - SparkSession * @param isStreaming - indicates whether the DataFrame returned is a streaming DataFrame * @param useCoarseGrainedCDC - ignores checks related to CDC being disabled in any of the @@ -493,9 +495,11 @@ trait CDCReaderImpl extends DeltaLogging { throw DeltaErrors.endBeforeStartVersionInCDC(start, end) } - // A map from change version to associated commit timestamp. 
- val timestampsByVersion: Map[Long, Timestamp] = - getTimestampsByVersion(deltaLog, start, end, spark) + // A map from change version to associated file modification timestamps. + // We only need these for non-InCommitTimestamp commits because for InCommitTimestamp commits, + // the timestamps are already stored in the commit info. + val nonICTTimestampsByVersion: Map[Long, Timestamp] = + getNonICTTimestampsByVersion(deltaLog, start, end) val changeFiles = ListBuffer[CDCDataSpec[AddCDCFile]]() val addFiles = ListBuffer[CDCDataSpec[AddFile]]() @@ -615,7 +619,6 @@ trait CDCReaderImpl extends DeltaLogging { // Set up buffers for all action types to avoid multiple passes. val cdcActions = ListBuffer[AddCDCFile]() - val ts = timestampsByVersion.get(v).orNull // Note that the CommitInfo is *not* guaranteed to be generated in 100% of cases. // We are using it only for a hotfix-safe mitigation/defense-in-depth - the value @@ -635,6 +638,18 @@ trait CDCReaderImpl extends DeltaLogging { case i: CommitInfo => commitInfo = Some(i) case _ => // do nothing } + // If the commit has an In-Commit Timestamp, we should use that as the commit timestamp. + // Note that it is technically possible for a commit range to begin with ICT commits + // followed by non-ICT commits, and end with ICT commits again. Ideally, for these commits + // we should use the file modification time for the first two ranges. However, this + // scenario is an edge case not worth optimizing for. + val ts = commitInfo + .flatMap(_.inCommitTimestamp) + .map{ ict => new Timestamp(ict) } + .getOrElse(nonICTTimestampsByVersion.get(v).orNull) + // When `isStreaming` = `true` the [CommitInfo] action is only used for passing the + // in-commit timestamp to this method. We should filter them out. + commitInfo = if (isStreaming) None else commitInfo // If there are CDC actions, we read them exclusively if we should not use the // Add and RemoveFiles. @@ -887,22 +902,25 @@ trait CDCReaderImpl extends DeltaLogging { } /** - * Builds a map from commit versions to associated commit timestamps. + * Builds a map from commit versions to associated commit timestamps where the timestamp + * is the modification time of the commit file. Note that this function will not return + * InCommitTimestamps, it is up to the consumer of this function to decide whether the + * file modification time is the correct commit timestamp or whether they need to read the ICT. + * * @param start start commit version - * @param end end commit version + * @param end end commit version (inclusive) */ - def getTimestampsByVersion( + def getNonICTTimestampsByVersion( deltaLog: DeltaLog, start: Long, - end: Long, - spark: SparkSession): Map[Long, Timestamp] = { + end: Long): Map[Long, Timestamp] = { // Correct timestamp values are only available through DeltaHistoryManager.getCommits(). Commit // info timestamps are wrong, and file modification times are wrong because they need to be // monotonized first. This just performs a list (we don't read the contents of the files in // getCommits()) so the performance overhead is minimal. 
val monotonizationStart = math.max(start - DeltaHistoryManager.POTENTIALLY_UNMONOTONIZED_TIMESTAMPS, 0) - val commits = DeltaHistoryManager.getCommits( + val commits = DeltaHistoryManager.getCommitsWithNonICTTimestamps( deltaLog.store, deltaLog.logPath, monotonizationStart, diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala index e5fcb45932..939ee70877 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils import org.apache.spark.sql.delta.storage.{ClosableIterator, SupportsRewinding} import org.apache.spark.sql.delta.storage.ClosableIterator._ import org.apache.spark.sql.delta.util.{DateTimeUtils, TimestampFormatter} +import org.apache.spark.sql.util.ScalaExtensions._ import org.apache.hadoop.fs.FileStatus import org.apache.spark.internal.MDC @@ -792,7 +793,7 @@ case class DeltaSource( val (result, duration) = Utils.timeTakenMs { var iter = if (isInitialSnapshot) { Iterator(1, 2).flatMapWithClose { // so that the filterAndIndexDeltaLogs call is lazy - case 1 => getSnapshotAt(fromVersion).toClosable + case 1 => getSnapshotAt(fromVersion)._1.toClosable case 2 => filterAndIndexDeltaLogs(fromVersion + 1) } } else { @@ -845,8 +846,10 @@ case class DeltaSource( /** * This method computes the initial snapshot to read when Delta Source was initialized on a fresh * stream. + * @return A tuple where the first element is an iterator of IndexedFiles and the second element + * is the in-commit timestamp of the initial snapshot if available. */ - protected def getSnapshotAt(version: Long): Iterator[IndexedFile] = { + protected def getSnapshotAt(version: Long): (Iterator[IndexedFile], Option[Long]) = { if (initialState == null || version != initialStateVersion) { super[DeltaSourceBase].cleanUpSnapshotResources() val snapshot = getSnapshotFromDeltaLog(version) @@ -879,7 +882,12 @@ case class DeltaSource( ) } } - addBeginAndEndIndexOffsetsForVersion(version, initialState.iterator()) + val inCommitTimestampOpt = + Option.when( + DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(initialState.snapshot.metadata)) { + initialState.snapshot.timestamp + } + (addBeginAndEndIndexOffsetsForVersion(version, initialState.iterator()), inCommitTimestampOpt) } /** diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala index 0e66cf83d9..fcfbf2d5a7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSourceCDCSupport.scala @@ -200,9 +200,9 @@ trait DeltaSourceCDCSupport { self: DeltaSource => val changes = getFileChangesForCDC( startVersion, startIndex, isInitialSnapshot, limits = None, Some(endOffset)) - val groupedFileActions = - changes.map { case (v, indexFiles) => - (v, indexFiles.filter(_.hasFileAction).map(_.getFileAction).toSeq) + val groupedFileAndCommitInfoActions = + changes.map { case (v, indexFiles, commitInfoOpt) => + (v, indexFiles.filter(_.hasFileAction).map(_.getFileAction).toSeq ++ commitInfoOpt) } val (result, duration) = Utils.timeTakenMs { @@ -211,7 +211,7 @@ trait DeltaSourceCDCSupport { self: DeltaSource => readSnapshotDescriptor, startVersion, 
endOffset.reservoirVersion, - groupedFileActions, + groupedFileAndCommitInfoActions, spark, isStreaming = true) .fileChangeDf @@ -228,7 +228,8 @@ trait DeltaSourceCDCSupport { self: DeltaSource => /** * Get the changes starting from (fromVersion, fromIndex). fromVersion is included. - * It returns an iterator of (log_version, fileActions) + * It returns an iterator of (log_version, fileActions, Optional[CommitInfo]). The commit info + * is needed later on so that the InCommitTimestamp of the log files can be determined. * * If verifyMetadataAction = true, we will break the stream when we detect any read-incompatible * metadata changes. @@ -239,10 +240,12 @@ trait DeltaSourceCDCSupport { self: DeltaSource => isInitialSnapshot: Boolean, limits: Option[AdmissionLimits], endOffset: Option[DeltaSourceOffset], - verifyMetadataAction: Boolean = true): Iterator[(Long, Iterator[IndexedFile])] = { + verifyMetadataAction: Boolean = true + ): Iterator[(Long, Iterator[IndexedFile], Option[CommitInfo])] = { /** Returns matching files that were added on or after startVersion among delta logs. */ - def filterAndIndexDeltaLogs(startVersion: Long): Iterator[(Long, IndexedChangeFileSeq)] = { + def filterAndIndexDeltaLogs( + startVersion: Long): Iterator[(Long, IndexedChangeFileSeq, Option[CommitInfo])] = { // TODO: handle the case when failOnDataLoss = false and we are missing change log files // in that case, we need to recompute the start snapshot and evolve the schema if needed require(options.failOnDataLoss || !trackingMetadataChange, @@ -250,7 +253,7 @@ trait DeltaSourceCDCSupport { self: DeltaSource => deltaLog.getChanges(startVersion, options.failOnDataLoss).map { case (version, actions) => // skipIndexedFile must be applied after creating IndexedFile so that // IndexedFile.index is consistent across all versions. - val (fileActions, skipIndexedFile, metadataOpt, protocolOpt) = + val (fileActions, skipIndexedFile, metadataOpt, protocolOpt, commitInfoOpt) = filterCDCActions( actions, version, fromVersion, endOffset.map(_.reservoirVersion), verifyMetadataAction && !trackingMetadataChange) @@ -278,7 +281,7 @@ trait DeltaSourceCDCSupport { self: DeltaSource => remove = remove, shouldSkip = skipIndexedFile) }) - (version, new IndexedChangeFileSeq(itr, isInitialSnapshot = false)) + (version, new IndexedChangeFileSeq(itr, isInitialSnapshot = false), commitInfoOpt) } } @@ -293,35 +296,43 @@ trait DeltaSourceCDCSupport { self: DeltaSource => } val (result, duration) = Utils.timeTakenMs { - val iter: Iterator[(Long, IndexedChangeFileSeq)] = if (isInitialSnapshot) { - // If we are reading change data from the start of the table we need to - // get the latest snapshot of the table as well. - val snapshot: Iterator[IndexedFile] = getSnapshotAt(fromVersion).map { m => - // When we get the snapshot the dataChange is false for the AddFile actions - // We need to set it to true for it to be considered by the CDCReader. - if (m.add != null) { - m.copy(add = m.add.copy(dataChange = true)) - } else { - m + val iter: Iterator[(Long, IndexedChangeFileSeq, Option[CommitInfo])] = + if (isInitialSnapshot) { + // If we are reading change data from the start of the table we need to + // get the latest snapshot of the table as well. 
+ val (unprocessedSnapshot, snapshotInCommitTimestampOpt) = getSnapshotAt(fromVersion) + val snapshot: Iterator[IndexedFile] = unprocessedSnapshot.map { m => + // When we get the snapshot the dataChange is false for the AddFile actions + // We need to set it to true for it to be considered by the CDCReader. + if (m.add != null) { + m.copy(add = m.add.copy(dataChange = true)) + } else { + m + } } + // This is a hack so that we can easily access the ICT later on. + // This `CommitInfo` action is not useful for anything else and should be filtered + // out later on. + val ictOnlyCommitInfo = Some(CommitInfo.empty(Some(-1)) + .copy(inCommitTimestamp = snapshotInCommitTimestampOpt)) + val snapshotItr: Iterator[(Long, IndexedChangeFileSeq, Option[CommitInfo])] = Iterator(( + fromVersion, + new IndexedChangeFileSeq(snapshot, isInitialSnapshot = true), + ictOnlyCommitInfo + )) + + snapshotItr ++ filterAndIndexDeltaLogs(fromVersion + 1) + } else { + filterAndIndexDeltaLogs(fromVersion) } - val snapshotItr: Iterator[(Long, IndexedChangeFileSeq)] = Iterator(( - fromVersion, - new IndexedChangeFileSeq(snapshot, isInitialSnapshot = true) - )) - - snapshotItr ++ filterAndIndexDeltaLogs(fromVersion + 1) - } else { - filterAndIndexDeltaLogs(fromVersion) - } // In this case, filterFiles will consume the available capacity. We use takeWhile // to stop the iteration when we reach the limit or if endOffset is specified and the // endVersion is reached which will save us from reading unnecessary log files. - iter.takeWhile { case (version, _) => + iter.takeWhile { case (version, _, _) => limits.forall(_.hasCapacity) && versionLessThanEndOffset(version, endOffset) - }.map { case (version, indexItr) => - (version, indexItr.filterFiles(fromVersion, fromIndex, limits, endOffset)) + }.map { case (version, indexItr, ci) => + (version, indexItr.filterFiles(fromVersion, fromIndex, limits, endOffset), ci) } } @@ -351,10 +362,11 @@ trait DeltaSourceCDCSupport { self: DeltaSource => batchStartVersion: Long, batchEndVersionOpt: Option[Long] = None, verifyMetadataAction: Boolean = true - ): (Seq[FileAction], Boolean, Option[Metadata], Option[Protocol]) = { + ): (Seq[FileAction], Boolean, Option[Metadata], Option[Protocol], Option[CommitInfo]) = { var shouldSkipIndexedFile = false var metadataAction: Option[Metadata] = None var protocolAction: Option[Protocol] = None + var commitInfoAction: Option[CommitInfo] = None def checkAndCacheMetadata(m: Metadata): Unit = { if (verifyMetadataAction) { checkReadIncompatibleSchemaChanges(m, version, batchStartVersion, batchEndVersionOpt) @@ -367,6 +379,9 @@ trait DeltaSourceCDCSupport { self: DeltaSource => if (actions.exists(_.isInstanceOf[AddCDCFile])) { (actions.filter { case _: AddCDCFile => true + case commitInfo: CommitInfo => + commitInfoAction = Some(commitInfo) + false case m: Metadata => checkAndCacheMetadata(m) false @@ -374,7 +389,11 @@ trait DeltaSourceCDCSupport { self: DeltaSource => protocolAction = Some(p) false case _ => false - }.asInstanceOf[Seq[FileAction]], shouldSkipIndexedFile, metadataAction, protocolAction) + }.asInstanceOf[Seq[FileAction]], + shouldSkipIndexedFile, + metadataAction, + protocolAction, + commitInfoAction) } else { (actions.filter { case a: AddFile => @@ -392,12 +411,17 @@ trait DeltaSourceCDCSupport { self: DeltaSource => false case commitInfo: CommitInfo => shouldSkipIndexedFile = CDCReader.shouldSkipFileActionsInCommit(commitInfo) + commitInfoAction = Some(commitInfo) false case _: AddCDCFile | _: SetTransaction | _: DomainMetadata => false 
case null => // Some crazy future feature. Ignore false - }.asInstanceOf[Seq[FileAction]], shouldSkipIndexedFile, metadataAction, protocolAction) + }.asInstanceOf[Seq[FileAction]], + shouldSkipIndexedFile, + metadataAction, + protocolAction, + commitInfoAction) } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/CheckCDCAnswer.scala b/spark/src/test/scala/org/apache/spark/sql/delta/CheckCDCAnswer.scala index 77e8730d52..5431c42e8d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/CheckCDCAnswer.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/CheckCDCAnswer.scala @@ -47,16 +47,10 @@ trait CheckCDCAnswer extends QueryTest { // Results should match the fully monotonized commits. Note that this map will include // all versions of the table but only the ones in timestampsByVersion are checked for // correctness. - val commits = DeltaHistoryManager.getCommits( - log.store, - log.logPath, - start = 0, - end = None, - log.newDeltaHadoopConf()) - - // Note that the timestamps come from filesystem modification timestamps, so they're - // milliseconds since epoch and we don't need to deal with timezones. - commits.map(f => (f.version -> new Timestamp(f.timestamp))).toMap + val commits = log.history.getHistory(start = 0, end = None) + // Note that the timestamps are in milliseconds since epoch and we don't need to deal + // with timezones. + commits.map(f => (f.getVersion -> f.timestamp)).toMap } timestampsByVersion.keySet.foreach { version => diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala index 29a95c51b9..710174f5d1 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaCDCSuite.scala @@ -25,11 +25,13 @@ import scala.collection.JavaConverters._ // scalastyle:off import.ordering.noEmptyLine import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN import org.apache.spark.sql.delta.commands.cdc.CDCReader._ +import org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaColumnMappingSelectedTestMixin import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaTestImplicits._ -import org.apache.spark.sql.delta.util.FileNames +import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames} +import org.apache.hadoop.fs.Path import org.apache.spark.SparkConf import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest, Row} @@ -1103,3 +1105,25 @@ class DeltaCDCScalaWithDeletionVectorsSuite extends DeltaCDCScalaSuite enableDeletionVectorsForAllSupportedOperations(spark) } } + +class DeltaCDCScalaSuiteWithCoordinatedCommitsBatch10 extends DeltaCDCScalaSuite + with CoordinatedCommitsBaseSuite { + + /** Modify timestamp for a delta commit, used to test timestamp querying */ + override def modifyDeltaTimestamp(deltaLog: DeltaLog, version: Long, time: Long): Unit = { + val fileProvider = DeltaCommitFileProvider(deltaLog.snapshot) + val file = new File(fileProvider.deltaFile(version).toUri) + InCommitTimestampTestUtils.overwriteICTInDeltaFile( + deltaLog, + new Path(file.getPath), + Some(time)) + file.setLastModified(time) + val crc = new File(FileNames.checksumFile(deltaLog.logPath, version).toUri) + if (crc.exists()) { + InCommitTimestampTestUtils.overwriteICTInCrc(deltaLog, version, Some(time)) + 
crc.setLastModified(time) + } + } + + override def coordinatedCommitsBackfillBatchSize: Option[Int] = Some(10) +} diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala index 3ab857369b..f0d8a02049 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala @@ -140,7 +140,7 @@ class DeltaTimeTravelSuite extends QueryTest start - 2.seconds, // adjusts to start + 4 ms start + 10.seconds) - val commits = DeltaHistoryManager.getCommits( + val commits = DeltaHistoryManager.getCommitsWithNonICTTimestamps( deltaLog.store, deltaLog.logPath, 0, diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala index 14e80aeea0..73b745a13d 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala @@ -20,7 +20,7 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.sql.Timestamp import scala.collection.JavaConverters._ -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import com.databricks.spark.util.{Log4jUsageLogger, UsageRecord} import org.apache.spark.sql.delta.DeltaOperations.ManualUpdate @@ -34,6 +34,8 @@ import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames, Json import org.apache.hadoop.fs.Path import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.streaming.StreamTest import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.util.{ManualClock, SerializableConfiguration, ThreadUtils} @@ -42,7 +44,8 @@ class InCommitTimestampSuite with SharedSparkSession with DeltaSQLCommandTest with DeltaTestUtilsBase - with CoordinatedCommitsTestUtils { + with CoordinatedCommitsTestUtils + with StreamTest { override def beforeAll(): Unit = { super.beforeAll() @@ -58,6 +61,17 @@ class InCommitTimestampSuite commitInfo.get.inCommitTimestamp.get } + private def getFileModificationTimesMap( + deltaLog: DeltaLog, start: Long, end: Long): Map[Long, Long] = { + deltaLog.store.listFrom( + FileNames.listingPrefix(deltaLog.logPath, start), deltaLog.newDeltaHadoopConf()) + .collect { + case FileNames.DeltaFile(fs, v) => + v -> fs.getModificationTime + }.takeWhile(_._1 <= end) + .toMap + } + test("Enable ICT on commit 0") { withTempDir { tempDir => spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) @@ -990,6 +1004,98 @@ class InCommitTimestampSuite assert(historySubset.head.timestamp.getTime == getInCommitTimestamp(deltaLog, 2)) } } + + for (ictEnablementVersion <- Seq(1, 4, 7)) + testWithDefaultCommitCoordinatorUnset(s"CDC read with all commits being ICT " + + s"[ictEnablementVersion = $ictEnablementVersion]") { + withSQLConf( + DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true", + DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.defaultTablePropertyKey -> "false" + ) { + withTempDir { tempDir => + val path = tempDir.getCanonicalPath + for (i <- 0 to 7) { + if (i == ictEnablementVersion) { + spark.sql( + s"ALTER TABLE delta.`$path` " + + s"SET TBLPROPERTIES ('${DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key}' = 'true')") + } else { + spark.range(i, i + 1).write.format("delta").mode("append").save(path) + } + } + val deltaLog = 
DeltaLog.forTable(spark, new Path(path)) + val result = spark.read + .format("delta") + .option("startingVersion", "1") + .option("endingVersion", "7") + .option("readChangeFeed", "true") + .load(path) + .select("_commit_timestamp", "_commit_version") + .collect() + val fileTimestampsMap = getFileModificationTimesMap(deltaLog, 0, 7) + result.foreach { row => + val v = row.getAs[Long]("_commit_version") + val expectedTimestamp = if (v >= ictEnablementVersion) { + getInCommitTimestamp(deltaLog, v) + } else { + fileTimestampsMap(v) + } + assert(row.getAs[Timestamp]("_commit_timestamp").getTime == expectedTimestamp) + } + } + } + } + + for (ictEnablementVersion <- Seq(1, 4, 7)) + testWithDefaultCommitCoordinatorUnset(s"Streaming query + CDC " + + s"[ictEnablementVersion = $ictEnablementVersion]") { + withSQLConf( + DeltaConfigs.CHANGE_DATA_FEED.defaultTablePropertyKey -> "true", + DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.defaultTablePropertyKey -> "false" + ) { + withTempDir { tempDir => withTempDir { checkpointDir => withTempDir { streamingSink => + val path = tempDir.getCanonicalPath + spark.range(0).write.format("delta").mode("append").save(path) + + val sourceDeltaLog = DeltaLog.forTable(spark, new Path(path)) + val sinkPath = streamingSink.getCanonicalPath + val streamingQuery = spark.readStream + .format("delta") + .option("readChangeFeed", "true") + .load(path) + .select( + col("_commit_timestamp").alias("source_commit_timestamp"), + col("_commit_version").alias("source_commit_version")) + .writeStream + .format("delta") + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .start(sinkPath) + for (i <- 1 to 7) { + if (i == ictEnablementVersion) { + spark.sql(s"ALTER TABLE delta.`$path` " + + s"SET TBLPROPERTIES ('${DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key}' = 'true')") + } else { + spark.range(i, i + 1).write.format("delta").mode("append").save(path) + } + } + streamingQuery.processAllAvailable() + val fileTimestampsMap = getFileModificationTimesMap(sourceDeltaLog, 0, 7) + val result = spark.read.format("delta") + .load(sinkPath) + .collect() + result.foreach { row => + val v = row.getAs[Long]("source_commit_version") + val expectedTimestamp = if (v >= ictEnablementVersion) { + getInCommitTimestamp(sourceDeltaLog, v) + } else { + fileTimestampsMap(v) + } + assert( + row.getAs[Timestamp]("source_commit_timestamp").getTime == expectedTimestamp) + } + }}} + } + } } class InCommitTimestampWithCoordinatedCommitsSuite
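
A minimal sketch of the timestamp-resolution behavior this patch introduces in CDCReader.changesToDF, assuming simplified stand-in types (CommitInfoLite and resolveCommitTimestamp are hypothetical; the real code works with Delta's CommitInfo action inline): prefer CommitInfo.inCommitTimestamp when it is present, otherwise fall back to the file-modification-time map returned by getNonICTTimestampsByVersion.

import java.sql.Timestamp

// Minimal sketch, not part of the patch. CommitInfoLite and resolveCommitTimestamp
// are hypothetical stand-ins for Delta's CommitInfo action and the inline logic in
// CDCReader.changesToDF.
object IctTimestampResolutionSketch extends App {
  final case class CommitInfoLite(inCommitTimestamp: Option[Long])

  // Prefer the In-Commit Timestamp carried by the CommitInfo action; otherwise fall
  // back to the (monotonized) file-modification timestamp collected for non-ICT commits.
  def resolveCommitTimestamp(
      version: Long,
      commitInfo: Option[CommitInfoLite],
      nonICTTimestampsByVersion: Map[Long, Timestamp]): Timestamp = {
    commitInfo
      .flatMap(_.inCommitTimestamp)              // ICT is stored as millis since epoch
      .map(ict => new Timestamp(ict))
      .getOrElse(nonICTTimestampsByVersion.get(version).orNull)
  }

  // Version 1 predates ICT enablement; version 3 carries an ICT in its CommitInfo.
  val fileTimes = Map(1L -> new Timestamp(1700000000000L))
  assert(resolveCommitTimestamp(1L, Some(CommitInfoLite(None)), fileTimes) ==
    new Timestamp(1700000000000L))
  assert(resolveCommitTimestamp(3L, Some(CommitInfoLite(Some(1700000123456L))), Map.empty) ==
    new Timestamp(1700000123456L))
}

The same preference order is why the streaming CDC path in this patch threads a synthetic, ICT-only CommitInfo through getFileChangesForCDC for the initial snapshot and filters CommitInfo back out when isStreaming is true, before the batch DataFrame is built.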