From 21458ba970b7e5d06a24b832840101f41eccff50 Mon Sep 17 00:00:00 2001
From: Cuong Nguyen
Date: Tue, 12 Nov 2024 09:37:01 -0800
Subject: [PATCH] Pass table catalogs throughout DeltaLog

---
 .../apache/spark/sql/delta/Checkpoints.scala  |  14 +-
 .../org/apache/spark/sql/delta/DeltaLog.scala |  35 ++--
 .../sql/delta/OptimisticTransaction.scala     |   6 +-
 .../org/apache/spark/sql/delta/Snapshot.scala |   6 +-
 .../spark/sql/delta/SnapshotManagement.scala  | 150 +++++++++---------
 .../CoordinatedCommitsUtils.scala             |   8 +-
 .../sql/delta/SnapshotManagementSuite.scala   |   4 +-
 .../sql/delta/test/DeltaTestImplicits.scala   |   2 +-
 8 files changed, 114 insertions(+), 111 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
index 14739343a3..450645f3b6 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -41,7 +41,6 @@ import org.apache.spark.TaskContext
 import org.apache.spark.internal.MDC
 import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, Literal}
@@ -301,14 +300,13 @@ trait Checkpoints extends DeltaLogging {
    */
   def checkpoint(
       snapshotToCheckpoint: Snapshot,
-      catalogTable: Option[CatalogTable] = None): Unit =
+      catalogTableOpt: Option[CatalogTable] = None): Unit =
     recordDeltaOperation(this, "delta.checkpoint") {
       withCheckpointExceptionHandling(snapshotToCheckpoint.deltaLog, "delta.checkpoint.sync.error") {
         if (snapshotToCheckpoint.version < 0) {
           throw DeltaErrors.checkpointNonExistTable(dataPath)
         }
-        val tableIdentifierOpt = catalogTable.map(_.identifier)
-        checkpointAndCleanUpDeltaLog(snapshotToCheckpoint, tableIdentifierOpt)
+        checkpointAndCleanUpDeltaLog(snapshotToCheckpoint, catalogTableOpt)
       }
     }
@@ -329,8 +327,8 @@ trait Checkpoints extends DeltaLogging {
 
   def checkpointAndCleanUpDeltaLog(
       snapshotToCheckpoint: Snapshot,
-      tableIdentifierOpt: Option[TableIdentifier] = None): Unit = {
-    val lastCheckpointInfo = writeCheckpointFiles(snapshotToCheckpoint, tableIdentifierOpt)
+      catalogTableOpt: Option[CatalogTable] = None): Unit = {
+    val lastCheckpointInfo = writeCheckpointFiles(snapshotToCheckpoint, catalogTableOpt)
     writeLastCheckpointFile(
       snapshotToCheckpoint.deltaLog, lastCheckpointInfo, LastCheckpointInfo.checksumEnabled(spark))
     doLogCleanup(snapshotToCheckpoint)
@@ -354,7 +352,7 @@ trait Checkpoints extends DeltaLogging {
 
   protected def writeCheckpointFiles(
       snapshotToCheckpoint: Snapshot,
-      tableIdentifierOpt: Option[TableIdentifier] = None): LastCheckpointInfo = {
+      catalogTableOpt: Option[CatalogTable] = None): LastCheckpointInfo = {
     // With Coordinated-Commits, commit files are not guaranteed to be backfilled immediately in the
     // _delta_log dir. While it is possible to compute a checkpoint file without backfilling,
     // writing the checkpoint file in the log directory before backfilling the relevant commits
@@ -369,7 +367,7 @@ trait Checkpoints extends DeltaLogging {
     // 00015.json
     // 00016.json
     // 00018.checkpoint.parquet
-    snapshotToCheckpoint.ensureCommitFilesBackfilled(tableIdentifierOpt)
+    snapshotToCheckpoint.ensureCommitFilesBackfilled(catalogTableOpt)
     Checkpoints.writeCheckpoint(spark, this, snapshotToCheckpoint)
   }
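Note for reviewers: the ordering constraint that `writeCheckpointFiles` enforces above can be stated as a small invariant — a checkpoint for version N may only be published once every commit <= N is visible as a backfilled file in _delta_log. A standalone sketch of that invariant with hypothetical version numbers (illustrative Scala, not Delta code):

    // Commits 0-16 are backfilled; 17 and 18 still live only under
    // _delta_log/_commits. Publishing 00018.checkpoint.parquet now would
    // leave the listing gap described in the comment above.
    object BackfillBeforeCheckpointSketch {
      def firstUnbackfilled(backfilled: Set[Long], upTo: Long): Option[Long] =
        (0L to upTo).find(v => !backfilled.contains(v))

      def main(args: Array[String]): Unit = {
        var backfilled = (0L to 16L).toSet
        val checkpointVersion = 18L
        // The ensureCommitFilesBackfilled step: close the gap first...
        firstUnbackfilled(backfilled, checkpointVersion).foreach { v =>
          backfilled ++= (v to checkpointVersion) // coordinator backfills 17..18
        }
        // ...and only then write the checkpoint.
        assert(firstUnbackfilled(backfilled, checkpointVersion).isEmpty)
        println(s"safe to write $checkpointVersion.checkpoint.parquet")
      }
    }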
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index c8f1b5c9b0..b8f8c9baa3 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -69,16 +69,16 @@ import org.apache.spark.util._
  * @param options Filesystem options filtered from `allOptions`.
  * @param allOptions All options provided by the user, for example via `df.write.option()`. This
  *                   includes but not limited to filesystem and table properties.
- * @param initialTableIdentifier Identifier of the table when the log is initialized.
  * @param clock Clock to be used when starting a new transaction.
+ * @param initialCatalogTable The catalog table given when the log is initialized.
  */
 class DeltaLog private(
     val logPath: Path,
     val dataPath: Path,
     val options: Map[String, String],
     val allOptions: Map[String, String],
-    val initialTableIdentifier: Option[TableIdentifier],
-    val clock: Clock
+    val clock: Clock,
+    initialCatalogTable: Option[CatalogTable]
   ) extends Checkpoints
   with MetadataCleanup
   with LogStoreProvider
@@ -127,6 +127,9 @@ class DeltaLog private(
   lazy val history = new DeltaHistoryManager(
     this, spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_HISTORY_PAR_SEARCH_THRESHOLD))
 
+  /** Initialize the variables in SnapshotManagement. */
+  createSnapshotAtInit(initialCatalogTable)
+
   /* --------------- *
    |  Configuration  |
    * --------------- */
@@ -765,23 +768,23 @@ object DeltaLog extends DeltaLogging {
       spark,
       logPathFor(dataPath),
       options = Map.empty,
-      initialTableIdentifier = None,
+      initialCatalogTable = None,
       new SystemClock)
   }
 
   /** Helper for creating a log when it stored at the root of the data. */
   def forTable(spark: SparkSession, dataPath: Path): DeltaLog = {
-    apply(spark, logPathFor(dataPath), initialTableIdentifier = None, new SystemClock)
+    apply(spark, logPathFor(dataPath), initialCatalogTable = None, new SystemClock)
   }
 
   /** Helper for creating a log when it stored at the root of the data. */
   def forTable(spark: SparkSession, dataPath: Path, options: Map[String, String]): DeltaLog = {
-    apply(spark, logPathFor(dataPath), options, initialTableIdentifier = None, new SystemClock)
+    apply(spark, logPathFor(dataPath), options, initialCatalogTable = None, new SystemClock)
   }
 
   /** Helper for creating a log when it stored at the root of the data. */
   def forTable(spark: SparkSession, dataPath: Path, clock: Clock): DeltaLog = {
-    apply(spark, logPathFor(dataPath), initialTableIdentifier = None, clock)
+    apply(spark, logPathFor(dataPath), initialCatalogTable = None, clock)
   }
 
   /** Helper for creating a log for the table. */
@@ -809,21 +812,21 @@ object DeltaLog extends DeltaLogging {
       spark,
       logPathFor(new Path(table.location)),
       options,
-      Some(table.identifier),
+      Some(table),
       new SystemClock)
   }
 
   /** Helper for creating a log for the table. */
   def forTable(spark: SparkSession, table: CatalogTable, clock: Clock): DeltaLog = {
-    apply(spark, logPathFor(new Path(table.location)), Some(table.identifier), clock)
+    apply(spark, logPathFor(new Path(table.location)), Some(table), clock)
   }
 
   private def apply(
       spark: SparkSession,
       rawPath: Path,
-      initialTableIdentifier: Option[TableIdentifier],
+      initialCatalogTable: Option[CatalogTable],
       clock: Clock): DeltaLog =
-    apply(spark, rawPath, options = Map.empty, initialTableIdentifier, clock)
+    apply(spark, rawPath, options = Map.empty, initialCatalogTable, clock)
 
   /** Helper for getting a log, as well as the latest snapshot, of the table */
@@ -859,7 +862,7 @@ object DeltaLog extends DeltaLogging {
       options: Map[String, String]): (DeltaLog, Snapshot) =
     withFreshSnapshot { clock =>
       val deltaLog =
-        apply(spark, logPathFor(dataPath), options, initialTableIdentifier = None, clock)
+        apply(spark, logPathFor(dataPath), options, initialCatalogTable = None, clock)
       (deltaLog, None)
     }
 
@@ -870,7 +873,7 @@ object DeltaLog extends DeltaLogging {
       options: Map[String, String]): (DeltaLog, Snapshot) =
     withFreshSnapshot { clock =>
       val deltaLog =
-        apply(spark, logPathFor(new Path(table.location)), options, Some(table.identifier), clock)
+        apply(spark, logPathFor(new Path(table.location)), options, Some(table), clock)
       (deltaLog, Some(table))
     }
 
@@ -893,7 +896,7 @@ object DeltaLog extends DeltaLogging {
       spark: SparkSession,
       rawPath: Path,
       options: Map[String, String],
-      initialTableIdentifier: Option[TableIdentifier],
+      initialCatalogTable: Option[CatalogTable],
       clock: Clock
   ): DeltaLog = {
     val fileSystemOptions: Map[String, String] =
@@ -922,8 +925,8 @@ object DeltaLog extends DeltaLogging {
           dataPath = path.getParent,
           options = fileSystemOptions,
           allOptions = options,
-          initialTableIdentifier = initialTableIdentifier,
-          clock = clock
+          clock = clock,
+          initialCatalogTable = initialCatalogTable
         )
       }
   }

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index c27de3010d..1ffdd75a85 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -1869,7 +1869,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
         commit,
         newChecksumOpt = None,
         preCommitLogSegment = preCommitLogSegment,
-        catalogTable.map(_.identifier))
+        catalogTable)
       if (currentSnapshot.version != attemptVersion) {
         throw DeltaErrors.invalidCommittedVersion(attemptVersion, currentSnapshot.version)
       }
@@ -2313,7 +2313,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       commit,
       newChecksumOpt,
       preCommitLogSegment,
-      catalogTable.map(_.identifier))
+      catalogTable)
     val postCommitReconstructionTime = System.nanoTime()
 
     // Post stats
@@ -2639,7 +2639,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     val (newPreCommitLogSegment, newCommitFileStatuses) =
       deltaLog.getUpdatedLogSegment(
         preCommitLogSegment,
         readSnapshotTableCommitCoordinatorClientOpt,
-        catalogTable.map(_.identifier))
+        catalogTable)
     assert(preCommitLogSegment.version + newCommitFileStatuses.size ==
       newPreCommitLogSegment.version)
     preCommitLogSegment = newPreCommitLogSegment
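Note for reviewers: the three OptimisticTransaction call sites above simply drop `.map(_.identifier)` because every layer underneath now accepts the catalog table itself. Reduced to a standalone sketch (hypothetical, simplified types — the real signatures are the Delta ones in this patch), the refactoring pattern looks like this:

    object CatalogTableThreadingSketch {
      case class TableIdentifier(table: String, database: Option[String] = None)
      case class CatalogTable(identifier: TableIdentifier, location: String)

      // Lowest layer: the commit-coordinator interface still speaks identifiers.
      def getCommits(tableIdOpt: Option[TableIdentifier], start: Long): Seq[Long] = {
        println(s"getCommits($tableIdOpt, start = $start)")
        Seq.empty
      }

      // Intermediate layers used to take Option[TableIdentifier]; now they pass
      // the whole table through and narrow only at the boundary that needs it.
      def updateAfterCommit(catalogTableOpt: Option[CatalogTable]): Seq[Long] =
        getCommits(catalogTableOpt.map(_.identifier), start = 0L)

      def main(args: Array[String]): Unit = {
        val t = CatalogTable(TableIdentifier("events", Some("prod")), "/data/events")
        updateAfterCommit(Some(t)) // catalog-backed table
        updateAfterCommit(None)    // path-based table: nothing to narrow
      }
    }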
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
index df4e1a6079..2b85b2c73d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/Snapshot.scala
@@ -38,7 +38,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.internal.{MDC, MessageWithContext}
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -576,7 +576,7 @@ class Snapshot(
    * @throws IllegalStateException
    *           if the delta file for the current version is not found after backfilling.
    */
-  def ensureCommitFilesBackfilled(tableIdentifierOpt: Option[TableIdentifier]): Unit = {
+  def ensureCommitFilesBackfilled(catalogTableOpt: Option[CatalogTable]): Unit = {
     val tableCommitCoordinatorClient = getTableCommitCoordinatorForWrites.getOrElse {
       return
     }
@@ -584,7 +584,7 @@ class Snapshot(
     if (minUnbackfilledVersion <= version) {
       val hadoopConf = deltaLog.newDeltaHadoopConf()
       tableCommitCoordinatorClient.backfillToVersion(
-        tableIdentifierOpt,
+        catalogTableOpt.map(_.identifier),
         version,
         lastKnownBackfilledVersion = Some(minUnbackfilledVersion - 1))
       val fs = deltaLog.logPath.getFileSystem(hadoopConf)
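Note for reviewers: `ensureCommitFilesBackfilled` above now narrows with `catalogTableOpt.map(_.identifier)` at the `backfillToVersion` call, and only issues that call when `minUnbackfilledVersion <= version`. The gate arithmetic in isolation (hypothetical values; illustrative, not Delta code):

    object BackfillGateSketch {
      // Arguments a backfillToVersion-style call would receive, or None when
      // every commit up to the snapshot version is already backfilled.
      def backfillArgs(minUnbackfilled: Long, snapshotVersion: Long): Option[(Long, Long)] =
        if (minUnbackfilled <= snapshotVersion) {
          Some((snapshotVersion, minUnbackfilled - 1)) // (target, lastKnownBackfilled)
        } else None

      def main(args: Array[String]): Unit = {
        assert(backfillArgs(minUnbackfilled = 17L, snapshotVersion = 18L).contains((18L, 16L)))
        assert(backfillArgs(minUnbackfilled = 19L, snapshotVersion = 18L).isEmpty)
      }
    }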
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
index ab0a3cc0a2..3340422f05 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
@@ -71,8 +71,10 @@ trait SnapshotManagement { self: DeltaLog =>
    * Cached fileStatus for the latest CRC file seen in the deltaLog.
    */
   @volatile protected var lastSeenChecksumFileStatusOpt: Option[FileStatus] = None
-  @volatile protected var currentSnapshot: CapturedSnapshot = getSnapshotAtInit
-
+  /**
+   * Cached latest snapshot. This is initialized in `createSnapshotAtInit`.
+   */
+  @volatile protected var currentSnapshot: CapturedSnapshot = _
   /**
    * Run `body` inside `snapshotLock` lock using `lockInterruptibly` so that the thread
    * can be interrupted when waiting for the lock.
@@ -157,8 +159,7 @@ trait SnapshotManagement { self: DeltaLog =>
    * @param startVersion the version to start. Inclusive.
    * @param tableCommitCoordinatorClientOpt the optional commit coordinator to use for fetching
    *                                        un-backfilled commits.
-   * @param tableIdentifierOpt the optional table identifier to pass to the commit coordinator
-   *                           client.
+   * @param catalogTableOpt the optional catalog table to pass to the commit coordinator client.
    * @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
    * @param includeMinorCompactions Whether to include minor compaction files in the result
    * @return A tuple where the first element is an array of log files (possibly empty, if no
@@ -168,7 +169,7 @@ trait SnapshotManagement { self: DeltaLog =>
   protected def listDeltaCompactedDeltaCheckpointFilesAndLatestChecksumFile(
       startVersion: Long,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
-      tableIdentifierOpt: Option[TableIdentifier],
+      catalogTableOpt: Option[CatalogTable],
       versionToLoad: Option[Long],
       includeMinorCompactions: Boolean): (Option[Array[FileStatus]], Option[FileStatus]) = {
     val tableCommitCoordinatorClient = tableCommitCoordinatorClientOpt.getOrElse {
@@ -181,7 +182,7 @@ trait SnapshotManagement { self: DeltaLog =>
     val threadPool = SnapshotManagement.commitCoordinatorGetCommitsThreadPool
     def getCommitsTask(isAsyncRequest: Boolean): GetCommitsResponse = {
       CoordinatedCommitsUtils.getCommitsFromCommitCoordinatorWithUsageLogs(
-        this, tableCommitCoordinatorClient, tableIdentifierOpt,
+        this, tableCommitCoordinatorClient, catalogTableOpt,
         startVersion, versionToLoad, isAsyncRequest)
     }
     def getGetCommitsResponseFuture(): Future[GetCommitsResponse] = {
@@ -318,8 +319,7 @@ trait SnapshotManagement { self: DeltaLog =>
    * @param startVersion the version to start. Inclusive.
    * @param tableCommitCoordinatorClientOpt the optional commit-coordinator client to use for
    *                                        fetching un-backfilled commits.
-   * @param tableIdentifierOpt the optional table identifier to pass to the commit coordinator
-   *                           client.
+   * @param catalogTableOpt the optional catalog table to pass to the commit coordinator client.
    * @param versionToLoad the optional parameter to set the max version we should return. Inclusive.
    * @param includeMinorCompactions Whether to include minor compaction files in the result
    * @return Some array of files found (possibly empty, if no usable commit files are present), or
@@ -328,7 +328,7 @@ trait SnapshotManagement { self: DeltaLog =>
   protected final def listDeltaCompactedDeltaAndCheckpointFiles(
       startVersion: Long,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
-      tableIdentifierOpt: Option[TableIdentifier],
+      catalogTableOpt: Option[CatalogTable],
       versionToLoad: Option[Long],
       includeMinorCompactions: Boolean): Option[Array[FileStatus]] = {
     recordDeltaOperation(self, "delta.deltaLog.listDeltaAndCheckpointFiles") {
@@ -336,7 +336,7 @@ trait SnapshotManagement { self: DeltaLog =>
         listDeltaCompactedDeltaCheckpointFilesAndLatestChecksumFile(
           startVersion,
           tableCommitCoordinatorClientOpt,
-          tableIdentifierOpt,
+          catalogTableOpt,
           versionToLoad,
           includeMinorCompactions)
       lastSeenChecksumFileStatusOpt = latestChecksumOpt
@@ -358,8 +358,7 @@ trait SnapshotManagement { self: DeltaLog =>
    *                               unavailable. This is also used to initialize the [[LogSegment]].
    * @param tableCommitCoordinatorClientOpt the optional commit-coordinator client to use for
    *                                        fetching un-backfilled commits.
-   * @param tableIdentifierOpt the optional table identifier to pass to the commit coordinator
-   *                           client.
+   * @param catalogTableOpt the optional catalog table to pass to the commit coordinator client.
    * @param lastCheckpointInfo [[LastCheckpointInfo]] from the _last_checkpoint. This could be
    *                           used to initialize the Snapshot's [[LogSegment]].
    * @return Some LogSegment to build a Snapshot if files do exist after the given
@@ -369,7 +368,7 @@ trait SnapshotManagement { self: DeltaLog =>
       versionToLoad: Option[Long] = None,
       oldCheckpointProviderOpt: Option[UninitializedCheckpointProvider] = None,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient] = None,
-      tableIdentifierOpt: Option[TableIdentifier] = None,
+      catalogTableOpt: Option[CatalogTable] = None,
       lastCheckpointInfo: Option[LastCheckpointInfo] = None): Option[LogSegment] = {
     // List based on the last known checkpoint version.
     // if that is -1, list from version 0L
@@ -380,7 +379,7 @@ trait SnapshotManagement { self: DeltaLog =>
     val newFiles = listDeltaCompactedDeltaAndCheckpointFiles(
       listingStartVersion,
       tableCommitCoordinatorClientOpt,
-      tableIdentifierOpt,
+      catalogTableOpt,
       versionToLoad,
       includeMinorCompactions)
     getLogSegmentForVersion(
@@ -389,18 +388,18 @@ trait SnapshotManagement { self: DeltaLog =>
       validateLogSegmentWithoutCompactedDeltas = true,
       oldCheckpointProviderOpt = oldCheckpointProviderOpt,
       tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
-      tableIdentifierOpt = tableIdentifierOpt,
+      catalogTableOpt = catalogTableOpt,
       lastCheckpointInfo = lastCheckpointInfo
     )
   }
 
   private def createLogSegment(
       previousSnapshot: Snapshot,
-      tableIdentifierOpt: Option[TableIdentifier]): Option[LogSegment] = {
+      catalogTableOpt: Option[CatalogTable]): Option[LogSegment] = {
     createLogSegment(
       oldCheckpointProviderOpt = Some(previousSnapshot.checkpointProvider),
       tableCommitCoordinatorClientOpt = previousSnapshot.tableCommitCoordinatorClientOpt,
-      tableIdentifierOpt = tableIdentifierOpt)
+      catalogTableOpt = catalogTableOpt)
   }
 
   /**
@@ -463,7 +462,7 @@ trait SnapshotManagement { self: DeltaLog =>
       files: Option[Array[FileStatus]],
       validateLogSegmentWithoutCompactedDeltas: Boolean,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
-      tableIdentifierOpt: Option[TableIdentifier],
+      catalogTableOpt: Option[CatalogTable],
       oldCheckpointProviderOpt: Option[UninitializedCheckpointProvider],
       lastCheckpointInfo: Option[LastCheckpointInfo]): Option[LogSegment] = {
     recordFrameProfile("Delta", "SnapshotManagement.getLogSegmentForVersion") {
@@ -491,7 +490,7 @@ trait SnapshotManagement { self: DeltaLog =>
         // singleton, so try listing from the first version
         return createLogSegment(
           versionToLoad = versionToLoad,
-          tableIdentifierOpt = tableIdentifierOpt)
+          catalogTableOpt = catalogTableOpt)
       }
       val (checkpoints, deltasAndCompactedDeltas) = newFiles.partition(isCheckpointFile)
       val (deltas, compactedDeltas) = deltasAndCompactedDeltas.partition(isDeltaFile)
@@ -513,7 +512,7 @@ trait SnapshotManagement { self: DeltaLog =>
           snapshotVersion,
           lastCheckpointVersion,
           tableCommitCoordinatorClientOpt,
-          tableIdentifierOpt
+          catalogTableOpt
         ).foreach { alternativeLogSegment =>
           return Some(alternativeLogSegment)
         }
@@ -664,23 +663,24 @@ trait SnapshotManagement { self: DeltaLog =>
    * file as a hint on where to start listing the transaction log directory. If the _delta_log
    * directory doesn't exist, this method will return an `InitialSnapshot`.
    */
-  protected def getSnapshotAtInit: CapturedSnapshot = withSnapshotLockInterruptibly {
-    recordFrameProfile("Delta", "SnapshotManagement.getSnapshotAtInit") {
-      val snapshotInitWallclockTime = clock.getTimeMillis()
-      val lastCheckpointOpt = readLastCheckpointFile()
-      val initialSegmentForNewSnapshot = createLogSegment(
-        versionToLoad = None,
-        tableIdentifierOpt = initialTableIdentifier,
-        lastCheckpointInfo = lastCheckpointOpt)
-      val snapshot = getUpdatedSnapshot(
-        oldSnapshotOpt = None,
-        initialSegmentForNewSnapshot = initialSegmentForNewSnapshot,
-        initialTableCommitCoordinatorClient = None,
-        tableIdentifierOpt = initialTableIdentifier,
-        isAsync = false)
-      CapturedSnapshot(snapshot, snapshotInitWallclockTime)
+  protected def createSnapshotAtInit(initialCatalogTable: Option[CatalogTable]): Unit =
+    withSnapshotLockInterruptibly {
+      recordFrameProfile("Delta", "SnapshotManagement.createSnapshotAtInit") {
+        val snapshotInitWallclockTime = clock.getTimeMillis()
+        val lastCheckpointOpt = readLastCheckpointFile()
+        val initialSegmentForNewSnapshot = createLogSegment(
+          versionToLoad = None,
+          catalogTableOpt = initialCatalogTable,
+          lastCheckpointInfo = lastCheckpointOpt)
+        val snapshot = getUpdatedSnapshot(
+          oldSnapshotOpt = None,
+          initialSegmentForNewSnapshot = initialSegmentForNewSnapshot,
+          initialTableCommitCoordinatorClient = None,
+          catalogTableOpt = initialCatalogTable,
+          isAsync = false)
+        currentSnapshot = CapturedSnapshot(snapshot, snapshotInitWallclockTime)
+      }
     }
-  }
 
   /**
    * Returns the current snapshot. This does not automatically `update()`.
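Note for reviewers: `createSnapshotAtInit` above replaces the old `getSnapshotAtInit` field initializer — `currentSnapshot` is now a blank `@volatile var` that the `DeltaLog` constructor fills in explicitly (see the `@@ -127` hunk in DeltaLog.scala), so the very first log listing can receive the catalog table. A minimal sketch of that initialization shape (hypothetical names, not the Delta classes):

    trait SnapshotCacheLike {
      // Was a field initializer (`= computeAtInit()`), which had no parameter
      // slot for the catalog table; now it is assigned by an explicit call.
      @volatile protected var current: String = _

      protected def createSnapshotAtInit(catalogTableOpt: Option[String]): Unit = {
        current = s"snapshot(catalogTable = $catalogTableOpt)"
      }

      def cachedSnapshot: String = current
    }

    class LogLike(initialCatalogTable: Option[String]) extends SnapshotCacheLike {
      // Runs in the constructor body, after trait fields are initialized; the
      // argument is threaded through rather than stored on the instance.
      createSnapshotAtInit(initialCatalogTable)
    }

    object InitOrderSketch {
      def main(args: Array[String]): Unit =
        println(new LogLike(Some("prod.events")).cachedSnapshot)
    }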
@@ -715,7 +715,7 @@ trait SnapshotManagement { self: DeltaLog =>
   protected def createSnapshot(
       initSegment: LogSegment,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
-      tableIdentifierOpt: Option[TableIdentifier],
+      catalogTableOpt: Option[CatalogTable],
       checksumOpt: Option[VersionChecksum]): Snapshot = {
     val startingFrom = if (!initSegment.checkpointProvider.isEmpty) {
       log" starting from checkpoint version " +
     } else log"."
     logInfo(log"Loading version ${MDC(DeltaLogKeys.VERSION, initSegment.version)}" + startingFrom)
     createSnapshotFromGivenOrEquivalentLogSegment(
-      initSegment, tableCommitCoordinatorClientOpt, tableIdentifierOpt) { segment =>
+      initSegment, tableCommitCoordinatorClientOpt, catalogTableOpt) { segment =>
       new Snapshot(
         path = logPath,
         version = segment.version,
@@ -748,7 +748,7 @@ trait SnapshotManagement { self: DeltaLog =>
       snapshotVersion: Long,
       maxExclusiveCheckpointVersion: Long,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
-      tableIdentifierOpt: Option[TableIdentifier]): Option[LogSegment] = {
+      catalogTableOpt: Option[CatalogTable]): Option[LogSegment] = {
     assert(
       snapshotVersion >= maxExclusiveCheckpointVersion,
       s"snapshotVersion($snapshotVersion) is less than " +
@@ -761,7 +761,7 @@ trait SnapshotManagement { self: DeltaLog =>
       val filesSinceCheckpointVersion = listDeltaCompactedDeltaAndCheckpointFiles(
         startVersion = cp.version,
         tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
-        tableIdentifierOpt = tableIdentifierOpt,
+        catalogTableOpt = catalogTableOpt,
         versionToLoad = Some(snapshotVersion),
         includeMinorCompactions = false
       ).getOrElse(Array.empty)
@@ -804,7 +804,7 @@ trait SnapshotManagement { self: DeltaLog =>
         listDeltaCompactedDeltaAndCheckpointFiles(
           startVersion = 0,
           tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
-          tableIdentifierOpt = tableIdentifierOpt,
+          catalogTableOpt = catalogTableOpt,
           versionToLoad = Some(snapshotVersion),
           includeMinorCompactions = false)
       val (deltas, deltaVersions) =
@@ -839,7 +839,7 @@ trait SnapshotManagement { self: DeltaLog =>
       preCommitLogSegment: LogSegment,
       commit: Commit,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
-      tableIdentifierOpt: Option[TableIdentifier],
+      catalogTableOpt: Option[CatalogTable],
       oldCheckpointProvider: CheckpointProvider): LogSegment = recordFrameProfile(
       "Delta", "SnapshotManagement.getLogSegmentAfterCommit") {
     // If the table doesn't have any competing updates, then go ahead and use the optimized
@@ -855,14 +855,14 @@ trait SnapshotManagement { self: DeltaLog =>
       Seq(preCommitLogSegment.checkpointProvider, oldCheckpointProvider).maxBy(_.version)
     getLogSegmentAfterCommit(
       tableCommitCoordinatorClientOpt,
-      tableIdentifierOpt,
+      catalogTableOpt,
       latestCheckpointProvider)
   }
 
   protected[delta] def getLogSegmentAfterCommit(
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
-      tableIdentifierOpt: Option[TableIdentifier],
+      catalogTableOpt: Option[CatalogTable],
       oldCheckpointProvider: UninitializedCheckpointProvider): LogSegment = {
     /**
      * We can't specify `versionToLoad = committedVersion` for the call below.
@@ -876,7 +876,7 @@ trait SnapshotManagement { self: DeltaLog =>
     createLogSegment(
       oldCheckpointProviderOpt = Some(oldCheckpointProvider),
       tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
-      tableIdentifierOpt = tableIdentifierOpt
+      catalogTableOpt = catalogTableOpt
     ).getOrElse {
       // This shouldn't be possible right after a commit
       logError(log"No delta log found for the Delta table at ${MDC(DeltaLogKeys.PATH, logPath)}")
@@ -892,7 +892,7 @@ trait SnapshotManagement { self: DeltaLog =>
   protected def createSnapshotFromGivenOrEquivalentLogSegment(
       initSegment: LogSegment,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
-      tableIdentifierOpt: Option[TableIdentifier])
+      catalogTableOpt: Option[CatalogTable])
       (snapshotCreator: LogSegment => Snapshot): Snapshot = {
     val numRetries =
       spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SNAPSHOT_LOADING_MAX_RETRIES)
@@ -916,7 +916,7 @@ trait SnapshotManagement { self: DeltaLog =>
               segment.version,
               segment.checkpointProvider.version,
               tableCommitCoordinatorClientOpt,
-              tableIdentifierOpt).getOrElse {
+              catalogTableOpt).getOrElse {
               // Throw the first error if we cannot find an equivalent `LogSegment`.
               throw firstError
             }
@@ -945,13 +945,13 @@ trait SnapshotManagement { self: DeltaLog =>
   def getUpdatedLogSegment(
       oldLogSegment: LogSegment,
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
-      tableIdentifierOpt: Option[TableIdentifier]
+      catalogTableOpt: Option[CatalogTable]
   ): (LogSegment, Seq[FileStatus]) = {
     val includeCompactions = spark.conf.get(DeltaSQLConf.DELTALOG_MINOR_COMPACTION_USE_FOR_READS)
     val newFilesOpt = listDeltaCompactedDeltaAndCheckpointFiles(
       startVersion = oldLogSegment.version + 1,
       tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
-      tableIdentifierOpt = tableIdentifierOpt,
+      catalogTableOpt = catalogTableOpt,
       versionToLoad = None,
       includeMinorCompactions = includeCompactions)
     val newFiles = newFilesOpt.getOrElse {
@@ -970,7 +970,7 @@ trait SnapshotManagement { self: DeltaLog =>
       files = Some(allFiles),
       validateLogSegmentWithoutCompactedDeltas = false,
       tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
-      tableIdentifierOpt = tableIdentifierOpt,
+      catalogTableOpt = catalogTableOpt,
       lastCheckpointInfo = lastCheckpointInfo,
       oldCheckpointProviderOpt = Some(oldLogSegment.checkpointProvider)
     ).getOrElse(oldLogSegment)
@@ -1012,7 +1012,7 @@ trait SnapshotManagement { self: DeltaLog =>
    *                            and can return a stale snapshot in the meantime.
    * @param checkIfUpdatedSinceTs Skip the update if we've already updated the snapshot since the
    *                              specified timestamp.
-   * @param tableIdentifierOpt The identifier of the current table.
+   * @param catalogTableOpt The catalog table of the current table.
    */
   def update(
       stalenessAcceptable: Boolean = false,
@@ -1050,7 +1050,7 @@ trait SnapshotManagement { self: DeltaLog =>
       withSnapshotLockInterruptibly {
         val newSnapshot = updateInternal(
           isAsync = false,
-          catalogTableOpt.map(_.identifier))
+          catalogTableOpt)
         sendEvent(newSnapshot = capturedSnapshot.snapshot)
         newSnapshot
       }
@@ -1068,7 +1068,7 @@ trait SnapshotManagement { self: DeltaLog =>
             interruptOnCancel = true)
           tryUpdate(
             isAsync = true,
-            catalogTableOpt.map(_.identifier))
+            catalogTableOpt)
         }
       } catch {
         case NonFatal(e) if !Utils.isTesting =>
@@ -1086,12 +1086,12 @@ trait SnapshotManagement { self: DeltaLog =>
    */
   private def tryUpdate(
       isAsync: Boolean,
-      tableIdentifierOpt: Option[TableIdentifier]): Snapshot = {
+      catalogTableOpt: Option[CatalogTable]): Snapshot = {
     if (snapshotLock.tryLock()) {
       try {
         updateInternal(
           isAsync,
-          tableIdentifierOpt)
+          catalogTableOpt)
       } finally {
         snapshotLock.unlock()
       }
@@ -1106,18 +1106,18 @@ trait SnapshotManagement { self: DeltaLog =>
    */
   protected def updateInternal(
       isAsync: Boolean,
-      tableIdentifierOpt: Option[TableIdentifier]): Snapshot =
+      catalogTableOpt: Option[CatalogTable]): Snapshot =
     recordDeltaOperation(this, "delta.log.update", Map(TAG_ASYNC -> isAsync.toString)) {
       val updateStartTimeMs = clock.getTimeMillis()
       val previousSnapshot = currentSnapshot.snapshot
       val segmentOpt = createLogSegment(
         previousSnapshot,
-        tableIdentifierOpt)
+        catalogTableOpt)
       val newSnapshot = getUpdatedSnapshot(
         oldSnapshotOpt = Some(previousSnapshot),
         initialSegmentForNewSnapshot = segmentOpt,
         initialTableCommitCoordinatorClient = previousSnapshot.tableCommitCoordinatorClientOpt,
-        tableIdentifierOpt = tableIdentifierOpt,
+        catalogTableOpt = catalogTableOpt,
         isAsync = isAsync)
       installSnapshot(newSnapshot, updateStartTimeMs)
     }
@@ -1130,8 +1130,7 @@ trait SnapshotManagement { self: DeltaLog =>
    * @param initialSegmentForNewSnapshot the log segment constructed for the new snapshot
    * @param initialTableCommitCoordinatorClient the commit-coordinator used for constructing the
    *                                            `initialSegmentForNewSnapshot`
-   * @param tableIdentifierOpt the optional table identifier to pass to the commit coordinator
-   *                           client.
+   * @param catalogTableOpt the optional catalog table to pass to the commit coordinator client.
    * @param isAsync Whether the update is async.
    * @return The new snapshot.
    */
@@ -1139,13 +1138,13 @@ trait SnapshotManagement { self: DeltaLog =>
       oldSnapshotOpt: Option[Snapshot],
       initialSegmentForNewSnapshot: Option[LogSegment],
       initialTableCommitCoordinatorClient: Option[TableCommitCoordinatorClient],
-      tableIdentifierOpt: Option[TableIdentifier],
+      catalogTableOpt: Option[CatalogTable],
       isAsync: Boolean): Snapshot = {
     var newSnapshot = getSnapshotForLogSegmentInternal(
       oldSnapshotOpt,
       initialSegmentForNewSnapshot,
       initialTableCommitCoordinatorClient,
-      tableIdentifierOpt,
+      catalogTableOpt,
       isAsync
     )
     // Identify whether the snapshot was created using a "stale" commit-coordinator. If yes, we need
@@ -1159,12 +1158,12 @@ trait SnapshotManagement { self: DeltaLog =>
     if (usedStaleCommitCoordinator) {
       val segmentOpt = createLogSegment(
         newSnapshot,
-        tableIdentifierOpt)
+        catalogTableOpt)
       newSnapshot = getSnapshotForLogSegmentInternal(
         Some(newSnapshot),
         segmentOpt,
         newSnapshot.tableCommitCoordinatorClientOpt,
-        tableIdentifierOpt,
+        catalogTableOpt,
         isAsync)
     }
     newSnapshot
@@ -1175,7 +1174,7 @@ trait SnapshotManagement { self: DeltaLog =>
       previousSnapshotOpt: Option[Snapshot],
       segmentOpt: Option[LogSegment],
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
-      tableIdentifierOpt: Option[TableIdentifier],
+      catalogTableOpt: Option[CatalogTable],
       isAsync: Boolean): Snapshot = {
     segmentOpt.map { segment =>
       if (previousSnapshotOpt.exists(_.logSegment == segment)) {
@@ -1186,7 +1185,7 @@ trait SnapshotManagement { self: DeltaLog =>
         val newSnapshot = createSnapshot(
           initSegment = segment,
           tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
-          tableIdentifierOpt = tableIdentifierOpt,
+          catalogTableOpt = catalogTableOpt,
           checksumOpt = None)
         previousSnapshotOpt.foreach(logMetadataTableIdChange(_, newSnapshot))
         logInfo(log"Updated snapshot to ${MDC(DeltaLogKeys.SNAPSHOT, newSnapshot)}")
@@ -1246,7 +1245,7 @@ trait SnapshotManagement { self: DeltaLog =>
       initSegment: LogSegment,
       newChecksumOpt: Option[VersionChecksum],
       tableCommitCoordinatorClientOpt: Option[TableCommitCoordinatorClient],
-      tableIdentifierOpt: Option[TableIdentifier],
+      catalogTableOpt: Option[CatalogTable],
       committedVersion: Long): Snapshot = {
     logInfo(
       log"Creating a new snapshot v${MDC(DeltaLogKeys.VERSION, initSegment.version)} " +
@@ -1255,7 +1254,7 @@ trait SnapshotManagement { self: DeltaLog =>
     val newSnapshot = createSnapshot(
       initSegment,
       tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
-      tableIdentifierOpt = tableIdentifierOpt,
+      catalogTableOpt = catalogTableOpt,
       checksumOpt = newChecksumOpt
     )
     // Verify when enabled or when tests run to help future proof IC
@@ -1276,7 +1275,7 @@ trait SnapshotManagement { self: DeltaLog =>
       initSegment,
       newChecksumOpt = None,
       tableCommitCoordinatorClientOpt = tableCommitCoordinatorClientOpt,
-      tableIdentifierOpt,
+      catalogTableOpt,
       committedVersion)
   }
 }
@@ -1292,14 +1291,14 @@ trait SnapshotManagement { self: DeltaLog =>
    * @param newChecksumOpt the checksum for the new commit, if available.
    *                       Usually None, since the commit would have just finished.
    * @param preCommitLogSegment the log segment of the table prior to commit
-   * @param tableIdentifierOpt the identifier of the current table.
+   * @param catalogTableOpt the current catalog table.
    */
   def updateAfterCommit(
       committedVersion: Long,
       commit: Commit,
       newChecksumOpt: Option[VersionChecksum],
       preCommitLogSegment: LogSegment,
-      tableIdentifierOpt: Option[TableIdentifier]): Snapshot = withSnapshotLockInterruptibly {
+      catalogTableOpt: Option[CatalogTable]): Snapshot = withSnapshotLockInterruptibly {
     recordDeltaOperation(this, "delta.log.updateAfterCommit") {
       val updateTimestamp = clock.getTimeMillis()
       val previousSnapshot = currentSnapshot.snapshot
@@ -1311,7 +1310,7 @@ trait SnapshotManagement { self: DeltaLog =>
         preCommitLogSegment,
         commit,
         previousSnapshot.tableCommitCoordinatorClientOpt,
-        tableIdentifierOpt,
+        catalogTableOpt,
         previousSnapshot.checkpointProvider)
 
       // This likely implies a list-after-write inconsistency
@@ -1327,7 +1326,7 @@ trait SnapshotManagement { self: DeltaLog =>
         segment,
         newChecksumOpt,
         previousSnapshot.tableCommitCoordinatorClientOpt,
-        tableIdentifierOpt,
+        catalogTableOpt,
         committedVersion)
       logMetadataTableIdChange(previousSnapshot, newSnapshot)
       logInfo(log"Updated snapshot to ${MDC(DeltaLogKeys.SNAPSHOT, newSnapshot)}")
@@ -1386,12 +1385,11 @@ trait SnapshotManagement { self: DeltaLog =>
         .map(manuallyLoadCheckpoint)
       lastCheckpointInfoForListing -> None
     }
-    val tableIdentifierOpt = catalogTableOpt.map(_.identifier)
     val logSegmentOpt = createLogSegment(
       versionToLoad = Some(version),
       oldCheckpointProviderOpt = lastCheckpointProviderOpt,
       tableCommitCoordinatorClientOpt = upperBoundSnapshot.tableCommitCoordinatorClientOpt,
-      tableIdentifierOpt = tableIdentifierOpt,
+      catalogTableOpt = catalogTableOpt,
      lastCheckpointInfo = lastCheckpointInfoOpt)
     val logSegment = logSegmentOpt.getOrElse {
       // We can't return InitialSnapshot because our caller asked for a specific snapshot version.
@@ -1400,7 +1398,7 @@ trait SnapshotManagement { self: DeltaLog =>
     createSnapshot(
       initSegment = logSegment,
       tableCommitCoordinatorClientOpt = upperBoundSnapshot.tableCommitCoordinatorClientOpt,
-      tableIdentifierOpt = tableIdentifierOpt,
+      catalogTableOpt = catalogTableOpt,
       checksumOpt = None)
   }

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala
index a731ef95fe..c9a7c3bc7a 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsUtils.scala
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{TableIdentifier => CatalystTableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.util.Utils
@@ -49,7 +50,7 @@ object CoordinatedCommitsUtils extends DeltaLogging {
   def getCommitsFromCommitCoordinatorWithUsageLogs(
       deltaLog: DeltaLog,
       tableCommitCoordinatorClient: TableCommitCoordinatorClient,
-      tableIdentifierOpt: Option[CatalystTableIdentifier],
+      catalogTableOpt: Option[CatalogTable],
       startVersion: Long,
       versionToLoad: Option[Long],
       isAsyncRequest: Boolean): JGetCommitsResponse = {
@@ -71,7 +72,10 @@ object CoordinatedCommitsUtils extends DeltaLogging {
     try {
       val response =
         tableCommitCoordinatorClient.getCommits(
-          tableIdentifierOpt, Some(startVersion), endVersion = versionToLoad)
+          catalogTableOpt.map(_.identifier),
+          Some(startVersion),
+          endVersion = versionToLoad
+        )
       val additionalEventData = Map(
         "responseCommitsSize" -> response.getCommits.size,
         "responseLatestTableVersion" -> response.getLatestTableVersion)

diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala
index 8682a23e0d..ebdcda11cc 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/SnapshotManagementSuite.scala
@@ -469,7 +469,7 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
       val newLogSegment = log.snapshot.logSegment
       assert(log.getLogSegmentAfterCommit(
         log.snapshot.tableCommitCoordinatorClientOpt,
-        tableIdentifierOpt = None,
+        catalogTableOpt = None,
         oldLogSegment.checkpointProvider) === newLogSegment)
       spark.range(10).write.format("delta").mode("append").save(path)
       val fs = log.logPath.getFileSystem(log.newDeltaHadoopConf())
@@ -491,7 +491,7 @@ class SnapshotManagementSuite extends QueryTest with DeltaSQLTestUtils with Shar
       }
       assert(log.getLogSegmentAfterCommit(
         log.snapshot.tableCommitCoordinatorClientOpt,
-        tableIdentifierOpt = None,
+        catalogTableOpt = None,
         oldLogSegment.checkpointProvider) === log.snapshot.logSegment)
     }
   }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
index ac0206e432..b9e93c3483 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/test/DeltaTestImplicits.scala
@@ -123,7 +123,7 @@ object DeltaTestImplicits {
   /** Helper class for working with [[Snapshot]] */
   implicit class SnapshotTestHelper(snapshot: Snapshot) {
     def ensureCommitFilesBackfilled(): Unit = {
-      snapshot.ensureCommitFilesBackfilled(tableIdentifierOpt = None)
+      snapshot.ensureCommitFilesBackfilled(catalogTableOpt = None)
     }
   }
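Note for reviewers: with all the pieces in place, path-based entry points create the log with `initialCatalogTable = None`, while catalog-backed entry points resolve the `CatalogTable` once and hand the whole object to `DeltaLog.forTable`. Illustrative call shapes under the new signatures (a sketch assuming a local SparkSession with delta-spark on the classpath and a hypothetical catalog table `prod.events`):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.delta.DeltaLog

    object ForTableUsageSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
          .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
          .getOrCreate()

        // Path-based access: no catalog table is available, so the log is
        // created with initialCatalogTable = None.
        val byPath = DeltaLog.forTable(spark, "/tmp/delta/events")

        // Catalog-based access: resolve the CatalogTable once and pass the
        // whole object; it is threaded down to listings, updates and commits.
        val table = spark.sessionState.catalog
          .getTableMetadata(TableIdentifier("events", Some("prod")))
        val byCatalog = DeltaLog.forTable(spark, table)

        println((byPath.dataPath, byCatalog.dataPath))
        spark.stop()
      }
    }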