Skip to content

Commit

Permalink
Pass TableIdentifier from CheckpointHook to the commit coordinator client
Browse files Browse the repository at this point in the history
  • Loading branch information
ctringdb committed Sep 19, 2024
1 parent 80457b0 commit 48b68a8
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
22 changes: 13 additions & 9 deletions spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.spark.internal.MDC
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.ColumnImplicitsShim._
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, Literal}
import org.apache.spark.sql.execution.SQLExecution
Expand Down Expand Up @@ -298,13 +299,15 @@ trait Checkpoints extends DeltaLogging {
* Note that this function captures and logs all exceptions, since the checkpoint shouldn't fail
* the overall commit operation.
*/
def checkpoint(snapshotToCheckpoint: Snapshot): Unit = recordDeltaOperation(
this, "delta.checkpoint") {
/**
 * Writes a checkpoint for the given snapshot and cleans up the delta log.
 *
 * Per the surrounding doc comment, all exceptions are captured and logged rather than
 * propagated, because a failed checkpoint must not fail the overall commit operation.
 *
 * @param snapshotToCheckpoint the snapshot to checkpoint; must have a non-negative version
 * @param tableIdentifierOpt   optional catalog identifier of the table, passed down to
 *                             `checkpointAndCleanUpDeltaLog` (ultimately used when ensuring
 *                             coordinated-commit files are backfilled). Defaults to `None`
 *                             to stay backward-compatible with existing callers.
 */
def checkpoint(
snapshotToCheckpoint: Snapshot,
tableIdentifierOpt: Option[TableIdentifier] = None): Unit =
recordDeltaOperation(this, "delta.checkpoint") {
// Wraps the body so checkpoint errors are recorded (under this opType) instead of thrown.
withCheckpointExceptionHandling(snapshotToCheckpoint.deltaLog, "delta.checkpoint.sync.error") {
// A negative version means no commit exists yet — there is nothing to checkpoint.
if (snapshotToCheckpoint.version < 0) {
throw DeltaErrors.checkpointNonExistTable(dataPath)
}
checkpointAndCleanUpDeltaLog(snapshotToCheckpoint, tableIdentifierOpt)
}
}

Expand All @@ -324,8 +327,9 @@ trait Checkpoints extends DeltaLogging {
}

def checkpointAndCleanUpDeltaLog(
snapshotToCheckpoint: Snapshot): Unit = {
val lastCheckpointInfo = writeCheckpointFiles(snapshotToCheckpoint)
snapshotToCheckpoint: Snapshot,
tableIdentifierOpt: Option[TableIdentifier] = None): Unit = {
val lastCheckpointInfo = writeCheckpointFiles(snapshotToCheckpoint, tableIdentifierOpt)
writeLastCheckpointFile(
snapshotToCheckpoint.deltaLog, lastCheckpointInfo, LastCheckpointInfo.checksumEnabled(spark))
doLogCleanup(snapshotToCheckpoint)
Expand All @@ -347,7 +351,9 @@ trait Checkpoints extends DeltaLogging {
}
}

protected def writeCheckpointFiles(snapshotToCheckpoint: Snapshot): LastCheckpointInfo = {
protected def writeCheckpointFiles(
snapshotToCheckpoint: Snapshot,
tableIdentifierOpt: Option[TableIdentifier] = None): LastCheckpointInfo = {
// With Coordinated-Commits, commit files are not guaranteed to be backfilled immediately in the
// _delta_log dir. While it is possible to compute a checkpoint file without backfilling,
// writing the checkpoint file in the log directory before backfilling the relevant commits
Expand All @@ -362,9 +368,7 @@ trait Checkpoints extends DeltaLogging {
// 00015.json
// 00016.json
// 00018.checkpoint.parquet
// TODO(table-identifier-plumbing): Plumb the right tableIdentifier from the Checkpoint Hook
// and pass it to `ensureCommitFilesBackfilled`.
snapshotToCheckpoint.ensureCommitFilesBackfilled(tableIdentifierOpt = None)
snapshotToCheckpoint.ensureCommitFilesBackfilled(tableIdentifierOpt)
Checkpoints.writeCheckpoint(spark, this, snapshotToCheckpoint)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ object CheckpointHook extends PostCommitHook {
committedVersion,
lastCheckpointHint = None,
lastCheckpointProvider = Some(cp))
txn.deltaLog.checkpoint(snapshotToCheckpoint)
txn.deltaLog.checkpoint(snapshotToCheckpoint, txn.catalogTable.map(_.identifier))
}
}

0 comments on commit 48b68a8

Please sign in to comment.