[Spark] Pass table catalogs throughout DeltaLog #3863

Merged
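This change replaces the `Option[TableIdentifier]` parameters threaded through `DeltaLog`, `Checkpoints`, `OptimisticTransactionImpl`, and `Snapshot` with `Option[CatalogTable]`, so the full catalog metadata is available wherever only the table identifier used to be. A minimal caller-side sketch of the changed entry points; the table name `events` and the two-argument `DeltaLog.forTable(spark, catalogTable)` overload are assumptions for illustration, not part of this diff:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.delta.DeltaLog

val spark = SparkSession.builder().getOrCreate()

// Look up the table's full catalog metadata (identifier, location, properties, ...).
val table: CatalogTable =
  spark.sessionState.catalog.getTableMetadata(TableIdentifier("events"))

// After this change, the CatalogTable itself flows into the log (initialCatalogTable)
// rather than only table.identifier as before.
val deltaLog: DeltaLog = DeltaLog.forTable(spark, table)
```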
@@ -41,7 +41,6 @@ import org.apache.spark.TaskContext
 import org.apache.spark.internal.MDC
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, Literal}
@@ -301,14 +300,13 @@ trait Checkpoints extends DeltaLogging {
    */
   def checkpoint(
       snapshotToCheckpoint: Snapshot,
-      catalogTable: Option[CatalogTable] = None): Unit =
+      catalogTableOpt: Option[CatalogTable] = None): Unit =
     recordDeltaOperation(this, "delta.checkpoint") {
       withCheckpointExceptionHandling(snapshotToCheckpoint.deltaLog, "delta.checkpoint.sync.error") {
         if (snapshotToCheckpoint.version < 0) {
           throw DeltaErrors.checkpointNonExistTable(dataPath)
         }
-        val tableIdentifierOpt = catalogTable.map(_.identifier)
-        checkpointAndCleanUpDeltaLog(snapshotToCheckpoint, tableIdentifierOpt)
+        checkpointAndCleanUpDeltaLog(snapshotToCheckpoint, catalogTableOpt)
       }
     }
 
@@ -329,8 +327,8 @@ trait Checkpoints extends DeltaLogging {
 
   def checkpointAndCleanUpDeltaLog(
       snapshotToCheckpoint: Snapshot,
-      tableIdentifierOpt: Option[TableIdentifier] = None): Unit = {
-    val lastCheckpointInfo = writeCheckpointFiles(snapshotToCheckpoint, tableIdentifierOpt)
+      catalogTableOpt: Option[CatalogTable] = None): Unit = {
+    val lastCheckpointInfo = writeCheckpointFiles(snapshotToCheckpoint, catalogTableOpt)
     writeLastCheckpointFile(
       snapshotToCheckpoint.deltaLog, lastCheckpointInfo, LastCheckpointInfo.checksumEnabled(spark))
     doLogCleanup(snapshotToCheckpoint)
@@ -354,7 +352,7 @@ trait Checkpoints extends DeltaLogging {
 
   protected def writeCheckpointFiles(
       snapshotToCheckpoint: Snapshot,
-      tableIdentifierOpt: Option[TableIdentifier] = None): LastCheckpointInfo = {
+      catalogTableOpt: Option[CatalogTable] = None): LastCheckpointInfo = {
     // With Coordinated-Commits, commit files are not guaranteed to be backfilled immediately in the
     // _delta_log dir. While it is possible to compute a checkpoint file without backfilling,
     // writing the checkpoint file in the log directory before backfilling the relevant commits
@@ -369,7 +367,7 @@ trait Checkpoints extends DeltaLogging {
     //   00015.json
     //   00016.json
     //   00018.checkpoint.parquet
-    snapshotToCheckpoint.ensureCommitFilesBackfilled(tableIdentifierOpt)
+    snapshotToCheckpoint.ensureCommitFilesBackfilled(catalogTableOpt)
     Checkpoints.writeCheckpoint(spark, this, snapshotToCheckpoint)
   }
 
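The comment in the last hunk above explains why, with Coordinated Commits, all relevant commits must be backfilled into `_delta_log` before a checkpoint file is written there. A minimal sketch of that invariant, reusing `DeltaCommitFileProvider(snapshot).minUnbackfilledVersion` from the `Snapshot` change further down; the helper name and the import path for `DeltaCommitFileProvider` are assumptions:

```scala
import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.util.DeltaCommitFileProvider

// Sketch only: before checkpoint files for `snapshot` are written, every commit with
// version <= snapshot.version must already be backfilled into the _delta_log directory,
// which is what ensureCommitFilesBackfilled guarantees above.
def assertBackfilledForCheckpoint(snapshot: Snapshot): Unit = {
  val minUnbackfilled = DeltaCommitFileProvider(snapshot).minUnbackfilledVersion
  require(
    minUnbackfilled > snapshot.version,
    s"commits up to version ${snapshot.version} must be backfilled before checkpointing; " +
      s"first unbackfilled commit is version $minUnbackfilled")
}
```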
spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala (35 changes: 19 additions & 16 deletions)

@@ -69,16 +69,16 @@ import org.apache.spark.util._
  * @param options Filesystem options filtered from `allOptions`.
  * @param allOptions All options provided by the user, for example via `df.write.option()`. This
  *                   includes but not limited to filesystem and table properties.
- * @param initialTableIdentifier Identifier of the table when the log is initialized.
  * @param clock Clock to be used when starting a new transaction.
+ * @param initialCatalogTable The catalog table given when the log is initialized.
  */
 class DeltaLog private(
     val logPath: Path,
     val dataPath: Path,
     val options: Map[String, String],
     val allOptions: Map[String, String],
-    val initialTableIdentifier: Option[TableIdentifier],
-    val clock: Clock
+    val clock: Clock,
+    initialCatalogTable: Option[CatalogTable]
   ) extends Checkpoints
   with MetadataCleanup
   with LogStoreProvider
@@ -127,6 +127,9 @@ class DeltaLog private(
   lazy val history = new DeltaHistoryManager(
     this, spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_HISTORY_PAR_SEARCH_THRESHOLD))
 
+  /** Initialize the variables in SnapshotManagement. */
+  createSnapshotAtInit(initialCatalogTable)
+
   /* --------------- *
    |  Configuration  |
    * --------------- */
@@ -765,23 +768,23 @@ object DeltaLog extends DeltaLogging {
       spark,
       logPathFor(dataPath),
       options = Map.empty,
-      initialTableIdentifier = None,
+      initialCatalogTable = None,
       new SystemClock)
   }
 
   /** Helper for creating a log when it stored at the root of the data. */
   def forTable(spark: SparkSession, dataPath: Path): DeltaLog = {
-    apply(spark, logPathFor(dataPath), initialTableIdentifier = None, new SystemClock)
+    apply(spark, logPathFor(dataPath), initialCatalogTable = None, new SystemClock)
   }
 
   /** Helper for creating a log when it stored at the root of the data. */
   def forTable(spark: SparkSession, dataPath: Path, options: Map[String, String]): DeltaLog = {
-    apply(spark, logPathFor(dataPath), options, initialTableIdentifier = None, new SystemClock)
+    apply(spark, logPathFor(dataPath), options, initialCatalogTable = None, new SystemClock)
   }
 
   /** Helper for creating a log when it stored at the root of the data. */
   def forTable(spark: SparkSession, dataPath: Path, clock: Clock): DeltaLog = {
-    apply(spark, logPathFor(dataPath), initialTableIdentifier = None, clock)
+    apply(spark, logPathFor(dataPath), initialCatalogTable = None, clock)
   }
 
   /** Helper for creating a log for the table. */
@@ -809,21 +812,21 @@ object DeltaLog extends DeltaLogging {
       spark,
       logPathFor(new Path(table.location)),
       options,
-      Some(table.identifier),
+      Some(table),
       new SystemClock)
   }
 
   /** Helper for creating a log for the table. */
   def forTable(spark: SparkSession, table: CatalogTable, clock: Clock): DeltaLog = {
-    apply(spark, logPathFor(new Path(table.location)), Some(table.identifier), clock)
+    apply(spark, logPathFor(new Path(table.location)), Some(table), clock)
   }
 
   private def apply(
       spark: SparkSession,
       rawPath: Path,
-      initialTableIdentifier: Option[TableIdentifier],
+      initialCatalogTable: Option[CatalogTable],
       clock: Clock): DeltaLog =
-    apply(spark, rawPath, options = Map.empty, initialTableIdentifier, clock)
+    apply(spark, rawPath, options = Map.empty, initialCatalogTable, clock)
 
 
   /** Helper for getting a log, as well as the latest snapshot, of the table */
@@ -859,7 +862,7 @@ object DeltaLog extends DeltaLogging {
       options: Map[String, String]): (DeltaLog, Snapshot) =
     withFreshSnapshot { clock =>
       val deltaLog =
-        apply(spark, logPathFor(dataPath), options, initialTableIdentifier = None, clock)
+        apply(spark, logPathFor(dataPath), options, initialCatalogTable = None, clock)
       (deltaLog, None)
     }
 
@@ -870,7 +873,7 @@ object DeltaLog extends DeltaLogging {
       options: Map[String, String]): (DeltaLog, Snapshot) =
     withFreshSnapshot { clock =>
       val deltaLog =
-        apply(spark, logPathFor(new Path(table.location)), options, Some(table.identifier), clock)
+        apply(spark, logPathFor(new Path(table.location)), options, Some(table), clock)
       (deltaLog, Some(table))
     }
 
@@ -893,7 +896,7 @@ object DeltaLog extends DeltaLogging {
       spark: SparkSession,
       rawPath: Path,
       options: Map[String, String],
-      initialTableIdentifier: Option[TableIdentifier],
+      initialCatalogTable: Option[CatalogTable],
       clock: Clock
     ): DeltaLog = {
     val fileSystemOptions: Map[String, String] =
@@ -922,8 +925,8 @@ object DeltaLog extends DeltaLogging {
         dataPath = path.getParent,
         options = fileSystemOptions,
         allOptions = options,
-        initialTableIdentifier = initialTableIdentifier,
-        clock = clock
+        clock = clock,
+        initialCatalogTable = initialCatalogTable
       )
     }
   }
@@ -1869,7 +1869,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       commit,
       newChecksumOpt = None,
       preCommitLogSegment = preCommitLogSegment,
-      catalogTable.map(_.identifier))
+      catalogTable)
     if (currentSnapshot.version != attemptVersion) {
       throw DeltaErrors.invalidCommittedVersion(attemptVersion, currentSnapshot.version)
     }
@@ -2313,7 +2313,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       commit,
       newChecksumOpt,
       preCommitLogSegment,
-      catalogTable.map(_.identifier))
+      catalogTable)
     val postCommitReconstructionTime = System.nanoTime()
 
     // Post stats
@@ -2639,7 +2639,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     val (newPreCommitLogSegment, newCommitFileStatuses) = deltaLog.getUpdatedLogSegment(
       preCommitLogSegment,
       readSnapshotTableCommitCoordinatorClientOpt,
-      catalogTable.map(_.identifier))
+      catalogTable)
     assert(preCommitLogSegment.version + newCommitFileStatuses.size ==
       newPreCommitLogSegment.version)
     preCommitLogSegment = newPreCommitLogSegment
@@ -38,7 +38,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.internal.{MDC, MessageWithContext}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -576,15 +576,15 @@ class Snapshot(
    * @throws IllegalStateException
    *   if the delta file for the current version is not found after backfilling.
    */
-  def ensureCommitFilesBackfilled(tableIdentifierOpt: Option[TableIdentifier]): Unit = {
+  def ensureCommitFilesBackfilled(catalogTableOpt: Option[CatalogTable]): Unit = {
     val tableCommitCoordinatorClient = getTableCommitCoordinatorForWrites.getOrElse {
       return
     }
     val minUnbackfilledVersion = DeltaCommitFileProvider(this).minUnbackfilledVersion
     if (minUnbackfilledVersion <= version) {
       val hadoopConf = deltaLog.newDeltaHadoopConf()
       tableCommitCoordinatorClient.backfillToVersion(
-        tableIdentifierOpt,
+        catalogTableOpt.map(_.identifier),
         version,
         lastKnownBackfilledVersion = Some(minUnbackfilledVersion - 1))
       val fs = deltaLog.logPath.getFileSystem(hadoopConf)