From 796f518f889f0cecb1fcb15f02d6607af8122456 Mon Sep 17 00:00:00 2001 From: Thang Long Vu <107926660+longvu-db@users.noreply.github.com> Date: Tue, 5 Nov 2024 17:27:54 +0100 Subject: [PATCH] [Spark] Move some ICT test helper utils to InCommitTimestampTestUtils.scala (#3843) #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description For code hygiene, we move some ICT test helper utils from the `ICTSuite` to the object `InCommitTimestampTestUtils`, which is dedicated to contain the ICT test utils. ## How was this patch tested? Existing UTs. ## Does this PR introduce _any_ user-facing changes? No. --- .../sql/delta/InCommitTimestampSuite.scala | 19 +----------- .../delta/InCommitTimestampTestUtils.scala | 30 ++++++++++++++++++- 2 files changed, 30 insertions(+), 19 deletions(-) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala index 4e567eb221..c981672ffc 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampSuite.scala @@ -46,30 +46,13 @@ class InCommitTimestampSuite with DeltaTestUtilsBase with CoordinatedCommitsTestUtils with StreamTest { + import InCommitTimestampTestUtils._ override def beforeAll(): Unit = { super.beforeAll() spark.conf.set(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.defaultTablePropertyKey, "true") } - private def getInCommitTimestamp(deltaLog: DeltaLog, version: Long): Long = { - val deltaFile = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(version) - val commitInfo = DeltaHistoryManager.getCommitInfoOpt( - deltaLog.store, - deltaFile, - deltaLog.newDeltaHadoopConf()) - commitInfo.get.inCommitTimestamp.get - } - - private def getFileModificationTimesMap( - deltaLog: DeltaLog, start: Long, end: Long): Map[Long, Long] = { - deltaLog.store.listFrom( - FileNames.listingPrefix(deltaLog.logPath, start), deltaLog.newDeltaHadoopConf()) - .collect { case FileNames.DeltaFile(fs, v) => v -> fs.getModificationTime } - .takeWhile(_._1 <= end) - .toMap - } - test("Enable ICT on commit 0") { withTempDir { tempDir => spark.range(10).write.format("delta").save(tempDir.getAbsolutePath) diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampTestUtils.scala index b54d2be9cb..03fd2bb44f 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampTestUtils.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/InCommitTimestampTestUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.delta import org.apache.spark.sql.delta.actions.{Action, CommitInfo} -import org.apache.spark.sql.delta.util.{FileNames, JsonUtils} +import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, FileNames, JsonUtils} import org.apache.hadoop.fs.Path object InCommitTimestampTestUtils { @@ -55,4 +55,32 @@ object InCommitTimestampTestUtils { overwrite = true, deltaLog.newDeltaHadoopConf()) } + + /** + * Retrieves the in-commit timestamp for a specific version of the Delta Log. + */ + def getInCommitTimestamp(deltaLog: DeltaLog, version: Long): Long = { + val deltaFile = DeltaCommitFileProvider(deltaLog.unsafeVolatileSnapshot).deltaFile(version) + val commitInfo = DeltaHistoryManager.getCommitInfoOpt( + deltaLog.store, + deltaFile, + deltaLog.newDeltaHadoopConf()) + assert(commitInfo.isDefined, s"CommitInfo should exist for version $version") + assert(commitInfo.get.inCommitTimestamp.isDefined, + s"InCommitTimestamp should exist for CommitInfo's version $version") + commitInfo.get.inCommitTimestamp.get + } + + /** + * Retrieves a map of file modification times for Delta Log versions within a specified version + * range. + */ + def getFileModificationTimesMap( + deltaLog: DeltaLog, start: Long, end: Long): Map[Long, Long] = { + deltaLog.store.listFrom( + FileNames.listingPrefix(deltaLog.logPath, start), deltaLog.newDeltaHadoopConf()) + .collect { case FileNames.DeltaFile(fs, v) => v -> fs.getModificationTime } + .takeWhile(_._1 <= end) + .toMap + } }