diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
index a70c98e64b..cbf8f81368 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -42,6 +42,7 @@ import org.apache.spark.internal.MDC
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.ColumnImplicitsShim._
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, Literal}
 import org.apache.spark.sql.execution.SQLExecution
@@ -298,13 +299,15 @@ trait Checkpoints extends DeltaLogging {
    * Note that this function captures and logs all exceptions, since the checkpoint shouldn't fail
    * the overall commit operation.
    */
-  def checkpoint(snapshotToCheckpoint: Snapshot): Unit = recordDeltaOperation(
-      this, "delta.checkpoint") {
+  def checkpoint(
+      snapshotToCheckpoint: Snapshot,
+      tableIdentifierOpt: Option[TableIdentifier] = None): Unit =
+    recordDeltaOperation(this, "delta.checkpoint") {
     withCheckpointExceptionHandling(snapshotToCheckpoint.deltaLog, "delta.checkpoint.sync.error") {
       if (snapshotToCheckpoint.version < 0) {
         throw DeltaErrors.checkpointNonExistTable(dataPath)
       }
-      checkpointAndCleanUpDeltaLog(snapshotToCheckpoint)
+      checkpointAndCleanUpDeltaLog(snapshotToCheckpoint, tableIdentifierOpt)
     }
   }

@@ -324,8 +327,9 @@ trait Checkpoints extends DeltaLogging {
   }

   def checkpointAndCleanUpDeltaLog(
-      snapshotToCheckpoint: Snapshot): Unit = {
-    val lastCheckpointInfo = writeCheckpointFiles(snapshotToCheckpoint)
+      snapshotToCheckpoint: Snapshot,
+      tableIdentifierOpt: Option[TableIdentifier] = None): Unit = {
+    val lastCheckpointInfo = writeCheckpointFiles(snapshotToCheckpoint, tableIdentifierOpt)
     writeLastCheckpointFile(
       snapshotToCheckpoint.deltaLog, lastCheckpointInfo, LastCheckpointInfo.checksumEnabled(spark))
     doLogCleanup(snapshotToCheckpoint)
@@ -347,7 +351,9 @@ trait Checkpoints extends DeltaLogging {
     }
   }

-  protected def writeCheckpointFiles(snapshotToCheckpoint: Snapshot): LastCheckpointInfo = {
+  protected def writeCheckpointFiles(
+      snapshotToCheckpoint: Snapshot,
+      tableIdentifierOpt: Option[TableIdentifier] = None): LastCheckpointInfo = {
     // With Coordinated-Commits, commit files are not guaranteed to be backfilled immediately in the
     // _delta_log dir. While it is possible to compute a checkpoint file without backfilling,
     // writing the checkpoint file in the log directory before backfilling the relevant commits
@@ -362,9 +368,7 @@
     //   00015.json
     //   00016.json
     //   00018.checkpoint.parquet
-    // TODO(table-identifier-plumbing): Plumb the right tableIdentifier from the Checkpoint Hook
-    //   and pass it to `ensureCommitFilesBackfilled`.
-    snapshotToCheckpoint.ensureCommitFilesBackfilled(tableIdentifierOpt = None)
+    snapshotToCheckpoint.ensureCommitFilesBackfilled(tableIdentifierOpt)
     Checkpoints.writeCheckpoint(spark, this, snapshotToCheckpoint)
   }

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/CheckpointHook.scala b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/CheckpointHook.scala
index e6e99c5b01..833df53199 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/hooks/CheckpointHook.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/hooks/CheckpointHook.scala
@@ -40,6 +40,6 @@ object CheckpointHook extends PostCommitHook {
       committedVersion,
       lastCheckpointHint = None,
       lastCheckpointProvider = Some(cp))
-    txn.deltaLog.checkpoint(snapshotToCheckpoint)
+    txn.deltaLog.checkpoint(snapshotToCheckpoint, txn.catalogTable.map(_.identifier))
   }
 }
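
For illustration only (not part of the diff): a minimal sketch of how a caller could exercise the new overload once this patch is applied. DeltaLog.forTable and TableIdentifier are existing APIs; the table name db.events is a made-up placeholder, and the two-argument checkpoint signature exists only with this change.

// Hypothetical usage sketch; assumes a Delta table `db.events` is registered
// in the session catalog and that this patch is applied.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.delta.DeltaLog

object CheckpointWithIdentifierExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("checkpoint-with-identifier")
      .getOrCreate()

    val tableId = TableIdentifier("events", Some("db")) // placeholder table name
    val deltaLog = DeltaLog.forTable(spark, tableId)

    // Passing the catalog identifier lets Coordinated-Commits backfill any
    // unbackfilled commits for the correct catalog table before the
    // checkpoint file is written to the _delta_log directory.
    deltaLog.checkpoint(deltaLog.update(), Some(tableId))
  }
}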