Skip to content

Commit

Permalink
refactor listdeltas
Browse files Browse the repository at this point in the history
  • Loading branch information
dhruvarya-db committed May 24, 2024
1 parent 3cd9529 commit b2e295e
Show file tree
Hide file tree
Showing 2 changed files with 184 additions and 122 deletions.
286 changes: 164 additions & 122 deletions spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,18 @@ trait SnapshotManagement { self: DeltaLog =>
}
}

/**
* @return A tuple where the first element is an array of log files (possibly empty, if no
* usable log files are found), and the second element is the latest checksum file found
* which has a version less than or equal to `versionToLoad`.
*/
private def listFromFileSystemInternal(
startVersion: Long,
versionToLoad: Option[Long],
includeMinorCompactions: Boolean): Option[Array[(FileStatus, FileType.Value, Long)]] = {
listFromOrNone(startVersion).map {
includeMinorCompactions: Boolean
): (Option[Array[(FileStatus, FileType.Value, Long)]], Option[FileStatus]) = {
var latestAvailableChecksumFileStatus = Option.empty[FileStatus]
val files = listFromOrNone(startVersion).map {
_.flatMap {
case DeltaFile(f, fileVersion) =>
Some((f, FileType.DELTA, fileVersion))
Expand All @@ -120,7 +127,7 @@ trait SnapshotManagement { self: DeltaLog =>
case CheckpointFile(f, fileVersion) if f.getLen > 0 =>
Some((f, FileType.CHECKPOINT, fileVersion))
case ChecksumFile(f, version) if versionToLoad.forall(version <= _) =>
lastSeenChecksumFileStatusOpt = Some(f)
latestAvailableChecksumFileStatus = Some(f)
None
case _ =>
None
Expand All @@ -129,6 +136,7 @@ trait SnapshotManagement { self: DeltaLog =>
.takeWhile { case (_, _, fileVersion) => versionToLoad.forall(fileVersion <= _) }
.toArray
}
(files, latestAvailableChecksumFileStatus)
}

/**
Expand All @@ -137,145 +145,179 @@ trait SnapshotManagement { self: DeltaLog =>
* file-system and a commit-owner (if available), reconciles the results to account for
* asynchronous backfill operations, and ensures a comprehensive list of file statuses without
* missing any concurrently backfilled files.
* *Note*: If table is a managed-commit table, the commit-owner client MUST be passed to correctly
* list the commits.
* *Note*: If table is a managed-commit table, the commit store MUST be passed to correctly list
* the commits.
* The function also collects the latest checksum file found in the listings and returns it.
*
* @param startVersion the version to start. Inclusive.
* @param tableCommitOwnerClientOpt the optional commit-owner client to use for fetching
* un-backfilled commits.
* @param tableCommitOwnerClientOpt the optional commit store to use for fetching un-backfilled
* commits.
* @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
* @param includeMinorCompactions Whether to include minor compaction files in the result
* @return Some array of files found (possibly empty, if no usable commit files are present), or
* None if the listing returned no files at all.
* @return A tuple where the first element is an array of log files (possibly empty, if no
* usable log files are found), and the second element is the latest checksum file found
* which has a version less than or equal to `versionToLoad`.
*/
protected final def listDeltaCompactedDeltaAndCheckpointFiles(
protected def listDeltaCompactedDeltaCheckpointFilesAndLatestChecksumFile(
startVersion: Long,
tableCommitOwnerClientOpt: Option[TableCommitOwnerClient],
versionToLoad: Option[Long],
includeMinorCompactions: Boolean): Option[Array[FileStatus]] = {
recordDeltaOperation(self, "delta.deltaLog.listDeltaAndCheckpointFiles") {
val tableCommitOwnerClient = tableCommitOwnerClientOpt.getOrElse {
return listFromFileSystemInternal(startVersion, versionToLoad, includeMinorCompactions)
.map(_.map(_._1))
}

// Submit a potential async call to get commits from commit-owner if available
val threadPool = SnapshotManagement.commitOwnerGetCommitsThreadPool
def getCommitsTask(async: Boolean): GetCommitsResponse = {
recordFrameProfile("DeltaLog", s"CommitOwnerClient.getCommits.async=$async") {
tableCommitOwnerClient.getCommits(Some(startVersion), endVersion = versionToLoad)
}
}
val unbackfilledCommitsResponseFuture =
if (threadPool.getActiveCount < threadPool.getMaximumPoolSize) {
threadPool.submit[GetCommitsResponse](spark) { getCommitsTask(async = true) }
} else {
// If the thread pool is full, we should not submit more tasks to it. Instead, we should
// run the task in the current thread.
logInfo("Getting un-backfilled commits from commit-owner in the same thread for table " +
s"$dataPath")
recordDeltaEvent(
this,
"delta.listDeltaAndCheckpointFiles.synchronousCommitOwnerGetCommits")
CompletableFuture.completedFuture(getCommitsTask(async = false))
}

var maxDeltaVersionSeen = startVersion - 1
val initialLogTuplesFromFsListingOpt =
includeMinorCompactions: Boolean): (Option[Array[FileStatus]], Option[FileStatus]) = {
val tableCommitOwnerClient = tableCommitOwnerClientOpt.getOrElse {
val (filesOpt, checksumOpt) =
listFromFileSystemInternal(startVersion, versionToLoad, includeMinorCompactions)
// Ideally listFromFileSystemInternal should return lexicographically sorted files and so
// maxDeltaVersionSeen should be equal to the last delta version. But we are being
// defensive here and taking max of all the delta fileVersions seen.
initialLogTuplesFromFsListingOpt.foreach { logTuples =>
logTuples.filter(_._2 == FileType.DELTA).map(_._3).foreach { deltaVersion =>
maxDeltaVersionSeen = Math.max(maxDeltaVersionSeen, deltaVersion)
}
return (filesOpt.map(_.map(_._1)), checksumOpt)
}

// Submit a potential async call to get commits from commit store if available
val threadPool = SnapshotManagement.commitOwnerGetCommitsThreadPool
def getCommitsTask(async: Boolean): GetCommitsResponse = {
recordFrameProfile("DeltaLog", s"CommitStore.getCommits.async=$async") {
tableCommitOwnerClient.getCommits(Some(startVersion), endVersion = versionToLoad)
}
val unbackfilledCommitsResponse = try {
unbackfilledCommitsResponseFuture.get()
} catch {
case e: java.util.concurrent.ExecutionException =>
throw new CommitOwnerGetCommitsFailedException(e.getCause)
}
val unbackfilledCommitsResponseFuture =
if (threadPool.getActiveCount < threadPool.getMaximumPoolSize) {
threadPool.submit[GetCommitsResponse](spark) { getCommitsTask(async = true) }
} else {
// If the thread pool is full, we should not submit more tasks to it. Instead, we should
// run the task in the current thread.
logInfo("Getting un-backfilled commits from commit store in the same thread for table " +
s"$dataPath")
recordDeltaEvent(
this,
"delta.listDeltaAndCheckpointFiles.synchronousCommitStoreGetCommits")
CompletableFuture.completedFuture(getCommitsTask(async = false))
}

def requiresAdditionalListing(): Boolean = {
// A gap in delta versions may occur if some delta files are backfilled "after" the
// file-system listing but before the commit-owner listing. To handle this scenario, we
// perform an additional listing from the file system because those missing files would be
// backfilled by now and show up in the file-system.
// Note: We only care about missing delta files with version <= versionToLoad
val areDeltaFilesMissing = unbackfilledCommitsResponse.getCommits.headOption match {
case Some(commit) =>
// Missing Delta files: [maxDeltaVersionSeen + 1, commit.head.version - 1]
maxDeltaVersionSeen + 1 < commit.getVersion
case None =>
// Missing Delta files: [maxDeltaVersionSeen + 1, latestTableVersion]
// When there are no commits, we should consider the latestTableVersion from the commit
// store to detect if ALL trailing commits were concurrently backfilled.
unbackfilledCommitsResponse.getLatestTableVersion >= 0 &&
maxDeltaVersionSeen < unbackfilledCommitsResponse.getLatestTableVersion
}
versionToLoad.forall(maxDeltaVersionSeen < _) && areDeltaFilesMissing
var maxDeltaVersionSeen = startVersion - 1
val (initialLogTuplesFromFsListingOpt, initialChecksumOpt) =
listFromFileSystemInternal(startVersion, versionToLoad, includeMinorCompactions)
// Ideally listFromFileSystemInternal should return lexicographically sorted files and so
// maxDeltaVersionSeen should be equal to the last delta version. But we are being
// defensive here and taking max of all the delta fileVersions seen.
initialLogTuplesFromFsListingOpt.foreach { logTuples =>
logTuples.filter(_._2 == FileType.DELTA).map(_._3).foreach { deltaVersion =>
maxDeltaVersionSeen = Math.max(maxDeltaVersionSeen, deltaVersion)
}
}
val unbackfilledCommitsResponse = try {
unbackfilledCommitsResponseFuture.get()
} catch {
case e: java.util.concurrent.ExecutionException =>
throw new CommitOwnerGetCommitsFailedException(e.getCause)
}

val initialMaxDeltaVersionSeen = maxDeltaVersionSeen
val additionalLogTuplesFromFsListingOpt: Option[Array[(FileStatus, FileType.Value, Long)]] =
if (requiresAdditionalListing()) {
recordDeltaEvent(this, "delta.listDeltaAndCheckpointFiles.requiresAdditionalFsListing")
listFromFileSystemInternal(
startVersion = initialMaxDeltaVersionSeen + 1, versionToLoad, includeMinorCompactions)
} else {
None
}
additionalLogTuplesFromFsListingOpt.foreach { logTuples =>
logTuples.filter(_._2 == FileType.DELTA).map(_._3).foreach { deltaVersion =>
maxDeltaVersionSeen = Math.max(maxDeltaVersionSeen, deltaVersion)
}
def requiresAdditionalListing(): Boolean = {
// A gap in delta versions may occur if some delta files are backfilled "after" the
// file-system listing but before the commit-owner listing. To handle this scenario, we
// perform an additional listing from the file system because those missing files would be
// backfilled by now and show up in the file-system.
// Note: We only care about missing delta files with version <= versionToLoad
val areDeltaFilesMissing = unbackfilledCommitsResponse.getCommits.headOption match {
case Some(commit) =>
// Missing Delta files: [maxDeltaVersionSeen + 1, commit.head.version - 1]
maxDeltaVersionSeen + 1 < commit.getVersion
case None =>
// Missing Delta files: [maxDeltaVersionSeen + 1, latestTableVersion]
// When there are no commits, we should consider the latestTableVersion from the commit
// store to detect if ALL trailing commits were concurrently backfilled.
unbackfilledCommitsResponse.getLatestTableVersion >= 0 &&
maxDeltaVersionSeen < unbackfilledCommitsResponse.getLatestTableVersion
}
versionToLoad.forall(maxDeltaVersionSeen < _) && areDeltaFilesMissing
}

val initialMaxDeltaVersionSeen = maxDeltaVersionSeen
val (additionalLogTuplesFromFsListingOpt, additionalChecksumOpt) =
if (requiresAdditionalListing()) {
// We should not have any gaps in File-System versions and commit-owner versions after the
// additional listing.
val eventData = Map(
"initialLogsFromFsListingOpt" ->
initialLogTuplesFromFsListingOpt.map(_.map(_._1.getPath.toString)),
"additionalLogsFromFsListingOpt" ->
additionalLogTuplesFromFsListingOpt.map(_.map(_._1.getPath.toString)),
"maxDeltaVersionSeen" -> maxDeltaVersionSeen,
"unbackfilledCommits" ->
unbackfilledCommitsResponse.getCommits.map(
commit => commit.getFileStatus.getPath.toString),
"latestCommitVersion" -> unbackfilledCommitsResponse.getLatestTableVersion)
recordDeltaEvent(
deltaLog = this,
opType = "delta.listDeltaAndCheckpointFiles.unexpectedRequiresAdditionalFsListing",
data = eventData)
recordDeltaEvent(this, "delta.listDeltaAndCheckpointFiles.requiresAdditionalFsListing")
listFromFileSystemInternal(
startVersion = initialMaxDeltaVersionSeen + 1, versionToLoad, includeMinorCompactions)
} else {
(None, initialChecksumOpt)
}
additionalLogTuplesFromFsListingOpt.foreach { logTuples =>
logTuples.filter(_._2 == FileType.DELTA).map(_._3).foreach { deltaVersion =>
maxDeltaVersionSeen = Math.max(maxDeltaVersionSeen, deltaVersion)
}
}
if (requiresAdditionalListing()) {
// We should not have any gaps in File-System versions and CommitOwner versions after the
// additional listing.
val eventData = Map(
"initialLogsFromFsListingOpt" ->
initialLogTuplesFromFsListingOpt.map(_.map(_._1.getPath.toString)),
"additionalLogsFromFsListingOpt" ->
additionalLogTuplesFromFsListingOpt.map(_.map(_._1.getPath.toString)),
"maxDeltaVersionSeen" -> maxDeltaVersionSeen,
"unbackfilledCommits" ->
unbackfilledCommitsResponse.getCommits.map(
commit => commit.getFileStatus.getPath.toString),
"latestCommitVersion" -> unbackfilledCommitsResponse.getLatestTableVersion)
recordDeltaEvent(
deltaLog = this,
opType = "delta.listDeltaAndCheckpointFiles.unexpectedRequiresAdditionalFsListing",
data = eventData)
}

val finalLogTuplesFromFsListingOpt: Option[Array[(FileStatus, FileType.Value, Long)]] =
(initialLogTuplesFromFsListingOpt, additionalLogTuplesFromFsListingOpt) match {
case (Some(initial), Some(additional)) =>
// Filter initial list to exclude files with versions beyond
// `initialListingMaxDeltaVersionSeen` to prevent duplicating non-delta files with
// higher versions in the combined list. Ideally we shouldn't need this, but we are
// being defensive here if the log has missing files.
// E.g. initial = [0.json, 1.json, 2.checkpoint], initialListingMaxDeltaVersionSeen = 1,
// additional = [2.checkpoint], final = [0.json, 1.json, 2.checkpoint]
Some(initial.takeWhile(_._3 <= initialMaxDeltaVersionSeen) ++ additional)
case (Some(initial), None) => Some(initial)
case (None, Some(additional)) => Some(additional)
case _ => None
}
val finalLogTuplesFromFsListingOpt: Option[Array[(FileStatus, FileType.Value, Long)]] =
(initialLogTuplesFromFsListingOpt, additionalLogTuplesFromFsListingOpt) match {
case (Some(initial), Some(additional)) =>
// Filter initial list to exclude files with versions beyond
// `initialListingMaxDeltaVersionSeen` to prevent duplicating non-delta files with
// higher versions in the combined list. Ideally we shouldn't need this, but we are
// being defensive here if the log has missing files.
// E.g. initial = [0.json, 1.json, 2.checkpoint], initialListingMaxDeltaVersionSeen = 1,
// additional = [2.checkpoint], final = [0.json, 1.json, 2.checkpoint]
Some(initial.takeWhile(_._3 <= initialMaxDeltaVersionSeen) ++ additional)
case (Some(initial), None) => Some(initial)
case (None, Some(additional)) => Some(additional)
case _ => None
}

val unbackfilledCommitsFiltered = unbackfilledCommitsResponse.getCommits
.dropWhile(_.getVersion <= maxDeltaVersionSeen)
.takeWhile(commit => versionToLoad.forall(commit.getVersion <= _))
.map(_.getFileStatus)
val unbackfilledCommitsFiltered = unbackfilledCommitsResponse.getCommits
.dropWhile(_.getVersion <= maxDeltaVersionSeen)
.takeWhile(commit => versionToLoad.forall(commit.getVersion <= _))
.map(_.getFileStatus)

// If result from fs listing is None and result from commit-owner is empty, return none.
// This is used by caller to distinguish whether table doesn't exist.
finalLogTuplesFromFsListingOpt.map { logTuplesFromFsListing =>
logTuplesFromFsListing.map(_._1) ++ unbackfilledCommitsFiltered
}
// If result from fs listing is None and result from commit-store is empty, return none.
// This is used by caller to distinguish whether table doesn't exist.
val logTuplesToReturn = finalLogTuplesFromFsListingOpt.map { logTuplesFromFsListing =>
logTuplesFromFsListing.map(_._1) ++ unbackfilledCommitsFiltered
}
val latestChecksumOpt = additionalChecksumOpt.orElse(initialChecksumOpt)
(logTuplesToReturn, latestChecksumOpt)
}

/**
* This method is designed to efficiently and reliably list delta, compacted delta, and
* checkpoint files associated with a Delta Lake table. It makes parallel calls to both the
* file-system and a commit-owner (if available), reconciles the results to account for
* asynchronous backfill operations, and ensures a comprehensive list of file statuses without
* missing any concurrently backfilled files.
* *Note*: If table is a managed-commit table, the commit-owner client MUST be passed to correctly
* list the commits.
*
* @param startVersion the version to start. Inclusive.
* @param tableCommitOwnerClientOpt the optional commit-owner client to use for fetching
* un-backfilled commits.
* @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
* @param includeMinorCompactions Whether to include minor compaction files in the result
* @return Some array of files found (possibly empty, if no usable commit files are present), or
* None if the listing returned no files at all.
*/
protected final def listDeltaCompactedDeltaAndCheckpointFiles(
startVersion: Long,
tableCommitOwnerClientOpt: Option[TableCommitOwnerClient],
versionToLoad: Option[Long],
includeMinorCompactions: Boolean): Option[Array[FileStatus]] = {
recordDeltaOperation(self, "delta.deltaLog.listDeltaAndCheckpointFiles") {
val (logTuplesOpt, latestChecksumOpt) =
listDeltaCompactedDeltaCheckpointFilesAndLatestChecksumFile(
startVersion, tableCommitOwnerClientOpt, versionToLoad, includeMinorCompactions)
lastSeenChecksumFileStatusOpt = latestChecksumOpt
logTuplesOpt
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,26 @@ trait ManagedCommitTestUtils
}
}

/**
* Run the test against a [[TrackingCommitOwnerClient]] with backfill batch size =
* `batchBackfillSize`
*/
def testWithTrackingInMemoryCommitOwner(
backfillBatchSize: Int)(testName: String)(f: => Unit): Unit = {
test(s"$testName [Backfill batch size: $backfillBatchSize]") {
CommitOwnerProvider.clearNonDefaultBuilders()
CommitOwnerProvider.registerBuilder(TrackingInMemoryCommitOwnerBuilder(backfillBatchSize))
val managedCommitOwnerConf = Map("randomConf" -> "randomConfValue")
val managedCommitOwnerJson = JsonUtils.toJson(managedCommitOwnerConf)
withSQLConf(
DeltaConfigs.MANAGED_COMMIT_OWNER_NAME.defaultTablePropertyKey -> "tracking-in-memory",
DeltaConfigs.MANAGED_COMMIT_OWNER_CONF.defaultTablePropertyKey ->
managedCommitOwnerJson) {
f
}
}
}

/** Run the test with:
* 1. Without managed-commits
* 2. With managed-commits with different backfill batch sizes
Expand Down

0 comments on commit b2e295e

Please sign in to comment.