Add support for ICT (in-commit timestamps) in CDC
dhruvarya-db committed Sep 23, 2024
1 parent 437db30 commit a455845
Showing 8 changed files with 241 additions and 65 deletions.
DeltaHistoryManager.scala
@@ -191,7 +191,7 @@ class DeltaHistoryManager(
if (end - start > 2 * maxKeysPerList) {
parallelSearch(time, start, end)
} else {
val commits = getCommits(
val commits = getCommitsWithNonICTTimestamps(
deltaLog.store,
deltaLog.logPath,
start,
@@ -599,9 +599,11 @@ object DeltaHistoryManager extends DeltaLogging {
* Returns the commit version and timestamps of all commits in `[start, end)`. If `end` is not
* specified, will return all commits that exist after `start`. Will guarantee that the commits
* returned will have both monotonically increasing versions as well as timestamps.
* Exposed for tests.
* Note that this function will return non-ICT timestamps even for commits where
* InCommitTimestamps are enabled. The caller is responsible for ensuring that the appropriate
* timestamps are used.
*/
private[delta] def getCommits(
private[delta] def getCommitsWithNonICTTimestamps(
logStore: LogStore,
logPath: Path,
start: Long,
@@ -688,7 +690,7 @@
val logStore = LogStore(SparkEnv.get.conf, conf.value)
val basePath = new Path(logPath)
startVersions.map { startVersion =>
val commits = getCommits(
val commits = getCommitsWithNonICTTimestamps(
logStore,
basePath,
startVersion,
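The renamed helper `getCommitsWithNonICTTimestamps` keeps the doc-commented guarantee that the returned commits have monotonically increasing versions and timestamps. As a rough, illustrative sketch (names and types are ours, not the actual Delta implementation), monotonizing file-modification timestamps can look like this:

```scala
// Illustrative sketch only: force file-modification timestamps to be strictly
// increasing so that version order and timestamp order agree.
case class Commit(version: Long, timestamp: Long)

def monotonizeCommitTimestamps(commits: Seq[Commit]): Seq[Commit] = {
  var prev = Long.MinValue
  commits.map { c =>
    val ts = math.max(c.timestamp, prev + 1) // bump equal or backwards timestamps
    prev = ts
    c.copy(timestamp = ts)
  }
}

// monotonizeCommitTimestamps(Seq(Commit(0, 100), Commit(1, 90), Commit(2, 100)))
//   == Seq(Commit(0, 100), Commit(1, 101), Commit(2, 102))
```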
CDCReader.scala
@@ -469,6 +469,8 @@ trait CDCReaderImpl extends DeltaLogging {
* @param start - startingVersion of the changes
* @param end - endingVersion of the changes
* @param changes - changes is an iterator of all FileActions for a particular commit version.
*                  Note that for commits where InCommitTimestamps are enabled, the iterator
*                  must also contain the [[CommitInfo]] action.
* @param spark - SparkSession
* @param isStreaming - indicates whether the DataFrame returned is a streaming DataFrame
* @param useCoarseGrainedCDC - ignores checks related to CDC being disabled in any of the
@@ -493,9 +495,11 @@
throw DeltaErrors.endBeforeStartVersionInCDC(start, end)
}

// A map from change version to associated commit timestamp.
val timestampsByVersion: Map[Long, Timestamp] =
getTimestampsByVersion(deltaLog, start, end, spark)
// A map from change version to associated file modification timestamps.
// We only need these for non-InCommitTimestamp commits because for InCommitTimestamp commits,
// the timestamps are already stored in the commit info.
val nonICTTimestampsByVersion: Map[Long, Timestamp] =
getNonICTTimestampsByVersion(deltaLog, start, end)

val changeFiles = ListBuffer[CDCDataSpec[AddCDCFile]]()
val addFiles = ListBuffer[CDCDataSpec[AddFile]]()
@@ -615,7 +619,6 @@

// Set up buffers for all action types to avoid multiple passes.
val cdcActions = ListBuffer[AddCDCFile]()
val ts = timestampsByVersion.get(v).orNull

// Note that the CommitInfo is *not* guaranteed to be generated in 100% of cases.
// We are using it only for a hotfix-safe mitigation/defense-in-depth - the value
@@ -635,6 +638,18 @@
case i: CommitInfo => commitInfo = Some(i)
case _ => // do nothing
}
// If the commit has an In-Commit Timestamp, we should use that as the commit timestamp.
// Note that it is technically possible for a commit range to begin with ICT commits,
// followed by non-ICT commits, and end with ICT commits again (e.g. if ICT was disabled
// and later re-enabled). Ideally, to keep timestamps monotonic, we would use the file
// modification time for the first two ranges and the ICT only for the final range.
// However, this scenario is an edge case not worth optimizing for.
val ts = commitInfo
.flatMap(_.inCommitTimestamp)
.map{ ict => new Timestamp(ict) }
.getOrElse(nonICTTimestampsByVersion.get(v).orNull)
// When `isStreaming` = `true`, the [[CommitInfo]] action is only used for passing the
// in-commit timestamp to this method, so we filter it out here.
commitInfo = if (isStreaming) None else commitInfo

// If there are CDC actions, we read them exclusively if we should not use the
// Add and RemoveFiles.
@@ -887,22 +902,25 @@
}

/**
* Builds a map from commit versions to associated commit timestamps.
* Builds a map from commit versions to associated commit timestamps, where each timestamp
* is the modification time of the commit file. Note that this function will not return
* InCommitTimestamps; it is up to the caller to decide whether the file modification time
* is the correct commit timestamp or whether the ICT needs to be read.
*
* @param start start commit version
* @param end end commit version
* @param end end commit version (inclusive)
*/
def getTimestampsByVersion(
def getNonICTTimestampsByVersion(
deltaLog: DeltaLog,
start: Long,
end: Long,
spark: SparkSession): Map[Long, Timestamp] = {
end: Long): Map[Long, Timestamp] = {
// Correct timestamp values are only available through
// DeltaHistoryManager.getCommitsWithNonICTTimestamps(). Commit info timestamps are wrong, and
// file modification times are wrong because they need to be monotonized first. This just
// performs a list (we don't read the contents of the files) so the performance overhead is
// minimal.
val monotonizationStart =
math.max(start - DeltaHistoryManager.POTENTIALLY_UNMONOTONIZED_TIMESTAMPS, 0)
val commits = DeltaHistoryManager.getCommits(
val commits = DeltaHistoryManager.getCommitsWithNonICTTimestamps(
deltaLog.store,
deltaLog.logPath,
monotonizationStart,
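The timestamp-selection rule introduced above — prefer the in-commit timestamp when the CommitInfo carries one, otherwise fall back to the monotonized file-modification timestamp for that version — can be sketched in isolation. The `CommitInfo` below is a simplified stand-in for Delta's action class, not the real thing:

```scala
import java.sql.Timestamp

// Simplified stand-in: only the one field this sketch needs.
case class CommitInfo(inCommitTimestamp: Option[Long])

// Prefer the ICT (millis since epoch) when present; otherwise fall back to the
// pre-computed file-modification timestamp for this version, or null if absent.
def resolveCommitTimestamp(
    commitInfo: Option[CommitInfo],
    nonICTTimestampsByVersion: Map[Long, Timestamp],
    version: Long): Timestamp = {
  commitInfo
    .flatMap(_.inCommitTimestamp)
    .map(ict => new Timestamp(ict))
    .getOrElse(nonICTTimestampsByVersion.get(version).orNull)
}
```

In the loop above this is applied once per commit version `v`.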
DeltaSource.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.storage.{ClosableIterator, SupportsRewinding}
import org.apache.spark.sql.delta.storage.ClosableIterator._
import org.apache.spark.sql.delta.util.{DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.util.ScalaExtensions._
import org.apache.hadoop.fs.FileStatus

import org.apache.spark.internal.MDC
@@ -792,7 +793,7 @@ case class DeltaSource(
val (result, duration) = Utils.timeTakenMs {
var iter = if (isInitialSnapshot) {
Iterator(1, 2).flatMapWithClose { // so that the filterAndIndexDeltaLogs call is lazy
case 1 => getSnapshotAt(fromVersion).toClosable
case 1 => getSnapshotAt(fromVersion)._1.toClosable
case 2 => filterAndIndexDeltaLogs(fromVersion + 1)
}
} else {
@@ -845,8 +846,10 @@
/**
* This method computes the initial snapshot to read when the Delta source is initialized on
* a fresh stream.
* @return A tuple where the first element is an iterator of IndexedFiles and the second element
* is the in-commit timestamp of the initial snapshot if available.
*/
protected def getSnapshotAt(version: Long): Iterator[IndexedFile] = {
protected def getSnapshotAt(version: Long): (Iterator[IndexedFile], Option[Long]) = {
if (initialState == null || version != initialStateVersion) {
super[DeltaSourceBase].cleanUpSnapshotResources()
val snapshot = getSnapshotFromDeltaLog(version)
@@ -879,7 +882,12 @@
)
}
}
addBeginAndEndIndexOffsetsForVersion(version, initialState.iterator())
val inCommitTimestampOpt =
Option.when(
DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(initialState.snapshot.metadata)) {
initialState.snapshot.timestamp
}
(addBeginAndEndIndexOffsetsForVersion(version, initialState.iterator()), inCommitTimestampOpt)
}

/**
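`getSnapshotAt` now returns the snapshot's timestamp as an in-commit timestamp only when the table property is enabled, so a `None` tells callers to fall back to file-modification times. A minimal sketch of that `Option.when` pattern (names are assumed; `Option.when` is in the Scala 2.13 standard library, and the `ScalaExtensions` import in the hunk above appears to provide it for older Scala versions):

```scala
// Sketch: surface the snapshot timestamp as an ICT only when the table has
// in-commit timestamps enabled.
def snapshotInCommitTimestampOpt(
    ictEnabled: Boolean,
    snapshotTimestampMs: Long): Option[Long] =
  Option.when(ictEnabled)(snapshotTimestampMs)

// snapshotInCommitTimestampOpt(ictEnabled = false, 1727049600000L) == None
```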
DeltaSourceCDCSupport.scala
@@ -200,9 +200,9 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
val changes = getFileChangesForCDC(
startVersion, startIndex, isInitialSnapshot, limits = None, Some(endOffset))

val groupedFileActions =
changes.map { case (v, indexFiles) =>
(v, indexFiles.filter(_.hasFileAction).map(_.getFileAction).toSeq)
val groupedFileAndCommitInfoActions =
changes.map { case (v, indexFiles, commitInfoOpt) =>
(v, indexFiles.filter(_.hasFileAction).map(_.getFileAction).toSeq ++ commitInfoOpt)
}

val (result, duration) = Utils.timeTakenMs {
@@ -211,7 +211,7 @@
readSnapshotDescriptor,
startVersion,
endOffset.reservoirVersion,
groupedFileActions,
groupedFileAndCommitInfoActions,
spark,
isStreaming = true)
.fileChangeDf
@@ -228,7 +228,8 @@

/**
* Get the changes starting from (fromVersion, fromIndex). fromVersion is included.
* It returns an iterator of (log_version, fileActions)
* It returns an iterator of (log_version, fileActions, Option[CommitInfo]). The CommitInfo
* is needed later on so that the InCommitTimestamp of each commit can be determined.
*
* If verifyMetadataAction = true, we will break the stream when we detect any read-incompatible
* metadata changes.
@@ -239,18 +240,20 @@
isInitialSnapshot: Boolean,
limits: Option[AdmissionLimits],
endOffset: Option[DeltaSourceOffset],
verifyMetadataAction: Boolean = true): Iterator[(Long, Iterator[IndexedFile])] = {
verifyMetadataAction: Boolean = true
): Iterator[(Long, Iterator[IndexedFile], Option[CommitInfo])] = {

/** Returns matching files that were added on or after startVersion among delta logs. */
def filterAndIndexDeltaLogs(startVersion: Long): Iterator[(Long, IndexedChangeFileSeq)] = {
def filterAndIndexDeltaLogs(
startVersion: Long): Iterator[(Long, IndexedChangeFileSeq, Option[CommitInfo])] = {
// TODO: handle the case when failOnDataLoss = false and we are missing change log files
// in that case, we need to recompute the start snapshot and evolve the schema if needed
require(options.failOnDataLoss || !trackingMetadataChange,
"Using schema from schema tracking log cannot tolerate missing commit files.")
deltaLog.getChanges(startVersion, options.failOnDataLoss).map { case (version, actions) =>
// skipIndexedFile must be applied after creating IndexedFile so that
// IndexedFile.index is consistent across all versions.
val (fileActions, skipIndexedFile, metadataOpt, protocolOpt) =
val (fileActions, skipIndexedFile, metadataOpt, protocolOpt, commitInfoOpt) =
filterCDCActions(
actions, version, fromVersion, endOffset.map(_.reservoirVersion),
verifyMetadataAction && !trackingMetadataChange)
@@ -278,7 +281,7 @@
remove = remove,
shouldSkip = skipIndexedFile)
})
(version, new IndexedChangeFileSeq(itr, isInitialSnapshot = false))
(version, new IndexedChangeFileSeq(itr, isInitialSnapshot = false), commitInfoOpt)
}
}
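A side note on the `... ++ commitInfoOpt` in the first hunk of this file: Scala implicitly converts an `Option` to an `Iterable` of zero or one elements, so the CommitInfo is appended to the file actions only when it is present. A tiny, self-contained demonstration:

```scala
// Appending an Option to a Seq adds the element only when it is Some.
val actions: Seq[String] = Seq("addFile-1", "removeFile-2")
val commitInfoOpt: Option[String] = Some("commitInfo")

val withInfo = actions ++ commitInfoOpt             // Seq("addFile-1", "removeFile-2", "commitInfo")
val withoutInfo = actions ++ (None: Option[String]) // Seq("addFile-1", "removeFile-2")
```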

@@ -293,35 +296,43 @@
}

val (result, duration) = Utils.timeTakenMs {
val iter: Iterator[(Long, IndexedChangeFileSeq)] = if (isInitialSnapshot) {
// If we are reading change data from the start of the table we need to
// get the latest snapshot of the table as well.
val snapshot: Iterator[IndexedFile] = getSnapshotAt(fromVersion).map { m =>
// When we get the snapshot, the dataChange flag is false for the AddFile actions.
// We need to set it to true for it to be considered by the CDCReader.
if (m.add != null) {
m.copy(add = m.add.copy(dataChange = true))
} else {
m
val iter: Iterator[(Long, IndexedChangeFileSeq, Option[CommitInfo])] =
if (isInitialSnapshot) {
// If we are reading change data from the start of the table we need to
// get the latest snapshot of the table as well.
val (unprocessedSnapshot, snapshotInCommitTimestampOpt) = getSnapshotAt(fromVersion)
val snapshot: Iterator[IndexedFile] = unprocessedSnapshot.map { m =>
// When we get the snapshot, the dataChange flag is false for the AddFile actions.
// We need to set it to true for it to be considered by the CDCReader.
if (m.add != null) {
m.copy(add = m.add.copy(dataChange = true))
} else {
m
}
}
// This is a hack so that we can easily access the ICT downstream.
// This `CommitInfo` action is not useful for anything else and should be
// filtered out later.
val ictOnlyCommitInfo = Some(CommitInfo.empty(Some(-1))
.copy(inCommitTimestamp = snapshotInCommitTimestampOpt))
val snapshotItr: Iterator[(Long, IndexedChangeFileSeq, Option[CommitInfo])] = Iterator((
fromVersion,
new IndexedChangeFileSeq(snapshot, isInitialSnapshot = true),
ictOnlyCommitInfo
))

snapshotItr ++ filterAndIndexDeltaLogs(fromVersion + 1)
} else {
filterAndIndexDeltaLogs(fromVersion)
}
val snapshotItr: Iterator[(Long, IndexedChangeFileSeq)] = Iterator((
fromVersion,
new IndexedChangeFileSeq(snapshot, isInitialSnapshot = true)
))

snapshotItr ++ filterAndIndexDeltaLogs(fromVersion + 1)
} else {
filterAndIndexDeltaLogs(fromVersion)
}

// In this case, filterFiles will consume the available capacity. We use takeWhile
// to stop the iteration when we reach the limit, or when endOffset is specified and its
// endVersion is reached, which saves us from reading unnecessary log files.
iter.takeWhile { case (version, _) =>
iter.takeWhile { case (version, _, _) =>
limits.forall(_.hasCapacity) && versionLessThanEndOffset(version, endOffset)
}.map { case (version, indexItr) =>
(version, indexItr.filterFiles(fromVersion, fromIndex, limits, endOffset))
}.map { case (version, indexItr, ci) =>
(version, indexItr.filterFiles(fromVersion, fromIndex, limits, endOffset), ci)
}
}
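The "ICT-only CommitInfo" above is a synthetic carrier: a placeholder at version -1 whose only meaningful field is the snapshot's in-commit timestamp, and which downstream code must strip out again (see the `isStreaming` filtering in CDCReader). A hedged sketch of the pattern, with simplified stand-in types rather than Delta's real `CommitInfo`:

```scala
// Simplified stand-in; Delta's real CommitInfo has many more fields.
case class CommitInfo(version: Option[Long], inCommitTimestamp: Option[Long])

// A placeholder carrying only the ICT; version -1 marks it as synthetic.
def ictOnlyCommitInfo(snapshotICT: Option[Long]): Option[CommitInfo] =
  Some(CommitInfo(version = Some(-1L), inCommitTimestamp = snapshotICT))

// Downstream, the carrier is dropped once the timestamp has been read.
def stripCarrier(ci: Option[CommitInfo]): Option[CommitInfo] =
  ci.filterNot(_.version.contains(-1L))
```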

@@ -351,10 +362,11 @@
batchStartVersion: Long,
batchEndVersionOpt: Option[Long] = None,
verifyMetadataAction: Boolean = true
): (Seq[FileAction], Boolean, Option[Metadata], Option[Protocol]) = {
): (Seq[FileAction], Boolean, Option[Metadata], Option[Protocol], Option[CommitInfo]) = {
var shouldSkipIndexedFile = false
var metadataAction: Option[Metadata] = None
var protocolAction: Option[Protocol] = None
var commitInfoAction: Option[CommitInfo] = None
def checkAndCacheMetadata(m: Metadata): Unit = {
if (verifyMetadataAction) {
checkReadIncompatibleSchemaChanges(m, version, batchStartVersion, batchEndVersionOpt)
Expand All @@ -367,14 +379,21 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
if (actions.exists(_.isInstanceOf[AddCDCFile])) {
(actions.filter {
case _: AddCDCFile => true
case commitInfo: CommitInfo =>
commitInfoAction = Some(commitInfo)
false
case m: Metadata =>
checkAndCacheMetadata(m)
false
case p: Protocol =>
protocolAction = Some(p)
false
case _ => false
}.asInstanceOf[Seq[FileAction]], shouldSkipIndexedFile, metadataAction, protocolAction)
}.asInstanceOf[Seq[FileAction]],
shouldSkipIndexedFile,
metadataAction,
protocolAction,
commitInfoAction)
} else {
(actions.filter {
case a: AddFile =>
Expand All @@ -392,12 +411,17 @@ trait DeltaSourceCDCSupport { self: DeltaSource =>
false
case commitInfo: CommitInfo =>
shouldSkipIndexedFile = CDCReader.shouldSkipFileActionsInCommit(commitInfo)
commitInfoAction = Some(commitInfo)
false
case _: AddCDCFile | _: SetTransaction | _: DomainMetadata =>
false
case null => // Some crazy future feature. Ignore
false
}.asInstanceOf[Seq[FileAction]], shouldSkipIndexedFile, metadataAction, protocolAction)
}.asInstanceOf[Seq[FileAction]],
shouldSkipIndexedFile,
metadataAction,
protocolAction,
commitInfoAction)
}
}
}
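`filterCDCActions` now returns a five-tuple, capturing the CommitInfo in the same single pass that filters file actions and caches the Metadata and Protocol. A self-contained sketch of that one-pass pattern, with an illustrative action hierarchy in place of Delta's real one:

```scala
// Illustrative action hierarchy; Delta's real actions carry far more state.
sealed trait Action
final case class AddFile(path: String) extends Action
final case class Metadata(schemaString: String) extends Action
final case class Protocol(minReaderVersion: Int) extends Action
final case class CommitInfo(inCommitTimestamp: Option[Long]) extends Action

// One pass over the actions: keep the files, remember the latest
// Metadata, Protocol, and CommitInfo seen along the way.
def filterActions(actions: Seq[Action])
    : (Seq[AddFile], Option[Metadata], Option[Protocol], Option[CommitInfo]) = {
  var metadata: Option[Metadata] = None
  var protocol: Option[Protocol] = None
  var commitInfo: Option[CommitInfo] = None
  val files = actions.flatMap {
    case a: AddFile    => Some(a)
    case m: Metadata   => metadata = Some(m); None
    case p: Protocol   => protocol = Some(p); None
    case c: CommitInfo => commitInfo = Some(c); None
  }
  (files, metadata, protocol, commitInfo)
}
```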
CheckCDCAnswer.scala
@@ -47,16 +47,10 @@ trait CheckCDCAnswer extends QueryTest {
// Results should match the fully monotonized commits. Note that this map will include
// all versions of the table but only the ones in timestampsByVersion are checked for
// correctness.
val commits = DeltaHistoryManager.getCommits(
log.store,
log.logPath,
start = 0,
end = None,
log.newDeltaHadoopConf())

// Note that the timestamps come from filesystem modification timestamps, so they're
// milliseconds since epoch and we don't need to deal with timezones.
commits.map(f => (f.version -> new Timestamp(f.timestamp))).toMap
val commits = log.history.getHistory(start = 0, end = None)
// Note that the timestamps are in milliseconds since epoch and we don't need to deal
// with timezones.
commits.map(f => (f.getVersion -> f.timestamp)).toMap
}

timestampsByVersion.keySet.foreach { version =>
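The test now obtains its expected timestamps from `log.history.getHistory`, which (unlike the renamed low-level helper) is the layer responsible for resolving the correct timestamp per commit. Building the version-to-timestamp map from a history listing amounts to the following; the entry shape is an assumption for this sketch, not the exact `DeltaHistory` class:

```scala
import java.sql.Timestamp

// Assumed minimal shape of a history entry for this sketch.
case class HistoryEntry(version: Option[Long], timestamp: Timestamp) {
  def getVersion: Long = version.getOrElse(-1L)
}

def timestampsByVersion(history: Seq[HistoryEntry]): Map[Long, Timestamp] =
  history.map(h => h.getVersion -> h.timestamp).toMap
```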