diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala
index 67f46a32e9..9dfd3718c3 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checksum.scala
@@ -82,7 +82,8 @@ trait RecordChecksum extends DeltaLogging {
     }
 
     val version = snapshot.version
-    val checksum = snapshot.computeChecksum.copy(txnId = Some(txnId))
+    val checksumWithoutTxnId = snapshot.checksumOpt.getOrElse(snapshot.computeChecksum)
+    val checksum = checksumWithoutTxnId.copy(txnId = Some(txnId))
     val eventData = mutable.Map[String, Any]("operationSucceeded" -> false)
     eventData("numAddFileActions") = checksum.allFiles.map(_.size).getOrElse(-1)
     eventData("numSetTransactionActions") = checksum.setTransactions.map(_.size).getOrElse(-1)
@@ -115,6 +116,190 @@ trait RecordChecksum extends DeltaLogging {
       opType = "delta.checksum.write",
       data = eventData)
   }
+
+  /**
+   * Incrementally derives the checksum for the just-committed or about-to-be-committed snapshot.
+   * @param spark The SparkSession
+   * @param deltaLog The DeltaLog
+   * @param versionToCompute The version for which we want to compute the checksum
+   * @param actions The actions corresponding to the version `versionToCompute`
+   * @param metadata The metadata corresponding to the version `versionToCompute`
+   * @param protocol The protocol corresponding to the version `versionToCompute`
+   * @param operationName The operation name corresponding to the version `versionToCompute`
+   * @param txnIdOpt The transaction identifier for the version `versionToCompute`
+   * @param previousVersionState Contains either the versionChecksum corresponding to
+   *                             `versionToCompute - 1` or a snapshot. Note that the snapshot may
+   *                             belong to any version and this method will only use the snapshot
+   *                             if it corresponds to `versionToCompute - 1`.
+   * @return Either the new checksum or an error code string if the checksum could not be computed.
+   */
+  // scalastyle:off argcount
+  def incrementallyDeriveChecksum(
+      spark: SparkSession,
+      deltaLog: DeltaLog,
+      versionToCompute: Long,
+      actions: Seq[Action],
+      metadata: Metadata,
+      protocol: Protocol,
+      operationName: String,
+      txnIdOpt: Option[String],
+      previousVersionState: Either[Snapshot, VersionChecksum]
+  ): Either[String, VersionChecksum] = {
+    // scalastyle:on argcount
+    if (!deltaLog.incrementalCommitEnabled) {
+      return Left("INCREMENTAL_COMMITS_DISABLED")
+    }
+
+    // Do not incrementally derive the checksum for ManualUpdate operations, since they may
+    // include actions that violate Delta protocol invariants.
+    if (operationName == DeltaOperations.ManualUpdate.name) {
+      return Left("INVALID_OPERATION_MANUAL_UPDATE")
+    }
+
+    // Try to incrementally compute a VersionChecksum for the just-committed snapshot.
+    val expectedVersion = versionToCompute - 1
+    val (oldVersionChecksum, oldSnapshot) = previousVersionState match {
+      case Right(checksum) => checksum -> None
+      case Left(snapshot) if snapshot.version == expectedVersion =>
+        // The original snapshot is still fresh, so use it directly. Note that this could trigger
+        // a state reconstruction if the snapshot has no existing checksumOpt, or if the existing
+        // checksumOpt is missing information, e.g. a null-valued metadata or protocol. However,
+        // if we do not obtain a checksum here, then we cannot incrementally derive a new
+        // checksum for the new snapshot.
+ logInfo(log"Incremental commit: starting with snapshot version " + + log"${MDC(DeltaLogKeys.VERSION, expectedVersion)}") + val snapshotChecksum = snapshot.checksumOpt.getOrElse(snapshot.computeChecksum) + snapshotChecksum.copy(numMetadata = 1, numProtocol = 1) -> Some(snapshot) + case _ => + previousVersionState.swap.foreach { snapshot => + // Occurs when snapshot is no longer fresh due to concurrent writers. + // Read CRC file and validate checksum information is complete. + recordDeltaEvent(deltaLog, opType = "delta.commit.snapshotAgedOut", data = Map( + "snapshotVersion" -> snapshot.version, + "commitAttemptVersion" -> versionToCompute + )) + } + val oldCrcOpt = deltaLog.readChecksum(expectedVersion) + if (oldCrcOpt.isEmpty) { + return Left("MISSING_OLD_CRC") + } + val oldCrcFiltered = oldCrcOpt + .filterNot(_.metadata == null) + .filterNot(_.protocol == null) + + val oldCrc = oldCrcFiltered.getOrElse { + return Left("OLD_CRC_INCOMPLETE") + } + oldCrc -> None + } + + // Incrementally compute the new version checksum, if the old one is available. + val ignoreAddFilesInOperation = + RecordChecksum.operationNamesWhereAddFilesIgnoredForIncrementalCrc.contains(operationName) + + computeNewChecksum( + versionToCompute, + operationName, + txnIdOpt, + oldVersionChecksum, + oldSnapshot, + actions, + ignoreAddFilesInOperation + ) + } + + /** + * Incrementally derive new checksum from old checksum + actions. + * + * @param attemptVersion commit attempt version for which we want to generate CRC. + * @param operationName operation name for the attempted commit. + * @param txnId transaction identifier. + * @param oldVersionChecksum from previous commit (attemptVersion - 1). + * @param oldSnapshot snapshot representing previous commit version (i.e. attemptVersion - 1), + * None if not available. + * @param actions used to incrementally compute new checksum. + * @param ignoreAddFiles for transactions whose add file actions refer to already-existing files + * e.g., [[DeltaOperations.ComputeStats]] transactions. + * @return Either the new checksum or error code string if the checksum could not be computed + * incrementally due to some reason. + */ + // scalastyle:off argcount + private[delta] def computeNewChecksum( + attemptVersion: Long, + operationName: String, + txnIdOpt: Option[String], + oldVersionChecksum: VersionChecksum, + oldSnapshot: Option[Snapshot], + actions: Seq[Action], + ignoreAddFiles: Boolean + ) : Either[String, VersionChecksum] = { + // scalastyle:on argcount + oldSnapshot.foreach(s => require(s.version == (attemptVersion - 1))) + var tableSizeBytes = oldVersionChecksum.tableSizeBytes + var numFiles = oldVersionChecksum.numFiles + var protocol = oldVersionChecksum.protocol + var metadata = oldVersionChecksum.metadata + + var inCommitTimestamp : Option[Long] = None + actions.foreach { + case a: AddFile if !ignoreAddFiles => + tableSizeBytes += a.size + numFiles += 1 + + + // extendedFileMetadata == true implies fields partitionValues, size, and tags are present + case r: RemoveFile if r.extendedFileMetadata == Some(true) => + val size = r.size.get + tableSizeBytes -= size + numFiles -= 1 + + + case r: RemoveFile => + // Report the failure to usage logs. + val msg = s"A remove action with a missing file size was detected in file ${r.path} " + + "causing incremental commit to fallback to state reconstruction." 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index 0adb60cd09..f07d5e1baa 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -142,6 +142,10 @@ class DeltaLog private(
   def maxSnapshotLineageLength: Int =
     spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_MAX_SNAPSHOT_LINEAGE_LENGTH)
 
+  private[delta] def incrementalCommitEnabled: Boolean = {
+    DeltaLog.incrementalCommitEnableConfigs.forall(conf => spark.conf.get(conf))
+  }
+
   /** The unique identifier for this table. */
   def tableId: String = unsafeVolatileMetadata.id // safe because table id never changes
 
@@ -720,6 +724,10 @@ object DeltaLog extends DeltaLogging {
       .maximumSize(cacheSize)
   }
 
+  val incrementalCommitEnableConfigs = Seq(
+    DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED
+  )
+
   /**
    * Creates a [[LogicalRelation]] for a given [[DeltaLogFileIndex]], with all necessary file source
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 340848ec40..09aaac23c2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -250,7 +250,8 @@ object OptimisticTransaction {
 trait OptimisticTransactionImpl extends TransactionalWrite
   with SQLMetricsReporting
   with DeltaScanGenerator
-  with DeltaLogging {
+  with DeltaLogging
+  with RecordChecksum {
 
   import org.apache.spark.sql.delta.util.FileNames._
 
@@ -2475,7 +2476,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     }
     val commitFile = writeCommitFileImpl(
       attemptVersion, jsonActions, commitCoordinatorClient, currentTransactionInfo)
-    (None, commitFile)
+    val newChecksumOpt = incrementallyDeriveChecksum(attemptVersion, currentTransactionInfo)
+    (newChecksumOpt, commitFile)
   }
 
   protected def writeCommitFileImpl(
@@ -2501,6 +2503,33 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     commitResponse.getCommit
   }
 
+  /**
+   * Given an attemptVersion, obtains the checksum for the previous snapshot version
+   * (i.e., attemptVersion - 1) and incrementally derives a new checksum from
+   * the actions of the current transaction.
+   *
+   * @param attemptVersion version that the current transaction is committing
+   * @param currentTransactionInfo containing actions of the current transaction
+   * @return the incrementally derived checksum, or None if it could not be derived
+   */
+  protected def incrementallyDeriveChecksum(
+      attemptVersion: Long,
+      currentTransactionInfo: CurrentTransactionInfo): Option[VersionChecksum] = {
+
+    incrementallyDeriveChecksum(
+      spark,
+      deltaLog,
+      attemptVersion,
+      actions = currentTransactionInfo.finalActionsToCommit,
+      metadata = currentTransactionInfo.metadata,
+      protocol = currentTransactionInfo.protocol,
+      operationName = currentTransactionInfo.op.name,
+      txnIdOpt = Some(currentTransactionInfo.txnId),
+      previousVersionState = scala.Left(snapshot)
+    ).toOption
+  }
+
   /**
    * Looks at actions that have happened since the txn started and checks for logical
    * conflicts with the read/writes. Resolve conflicts and returns a tuple representing
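Note the `.toOption` at the call site above: an incremental-derivation failure surfaces as a Left error code for diagnostics but never fails the commit; the checksum is simply absent and recomputed from a full state reconstruction later. A tiny sketch of that contract, with hypothetical names and types:

```scala
// Sketch (hypothetical names): Left carries a diagnostic code for logging, but
// callers degrade it to None so the commit path never depends on success here.
object ChecksumFallbackSketch {
  final case class Crc(numFiles: Long)

  def derive(crcAvailable: Boolean): Either[String, Crc] =
    if (crcAvailable) Right(Crc(numFiles = 1)) else Left("MISSING_OLD_CRC")

  def main(args: Array[String]): Unit = {
    assert(derive(crcAvailable = false).toOption.isEmpty) // commit proceeds without a CRC
    assert(derive(crcAvailable = true).toOption.contains(Crc(1)))
  }
}
```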
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
index 0ccf07e3c8..158452110a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -518,10 +518,10 @@ class Snapshot(
       numProtocol = numOfProtocol,
       inCommitTimestampOpt = getInCommitTimestampOpt,
       setTransactions = checksumOpt.flatMap(_.setTransactions),
-      domainMetadata = domainMetadatasIfKnown,
+      domainMetadata = checksumOpt.flatMap(_.domainMetadata),
       metadata = metadata,
       protocol = protocol,
-      histogramOpt = fileSizeHistogram,
+      histogramOpt = checksumOpt.flatMap(_.histogramOpt),
       allFiles = checksumOpt.flatMap(_.allFiles))
 
   /** Returns the data schema of the table, used for reading stats */
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 649d34190c..ace0fbfb43 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -1027,6 +1027,14 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(false)
 
+  val INCREMENTAL_COMMIT_ENABLED =
+    buildConf("incremental.commit.enabled")
+      .internal()
+      .doc("If true, Delta will incrementally compute the content of the commit checksum " +
+        "file, which avoids the full state reconstruction that would otherwise be required.")
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED =
     buildConf("checkpoint.exceptionThrowing.enabled")
       .internal()
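Since the flag defaults to true, a session can opt out explicitly. Assuming `buildConf` applies DeltaSQLConf's usual `spark.databricks.delta.` key prefix, toggling it would look like:

```scala
// Hypothetical session-level toggle; the full key assumes the usual
// "spark.databricks.delta." prefix that DeltaSQLConf.buildConf prepends.
spark.conf.set("spark.databricks.delta.incremental.commit.enabled", "false")
// equivalent SQL:
//   SET spark.databricks.delta.incremental.commit.enabled = false;
```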
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala
index 148913121c..174af7c51a 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/ChecksumSuite.scala
@@ -26,6 +26,8 @@ import org.apache.spark.sql.delta.util.FileNames
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.test.SharedSparkSession
 
 class ChecksumSuite
@@ -60,4 +62,72 @@ class ChecksumSuite
     testChecksumFile(writeChecksumEnabled = true)
     testChecksumFile(writeChecksumEnabled = false)
   }
+
+  test("Incremental checksums: post-commit snapshot should have a checksum " +
+      "without triggering state reconstruction") {
+    for (incrementalCommitEnabled <- BOOLEAN_DOMAIN) {
+      withSQLConf(
+          DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false",
+          DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> incrementalCommitEnabled.toString) {
+        withTempDir { tempDir =>
+          val df = spark.range(1)
+          df.write.format("delta").mode("append").save(tempDir.getCanonicalPath)
+          val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
+          log
+            .startTransaction()
+            .commit(Seq(createTestAddFile()), DeltaOperations.Write(SaveMode.Append))
+          val postCommitSnapshot = log.snapshot
+          assert(postCommitSnapshot.version == 1)
+          assert(!postCommitSnapshot.stateReconstructionTriggered)
+          assert(postCommitSnapshot.checksumOpt.isDefined == incrementalCommitEnabled)
+
+          postCommitSnapshot.checksumOpt.foreach { incrementalChecksum =>
+            val checksumFromStateReconstruction = postCommitSnapshot.computeChecksum
+            assert(incrementalChecksum.copy(txnId = None) == checksumFromStateReconstruction)
+          }
+        }
+      }
+    }
+  }
+
+  def testIncrementalChecksumWrites(tableMutationOperation: String => Unit): Unit = {
+    withTempDir { tempDir =>
+      withSQLConf(
+          DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "true",
+          DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true") {
+        val df = spark.range(10).withColumn("id2", col("id") % 2)
+        df.write
+          .format("delta")
+          .partitionBy("id")
+          .mode("append")
+          .save(tempDir.getCanonicalPath)
+
+        tableMutationOperation(tempDir.getCanonicalPath)
+        val log = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath))
+        val checksumOpt = log.snapshot.checksumOpt
+        assert(checksumOpt.isDefined)
+        val checksum = checksumOpt.get
+        val computedChecksum = log.snapshot.computeChecksum
+        assert(checksum.copy(txnId = None) === computedChecksum)
+      }
+    }
+  }
+
+  test("Incremental checksums: INSERT") {
+    testIncrementalChecksumWrites { tablePath =>
+      sql(s"INSERT INTO delta.`$tablePath` SELECT *, 1 FROM range(10, 20)")
+    }
+  }
+
+  test("Incremental checksums: UPDATE") {
+    testIncrementalChecksumWrites { tablePath =>
+      sql(s"UPDATE delta.`$tablePath` SET id2 = id + 1 WHERE id % 2 = 0")
+    }
+  }
+
+  test("Incremental checksums: DELETE") {
+    testIncrementalChecksumWrites { tablePath =>
+      sql(s"DELETE FROM delta.`$tablePath` WHERE id % 2 = 0")
+    }
+  }
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
index 4daff9811e..48a8f20490 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala
@@ -933,19 +933,24 @@ class OptimisticTransactionSuite
   }
 
   test("Append does not trigger snapshot state computation") {
-    withTempDir { tableDir =>
-      val df = Seq((1, 0), (2, 1)).toDF("key", "value")
-      df.write.format("delta").mode("append").save(tableDir.getCanonicalPath)
-
-      val deltaLog = DeltaLog.forTable(spark, tableDir)
-      val preCommitSnapshot = deltaLog.update()
-      assert(!preCommitSnapshot.stateReconstructionTriggered)
-
-      df.write.format("delta").mode("append").save(tableDir.getCanonicalPath)
-
-      val postCommitSnapshot = deltaLog.update()
-      assert(!preCommitSnapshot.stateReconstructionTriggered)
-      assert(!postCommitSnapshot.stateReconstructionTriggered)
+    withSQLConf(
+      DeltaSQLConf.DELTA_WRITE_CHECKSUM_ENABLED.key -> "false",
+      DeltaSQLConf.INCREMENTAL_COMMIT_ENABLED.key -> "true"
+    ) {
+      withTempDir { tableDir =>
+        val df = Seq((1, 0), (2, 1)).toDF("key", "value")
+        df.write.format("delta").mode("append").save(tableDir.getCanonicalPath)
+
+        val deltaLog = DeltaLog.forTable(spark, tableDir)
+        val preCommitSnapshot = deltaLog.update()
+        assert(!preCommitSnapshot.stateReconstructionTriggered)
+
+        df.write.format("delta").mode("append").save(tableDir.getCanonicalPath)
+
+        val postCommitSnapshot = deltaLog.update()
+        assert(!preCommitSnapshot.stateReconstructionTriggered)
+        assert(!postCommitSnapshot.stateReconstructionTriggered)
+      }
     }
   }