[Spark] Pass table catalogs throughout DeltaLog (#3863)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description


Previously, we passed the table identifier to the DeltaLog constructor and its
methods. To make this more future-proof, this change passes the whole catalog
table instead.
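
For illustration, a minimal, self-contained sketch of the shape of the change (the `DeltaLogLike`, `TableIdentifier`, and `CatalogTable` definitions below are simplified stand-ins, not the real Spark/Delta classes): callers hand over the whole `CatalogTable`, and code paths that still only need the identifier derive it at the point of use, mirroring `catalogTableOpt.map(_.identifier)` in the diff.

```scala
// Simplified stand-ins, not the actual Spark/Delta classes.
case class TableIdentifier(table: String, database: Option[String] = None)
case class CatalogTable(identifier: TableIdentifier, location: String)

class DeltaLogLike private (
    val logPath: String,
    initialCatalogTable: Option[CatalogTable]) {

  // Consumers that only need the identifier derive it where it is used.
  def checkpoint(catalogTableOpt: Option[CatalogTable] = initialCatalogTable): Unit = {
    val tableIdentifierOpt = catalogTableOpt.map(_.identifier)
    println(s"checkpointing $logPath for $tableIdentifierOpt")
  }
}

object DeltaLogLike {
  // Callers pass the whole CatalogTable instead of just table.identifier.
  def forTable(table: CatalogTable): DeltaLogLike =
    new DeltaLogLike(s"${table.location}/_delta_log", Some(table))
}

// Usage:
// DeltaLogLike.forTable(CatalogTable(TableIdentifier("t"), "/tmp/t")).checkpoint()
```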


## How was this patch tested?

Unit test
## Does this PR introduce _any_ user-facing changes?
No
ctring authored Nov 12, 2024
1 parent 7a8fdd3 commit 4f54313
Showing 8 changed files with 114 additions and 111 deletions.
@@ -41,7 +41,6 @@ import org.apache.spark.TaskContext
import org.apache.spark.internal.MDC
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, Literal}
@@ -301,14 +300,13 @@ trait Checkpoints extends DeltaLogging {
*/
def checkpoint(
snapshotToCheckpoint: Snapshot,
catalogTable: Option[CatalogTable] = None): Unit =
catalogTableOpt: Option[CatalogTable] = None): Unit =
recordDeltaOperation(this, "delta.checkpoint") {
withCheckpointExceptionHandling(snapshotToCheckpoint.deltaLog, "delta.checkpoint.sync.error") {
if (snapshotToCheckpoint.version < 0) {
throw DeltaErrors.checkpointNonExistTable(dataPath)
}
val tableIdentifierOpt = catalogTable.map(_.identifier)
checkpointAndCleanUpDeltaLog(snapshotToCheckpoint, tableIdentifierOpt)
checkpointAndCleanUpDeltaLog(snapshotToCheckpoint, catalogTableOpt)
}
}

@@ -329,8 +327,8 @@ trait Checkpoints extends DeltaLogging {

def checkpointAndCleanUpDeltaLog(
snapshotToCheckpoint: Snapshot,
tableIdentifierOpt: Option[TableIdentifier] = None): Unit = {
val lastCheckpointInfo = writeCheckpointFiles(snapshotToCheckpoint, tableIdentifierOpt)
catalogTableOpt: Option[CatalogTable] = None): Unit = {
val lastCheckpointInfo = writeCheckpointFiles(snapshotToCheckpoint, catalogTableOpt)
writeLastCheckpointFile(
snapshotToCheckpoint.deltaLog, lastCheckpointInfo, LastCheckpointInfo.checksumEnabled(spark))
doLogCleanup(snapshotToCheckpoint)
@@ -354,7 +352,7 @@

protected def writeCheckpointFiles(
snapshotToCheckpoint: Snapshot,
tableIdentifierOpt: Option[TableIdentifier] = None): LastCheckpointInfo = {
catalogTableOpt: Option[CatalogTable] = None): LastCheckpointInfo = {
// With Coordinated-Commits, commit files are not guaranteed to be backfilled immediately in the
// _delta_log dir. While it is possible to compute a checkpoint file without backfilling,
// writing the checkpoint file in the log directory before backfilling the relevant commits
@@ -369,7 +367,7 @@
// 00015.json
// 00016.json
// 00018.checkpoint.parquet
snapshotToCheckpoint.ensureCommitFilesBackfilled(tableIdentifierOpt)
snapshotToCheckpoint.ensureCommitFilesBackfilled(catalogTableOpt)
Checkpoints.writeCheckpoint(spark, this, snapshotToCheckpoint)
}

35 changes: 19 additions & 16 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -69,16 +69,16 @@ import org.apache.spark.util._
* @param options Filesystem options filtered from `allOptions`.
* @param allOptions All options provided by the user, for example via `df.write.option()`. This
* includes but not limited to filesystem and table properties.
* @param initialTableIdentifier Identifier of the table when the log is initialized.
* @param clock Clock to be used when starting a new transaction.
* @param initialCatalogTable The catalog table given when the log is initialized.
*/
class DeltaLog private(
val logPath: Path,
val dataPath: Path,
val options: Map[String, String],
val allOptions: Map[String, String],
val initialTableIdentifier: Option[TableIdentifier],
val clock: Clock
val clock: Clock,
initialCatalogTable: Option[CatalogTable]
) extends Checkpoints
with MetadataCleanup
with LogStoreProvider
@@ -127,6 +127,9 @@ class DeltaLog private(
lazy val history = new DeltaHistoryManager(
this, spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_HISTORY_PAR_SEARCH_THRESHOLD))

/** Initialize the variables in SnapshotManagement. */
createSnapshotAtInit(initialCatalogTable)

/* --------------- *
| Configuration |
* --------------- */
@@ -765,23 +768,23 @@ object DeltaLog extends DeltaLogging {
spark,
logPathFor(dataPath),
options = Map.empty,
initialTableIdentifier = None,
initialCatalogTable = None,
new SystemClock)
}

/** Helper for creating a log when it stored at the root of the data. */
def forTable(spark: SparkSession, dataPath: Path): DeltaLog = {
apply(spark, logPathFor(dataPath), initialTableIdentifier = None, new SystemClock)
apply(spark, logPathFor(dataPath), initialCatalogTable = None, new SystemClock)
}

/** Helper for creating a log when it stored at the root of the data. */
def forTable(spark: SparkSession, dataPath: Path, options: Map[String, String]): DeltaLog = {
apply(spark, logPathFor(dataPath), options, initialTableIdentifier = None, new SystemClock)
apply(spark, logPathFor(dataPath), options, initialCatalogTable = None, new SystemClock)
}

/** Helper for creating a log when it stored at the root of the data. */
def forTable(spark: SparkSession, dataPath: Path, clock: Clock): DeltaLog = {
apply(spark, logPathFor(dataPath), initialTableIdentifier = None, clock)
apply(spark, logPathFor(dataPath), initialCatalogTable = None, clock)
}

/** Helper for creating a log for the table. */
@@ -809,21 +812,21 @@ object DeltaLog extends DeltaLogging {
spark,
logPathFor(new Path(table.location)),
options,
Some(table.identifier),
Some(table),
new SystemClock)
}

/** Helper for creating a log for the table. */
def forTable(spark: SparkSession, table: CatalogTable, clock: Clock): DeltaLog = {
apply(spark, logPathFor(new Path(table.location)), Some(table.identifier), clock)
apply(spark, logPathFor(new Path(table.location)), Some(table), clock)
}

private def apply(
spark: SparkSession,
rawPath: Path,
initialTableIdentifier: Option[TableIdentifier],
initialCatalogTable: Option[CatalogTable],
clock: Clock): DeltaLog =
apply(spark, rawPath, options = Map.empty, initialTableIdentifier, clock)
apply(spark, rawPath, options = Map.empty, initialCatalogTable, clock)


/** Helper for getting a log, as well as the latest snapshot, of the table */
@@ -859,7 +862,7 @@ object DeltaLog extends DeltaLogging {
options: Map[String, String]): (DeltaLog, Snapshot) =
withFreshSnapshot { clock =>
val deltaLog =
apply(spark, logPathFor(dataPath), options, initialTableIdentifier = None, clock)
apply(spark, logPathFor(dataPath), options, initialCatalogTable = None, clock)
(deltaLog, None)
}

@@ -870,7 +873,7 @@
options: Map[String, String]): (DeltaLog, Snapshot) =
withFreshSnapshot { clock =>
val deltaLog =
apply(spark, logPathFor(new Path(table.location)), options, Some(table.identifier), clock)
apply(spark, logPathFor(new Path(table.location)), options, Some(table), clock)
(deltaLog, Some(table))
}

@@ -893,7 +896,7 @@
spark: SparkSession,
rawPath: Path,
options: Map[String, String],
initialTableIdentifier: Option[TableIdentifier],
initialCatalogTable: Option[CatalogTable],
clock: Clock
): DeltaLog = {
val fileSystemOptions: Map[String, String] =
Expand Down Expand Up @@ -922,8 +925,8 @@ object DeltaLog extends DeltaLogging {
dataPath = path.getParent,
options = fileSystemOptions,
allOptions = options,
initialTableIdentifier = initialTableIdentifier,
clock = clock
clock = clock,
initialCatalogTable = initialCatalogTable
)
}
}
@@ -1869,7 +1869,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
commit,
newChecksumOpt = None,
preCommitLogSegment = preCommitLogSegment,
catalogTable.map(_.identifier))
catalogTable)
if (currentSnapshot.version != attemptVersion) {
throw DeltaErrors.invalidCommittedVersion(attemptVersion, currentSnapshot.version)
}
@@ -2313,7 +2313,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
commit,
newChecksumOpt,
preCommitLogSegment,
catalogTable.map(_.identifier))
catalogTable)
val postCommitReconstructionTime = System.nanoTime()

// Post stats
@@ -2639,7 +2639,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
val (newPreCommitLogSegment, newCommitFileStatuses) = deltaLog.getUpdatedLogSegment(
preCommitLogSegment,
readSnapshotTableCommitCoordinatorClientOpt,
catalogTable.map(_.identifier))
catalogTable)
assert(preCommitLogSegment.version + newCommitFileStatuses.size ==
newPreCommitLogSegment.version)
preCommitLogSegment = newPreCommitLogSegment
@@ -38,7 +38,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.internal.{MDC, MessageWithContext}
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@@ -576,15 +576,15 @@ class Snapshot(
* @throws IllegalStateException
* if the delta file for the current version is not found after backfilling.
*/
def ensureCommitFilesBackfilled(tableIdentifierOpt: Option[TableIdentifier]): Unit = {
def ensureCommitFilesBackfilled(catalogTableOpt: Option[CatalogTable]): Unit = {
val tableCommitCoordinatorClient = getTableCommitCoordinatorForWrites.getOrElse {
return
}
val minUnbackfilledVersion = DeltaCommitFileProvider(this).minUnbackfilledVersion
if (minUnbackfilledVersion <= version) {
val hadoopConf = deltaLog.newDeltaHadoopConf()
tableCommitCoordinatorClient.backfillToVersion(
tableIdentifierOpt,
catalogTableOpt.map(_.identifier),
version,
lastKnownBackfilledVersion = Some(minUnbackfilledVersion - 1))
val fs = deltaLog.logPath.getFileSystem(hadoopConf)