[Spark][Version Checksum] Incrementally compute the checksum (#3828)

#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

#3799 added the capability to write a checksum file after every commit.
However, writing a checksum currently requires a full state
reconstruction, which is expensive. This PR adds the capability to
compute most of the fields incrementally: the actions of the current
commit are applied on top of the previous version's checksum to obtain
the checksum of the current version. This works as long as the actual
operation performed matches exactly the operation type specified in the
commit. Note that this feature is gated behind a flag
(`DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED`) that is `true` by default.
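
The incremental step can be illustrated with a minimal, self-contained sketch. It is not the code from this PR: `Checksum`, `AddFile`, `RemoveFile`, and `IncrementalChecksumSketch.derive` are simplified, hypothetical stand-ins that track only the table size and file count, and the only error code shown is the one for a remove action without a size.

```scala
// Simplified, hypothetical stand-ins for Delta's action and checksum types.
sealed trait Action
final case class AddFile(path: String, size: Long) extends Action
// `size` is optional, mirroring remove actions that lack extended file metadata.
final case class RemoveFile(path: String, size: Option[Long]) extends Action

final case class Checksum(tableSizeBytes: Long, numFiles: Long)

object IncrementalChecksumSketch {
  /**
   * Applies one commit's actions to the previous version's checksum.
   * Returns Left(errorCode) when the result cannot be derived incrementally;
   * a caller would then fall back to full state reconstruction.
   */
  def derive(
      old: Checksum,
      actions: Seq[Action],
      ignoreAddFiles: Boolean = false): Either[String, Checksum] = {
    actions.foldLeft[Either[String, Checksum]](Right(old)) {
      // A newly added file grows the table, unless the operation re-adds existing files.
      case (Right(c), a: AddFile) if !ignoreAddFiles =>
        Right(c.copy(tableSizeBytes = c.tableSizeBytes + a.size, numFiles = c.numFiles + 1))
      // A removed file with a known size shrinks the table.
      case (Right(c), RemoveFile(_, Some(size))) =>
        Right(c.copy(tableSizeBytes = c.tableSizeBytes - size, numFiles = c.numFiles - 1))
      // Without a size the totals cannot be updated, so give up.
      case (Right(_), RemoveFile(_, None)) =>
        Left("ENCOUNTERED_REMOVE_FILE_MISSING_SIZE")
      // Ignored add files and earlier failures leave the accumulator unchanged.
      case (acc, _) => acc
    }
  }
}
```

For instance, starting from `Checksum(100, 2)`, adding a 50-byte file and removing a 40-byte file gives `Right(Checksum(110, 2))`, while any remove action without a size gives a `Left` and no incremental checksum.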

## How was this patch tested?


Added tests in ChecksumSuite.

## Does this PR introduce _any_ user-facing changes?

No
dhruvarya-db authored Oct 31, 2024
1 parent 1eff5df commit 9f45281
Showing 7 changed files with 323 additions and 18 deletions.
187 changes: 186 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala
@@ -82,7 +82,8 @@ trait RecordChecksum extends DeltaLogging {
}

val version = snapshot.version
- val checksum = snapshot.computeChecksum.copy(txnId = Some(txnId))
+ val checksumWithoutTxnId = snapshot.checksumOpt.getOrElse(snapshot.computeChecksum)
+ val checksum = checksumWithoutTxnId.copy(txnId = Some(txnId))
val eventData = mutable.Map[String, Any]("operationSucceeded" -> false)
eventData("numAddFileActions") = checksum.allFiles.map(_.size).getOrElse(-1)
eventData("numSetTransactionActions") = checksum.setTransactions.map(_.size).getOrElse(-1)
@@ -115,6 +116,190 @@ trait RecordChecksum extends DeltaLogging {
opType = "delta.checksum.write",
data = eventData)
}

/**
* Incrementally derive checksum for the just-committed or about-to-be committed snapshot.
* @param spark The SparkSession
* @param deltaLog The DeltaLog
* @param versionToCompute The version for which we want to compute the checksum
* @param actions The actions corresponding to the version `versionToCompute`
* @param metadata The metadata corresponding to the version `versionToCompute`
* @param protocol The protocol corresponding to the version `versionToCompute`
* @param operationName The operation name corresponding to the version `versionToCompute`
* @param txnIdOpt The transaction identifier for the version `versionToCompute`
* @param previousVersionState Contains either the versionChecksum corresponding to
* `versionToCompute - 1` or a snapshot. Note that the snapshot may
* belong to any version and this method will only use the snapshot if
* it corresponds to `versionToCompute - 1`.
* @return Either the new checksum or an error code string if the checksum could not be computed.
*/
// scalastyle:off argcount
def incrementallyDeriveChecksum(
spark: SparkSession,
deltaLog: DeltaLog,
versionToCompute: Long,
actions: Seq[Action],
metadata: Metadata,
protocol: Protocol,
operationName: String,
txnIdOpt: Option[String],
previousVersionState: Either[Snapshot, VersionChecksum]
): Either[String, VersionChecksum] = {
// scalastyle:on argcount
if (!deltaLog.incrementalCommitEnabled) {
return Left("INCREMENTAL_COMMITS_DISABLED")
}

// Do not incrementally derive checksum for ManualUpdate operations since it may
// include actions that violate delta protocol invariants.
if (operationName == DeltaOperations.ManualUpdate.name) {
return Left("INVALID_OPERATION_MANUAL_UPDATE")
}

// Try to incrementally compute a VersionChecksum for the just-committed snapshot.
val expectedVersion = versionToCompute - 1
val (oldVersionChecksum, oldSnapshot) = previousVersionState match {
case Right(checksum) => checksum -> None
case Left(snapshot) if snapshot.version == expectedVersion =>
// The original snapshot is still fresh so use it directly. Note this could trigger
// a state reconstruction if there is not an existing checksumOpt in the snapshot
// or if the existing checksumOpt is missing information, e.g.,
// a null-valued metadata or protocol. However, if we do not obtain a checksum here,
// then we cannot incrementally derive a new checksum for the new snapshot.
logInfo(log"Incremental commit: starting with snapshot version " +
log"${MDC(DeltaLogKeys.VERSION, expectedVersion)}")
val snapshotChecksum = snapshot.checksumOpt.getOrElse(snapshot.computeChecksum)
snapshotChecksum.copy(numMetadata = 1, numProtocol = 1) -> Some(snapshot)
case _ =>
previousVersionState.swap.foreach { snapshot =>
// Occurs when snapshot is no longer fresh due to concurrent writers.
// Read CRC file and validate checksum information is complete.
recordDeltaEvent(deltaLog, opType = "delta.commit.snapshotAgedOut", data = Map(
"snapshotVersion" -> snapshot.version,
"commitAttemptVersion" -> versionToCompute
))
}
val oldCrcOpt = deltaLog.readChecksum(expectedVersion)
if (oldCrcOpt.isEmpty) {
return Left("MISSING_OLD_CRC")
}
val oldCrcFiltered = oldCrcOpt
.filterNot(_.metadata == null)
.filterNot(_.protocol == null)

val oldCrc = oldCrcFiltered.getOrElse {
return Left("OLD_CRC_INCOMPLETE")
}
oldCrc -> None
}

// Incrementally compute the new version checksum, if the old one is available.
val ignoreAddFilesInOperation =
RecordChecksum.operationNamesWhereAddFilesIgnoredForIncrementalCrc.contains(operationName)

computeNewChecksum(
versionToCompute,
operationName,
txnIdOpt,
oldVersionChecksum,
oldSnapshot,
actions,
ignoreAddFilesInOperation
)
}

/**
* Incrementally derive new checksum from old checksum + actions.
*
* @param attemptVersion commit attempt version for which we want to generate CRC.
* @param operationName operation name for the attempted commit.
* @param txnIdOpt transaction identifier, if any.
* @param oldVersionChecksum from previous commit (attemptVersion - 1).
* @param oldSnapshot snapshot representing previous commit version (i.e. attemptVersion - 1),
* None if not available.
* @param actions used to incrementally compute new checksum.
* @param ignoreAddFiles for transactions whose add file actions refer to already-existing files
* e.g., [[DeltaOperations.ComputeStats]] transactions.
* @return Either the new checksum or an error code string if the checksum could not be
* computed incrementally.
*/
// scalastyle:off argcount
private[delta] def computeNewChecksum(
attemptVersion: Long,
operationName: String,
txnIdOpt: Option[String],
oldVersionChecksum: VersionChecksum,
oldSnapshot: Option[Snapshot],
actions: Seq[Action],
ignoreAddFiles: Boolean
) : Either[String, VersionChecksum] = {
// scalastyle:on argcount
oldSnapshot.foreach(s => require(s.version == (attemptVersion - 1)))
var tableSizeBytes = oldVersionChecksum.tableSizeBytes
var numFiles = oldVersionChecksum.numFiles
var protocol = oldVersionChecksum.protocol
var metadata = oldVersionChecksum.metadata

var inCommitTimestamp : Option[Long] = None
actions.foreach {
case a: AddFile if !ignoreAddFiles =>
tableSizeBytes += a.size
numFiles += 1


// extendedFileMetadata == true implies fields partitionValues, size, and tags are present
case r: RemoveFile if r.extendedFileMetadata == Some(true) =>
val size = r.size.get
tableSizeBytes -= size
numFiles -= 1


case r: RemoveFile =>
// Report the failure to usage logs.
val msg = s"A remove action with a missing file size was detected in file ${r.path} " +
"causing incremental commit to fallback to state reconstruction."
recordDeltaEvent(
this.deltaLog,
"delta.checksum.compute",
data = Map("error" -> msg))
return Left("ENCOUNTERED_REMOVE_FILE_MISSING_SIZE")
case p: Protocol =>
protocol = p
case m: Metadata =>
metadata = m
case ci: CommitInfo =>
inCommitTimestamp = ci.inCommitTimestamp
case _ =>
}

Right(VersionChecksum(
txnId = txnIdOpt,
tableSizeBytes = tableSizeBytes,
numFiles = numFiles,
numMetadata = 1,
numProtocol = 1,
inCommitTimestampOpt = inCommitTimestamp,
metadata = metadata,
protocol = protocol,
setTransactions = None,
domainMetadata = None,
histogramOpt = None,
allFiles = None
))
}

}

object RecordChecksum {
// Operations where we should ignore AddFiles in the incremental checksum computation.
val operationNamesWhereAddFilesIgnoredForIncrementalCrc = Set(
// The transaction that computes stats is special -- it re-adds files that already exist, in
// order to update their min/max stats. We should not count those against the totals.
DeltaOperations.ComputeStats(Seq.empty).name,
// Backfill/Tagging re-adds existing AddFiles without changing the underlying data files.
// Incremental commits should ignore backfill commits.
DeltaOperations.RowTrackingBackfill().name
)
}

/**
@@ -142,6 +142,10 @@ class DeltaLog private(
def maxSnapshotLineageLength: Int =
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_MAX_SNAPSHOT_LINEAGE_LENGTH)

private[delta] def incrementalCommitEnabled: Boolean = {
DeltaLog.incrementalCommitEnableConfigs.forall(conf => spark.conf.get(conf))
}

/** The unique identifier for this table. */
def tableId: String = unsafeVolatileMetadata.id // safe because table id never changes

@@ -720,6 +724,10 @@ object DeltaLog extends DeltaLogging {
.maximumSize(cacheSize)
}

val incrementalCommitEnableConfigs = Seq(
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED
)


/**
* Creates a [[LogicalRelation]] for a given [[DeltaLogFileIndex]], with all necessary file source
@@ -250,7 +250,8 @@ object OptimisticTransaction {
trait OptimisticTransactionImpl extends TransactionalWrite
with SQLMetricsReporting
with DeltaScanGenerator
- with DeltaLogging {
+ with DeltaLogging
+ with RecordChecksum {

import org.apache.spark.sql.delta.util.FileNames._

@@ -2475,7 +2476,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}
val commitFile = writeCommitFileImpl(
attemptVersion, jsonActions, commitCoordinatorClient, currentTransactionInfo)
- (None, commitFile)
+ val newChecksumOpt = incrementallyDeriveChecksum(attemptVersion, currentTransactionInfo)
+ (newChecksumOpt, commitFile)
}

protected def writeCommitFileImpl(
@@ -2501,6 +2503,33 @@ trait OptimisticTransactionImpl extends TransactionalWrite
commitResponse.getCommit
}


/**
* Given an attemptVersion, obtains the checksum for the previous snapshot version
* (i.e., attemptVersion - 1) and incrementally derives a new checksum from
* the actions of the current transaction.
*
* @param attemptVersion that the current transaction is committing
* @param currentTransactionInfo containing actions of the current transaction
* @return the incrementally derived checksum, or None if it could not be derived
*/
protected def incrementallyDeriveChecksum(
attemptVersion: Long,
currentTransactionInfo: CurrentTransactionInfo): Option[VersionChecksum] = {

incrementallyDeriveChecksum(
spark,
deltaLog,
attemptVersion,
actions = currentTransactionInfo.finalActionsToCommit,
metadata = currentTransactionInfo.metadata,
protocol = currentTransactionInfo.protocol,
operationName = currentTransactionInfo.op.name,
txnIdOpt = Some(currentTransactionInfo.txnId),
previousVersionState = scala.Left(snapshot)
).toOption
}

/**
* Looks at actions that have happened since the txn started and checks for logical
* conflicts with the read/writes. Resolve conflicts and returns a tuple representing
@@ -518,10 +518,10 @@ class Snapshot(
numProtocol = numOfProtocol,
inCommitTimestampOpt = getInCommitTimestampOpt,
setTransactions = checksumOpt.flatMap(_.setTransactions),
- domainMetadata = domainMetadatasIfKnown,
+ domainMetadata = checksumOpt.flatMap(_.domainMetadata),
metadata = metadata,
protocol = protocol,
- histogramOpt = fileSizeHistogram,
+ histogramOpt = checksumOpt.flatMap(_.histogramOpt),
allFiles = checksumOpt.flatMap(_.allFiles))

/** Returns the data schema of the table, used for reading stats */
@@ -1027,6 +1027,14 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

val INCREMENTAL_COMMIT_ENABLED =
buildConf("incremental.commit.enabled")
.internal()
.doc("If true, Delta will incrementally compute the content of the commit checksum " +
"file, which avoids the full state reconstruction that would otherwise be required.")
.booleanConf
.createWithDefault(true)

val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED =
buildConf("checkpoint.exceptionThrowing.enabled")
.internal()
@@ -26,6 +26,8 @@ import org.apache.spark.sql.delta.util.FileNames
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.test.SharedSparkSession

class ChecksumSuite
@@ -60,4 +62,72 @@ class ChecksumSuite
testChecksumFile(writeChecksumEnabled = true)
testChecksumFile(writeChecksumEnabled = false)
}

test("Incremental checksums: post commit snapshot should have a checksum " +
"without triggering state reconstruction") {
for (incrementalCommitEnabled <- BOOLEAN_DOMAIN) {
withSQLConf(
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false",
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> incrementalCommitEnabled.toString) {
withTempDir { tempDir =>
val df = spark.range(1)
df.write.format("delta").mode("append").save(tempDir.getCanonicalPath)
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
log
.startTransaction()
.commit(Seq(createTestAddFile()), DeltaOperations.Write(SaveMode.Append))
val postCommitSnapshot = log.snapshot
assert(postCommitSnapshot.version == 1)
assert(!postCommitSnapshot.stateReconstructionTriggered)
assert(postCommitSnapshot.checksumOpt.isDefined == incrementalCommitEnabled)

postCommitSnapshot.checksumOpt.foreach { incrementalChecksum =>
val checksumFromStateReconstruction = postCommitSnapshot.computeChecksum
assert(incrementalChecksum.copy(txnId = None) == checksumFromStateReconstruction)
}
}
}
}
}

def testIncrementalChecksumWrites(tableMutationOperation: String => Unit): Unit = {
withTempDir { tempDir =>
withSQLConf(
DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true",
DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true") {
val df = spark.range(10).withColumn("id2", col("id") % 2)
df.write
.format("delta")
.partitionBy("id")
.mode("append")
.save(tempDir.getCanonicalPath)

tableMutationOperation(tempDir.getCanonicalPath)
val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
val checksumOpt = log.snapshot.checksumOpt
assert(checksumOpt.isDefined)
val checksum = checksumOpt.get
val computedChecksum = log.snapshot.computeChecksum
assert(checksum.copy(txnId = None) === computedChecksum)
}
}
}

test("Incremental checksums: INSERT") {
testIncrementalChecksumWrites { tablePath =>
sql(s"INSERT INTO delta.`$tablePath` SELECT *, 1 FROM range(10, 20)")
}
}

test("Incremental checksums: UPDATE") {
testIncrementalChecksumWrites { tablePath =>
sql(s"UPDATE delta.`$tablePath` SET id2 = id + 1 WHERE id % 2 = 0")
}
}

test("Incremental checksums: DELETE") {
testIncrementalChecksumWrites { tablePath =>
sql(s"DELETE FROM delta.`$tablePath` WHERE id % 2 = 0")
}
}
}