diff --git a/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 b/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 index 3cdf60158d..80e278ca0d 100644 --- a/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 +++ b/spark/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4 @@ -73,8 +73,7 @@ singleStatement // If you add keywords here that should not be reserved, add them to 'nonReserved' list. statement : VACUUM (path=STRING | table=qualifiedName) - (USING INVENTORY (inventoryTable=qualifiedName | LEFT_PAREN inventoryQuery=subQuery RIGHT_PAREN))? - (RETAIN number HOURS)? (DRY RUN)? #vacuumTable + vacuumModifiers #vacuumTable | (DESC | DESCRIBE) DETAIL (path=STRING | table=qualifiedName) #describeDeltaDetail | GENERATE modeName=identifier FOR TABLE table=qualifiedName #generate | (DESC | DESCRIBE) HISTORY (path=STRING | table=qualifiedName) @@ -196,6 +195,29 @@ dataType : identifier ('(' INTEGER_VALUE (',' INTEGER_VALUE)* ')')? #primitiveDataType ; +vacuumModifiers + : (vacuumType + | inventory + | retain + | dryRun)* + ; + +vacuumType + : LITE|FULL + ; + +inventory + : USING INVENTORY (inventoryTable=qualifiedName | LEFT_PAREN inventoryQuery=subQuery RIGHT_PAREN) + ; + +retain + : RETAIN number HOURS + ; + +dryRun + : DRY RUN + ; + number : MINUS? DECIMAL_VALUE #decimalLiteral | MINUS? INTEGER_VALUE #integerLiteral @@ -234,7 +256,7 @@ exprToken // Add keywords here so that people's queries don't break if they have a column name as one of // these tokens nonReserved - : VACUUM | USING | INVENTORY | RETAIN | HOURS | DRY | RUN + : VACUUM | FULL | LITE | USING | INVENTORY | RETAIN | HOURS | DRY | RUN | CONVERT | TO | DELTA | PARTITIONED | BY | DESC | DESCRIBE | LIMIT | DETAIL | GENERATE | FOR | TABLE | CHECK | EXISTS | OPTIMIZE | FULL @@ -285,6 +307,7 @@ IF: 'IF'; INVENTORY: 'INVENTORY'; LEFT_PAREN: '('; LIMIT: 'LIMIT'; +LITE: 'LITE'; LOCATION: 'LOCATION'; MINUS: '-'; NO: 'NO'; diff --git a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala index c8da5f89f9..0cf1d80a26 100644 --- a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala +++ b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala @@ -319,17 +319,41 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { /** * Create a [[VacuumTableCommand]] logical plan. 
Example SQL: * {{{ - * VACUUM ('/path/to/dir' | delta.`/path/to/dir`) [RETAIN number HOURS] [DRY RUN]; + * VACUUM ('/path/to/dir' | delta.`/path/to/dir`) + * LITE|FULL + * [RETAIN number HOURS] [DRY RUN]; * }}} */ override def visitVacuumTable(ctx: VacuumTableContext): AnyRef = withOrigin(ctx) { + val vacuumModifiersCtx = ctx.vacuumModifiers() + withOrigin(vacuumModifiersCtx) { + checkDuplicateClauses(vacuumModifiersCtx.vacuumType(), "LITE/FULL", vacuumModifiersCtx) + checkDuplicateClauses(vacuumModifiersCtx.inventory(), "INVENTORY", vacuumModifiersCtx) + checkDuplicateClauses(vacuumModifiersCtx.retain(), "RETAIN", vacuumModifiersCtx) + checkDuplicateClauses(vacuumModifiersCtx.dryRun(), "DRY RUN", vacuumModifiersCtx) + if (!vacuumModifiersCtx.inventory().isEmpty && + !vacuumModifiersCtx.vacuumType().isEmpty && + vacuumModifiersCtx.vacuumType().asScala.head.LITE != null) { + operationNotAllowed("Inventory option is not compatible with LITE", vacuumModifiersCtx) + } + } VacuumTableCommand( path = Option(ctx.path).map(string), table = Option(ctx.table).map(visitTableIdentifier), - inventoryTable = Option(ctx.inventoryTable).map(visitTableIdentifier), - inventoryQuery = Option(ctx.inventoryQuery).map(extractRawText), - horizonHours = Option(ctx.number).map(_.getText.toDouble), - dryRun = ctx.RUN != null) + inventoryTable = ctx.vacuumModifiers().inventory().asScala.headOption.collect { + case i if i.inventoryTable != null => visitTableIdentifier(i.inventoryTable) + }, + inventoryQuery = ctx.vacuumModifiers().inventory().asScala.headOption.collect { + case i if i.inventoryQuery != null => extractRawText(i.inventoryQuery) + }, + horizonHours = + ctx.vacuumModifiers().retain().asScala.headOption.map(_.number.getText.toDouble), + dryRun = + ctx.vacuumModifiers().dryRun().asScala.headOption.map(_.RUN != null).getOrElse(false), + vacuumType = ctx.vacuumModifiers().vacuumType().asScala.headOption.map { + t => if (t.LITE != null) "LITE" else "FULL" + } + ) } /** Provides a list of unresolved attributes for multi dimensional clustering. 
*/ diff --git a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala index 56c92fc079..2a98f64340 100644 --- a/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala +++ b/spark/src/main/scala/io/delta/tables/execution/VacuumTableCommand.scala @@ -43,7 +43,8 @@ case class VacuumTableCommand( horizonHours: Option[Double], inventoryTable: Option[LogicalPlan], inventoryQuery: Option[String], - dryRun: Boolean) extends RunnableCommand with UnaryNode with DeltaCommand { + dryRun: Boolean, + vacuumType: Option[String]) extends RunnableCommand with UnaryNode with DeltaCommand { override val output: Seq[Attribute] = Seq(AttributeReference("path", StringType, nullable = true)()) @@ -64,7 +65,7 @@ case class VacuumTableCommand( .map(p => Some(getDeltaTable(p, "VACUUM").toDf(sparkSession))) .getOrElse(inventoryQuery.map(sparkSession.sql)) VacuumCommand.gc(sparkSession, deltaTable.deltaLog, dryRun, horizonHours, - inventory).collect() + inventory, vacuumType).collect() } } @@ -75,9 +76,11 @@ object VacuumTableCommand { inventoryTable: Option[TableIdentifier], inventoryQuery: Option[String], horizonHours: Option[Double], - dryRun: Boolean): VacuumTableCommand = { + dryRun: Boolean, + vacuumType: Option[String]): VacuumTableCommand = { val child = UnresolvedDeltaPathOrIdentifier(path, table, "VACUUM") val unresolvedInventoryTable = inventoryTable.map(rt => UnresolvedTable(rt.nameParts, "VACUUM")) - VacuumTableCommand(child, horizonHours, unresolvedInventoryTable, inventoryQuery, dryRun) + VacuumTableCommand(child, horizonHours, unresolvedInventoryTable, inventoryQuery, dryRun, + vacuumType) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala index 7dea663142..92896769f7 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/VacuumCommand.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.util.{DeltaFileOperations, FileNames} import org.apache.spark.sql.delta.util.DeltaFileOperations.tryDeleteNonRecursive import org.apache.spark.sql.delta.util.FileNames._ -import org.apache.spark.sql.delta.util.JsonUtils +import org.apache.spark.sql.delta.util.{DeltaCommitFileProvider, JsonUtils} import com.fasterxml.jackson.databind.annotation.JsonDeserialize import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} @@ -74,6 +74,12 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { StructField("modificationTime", LongType) )) + object VacuumType extends Enumeration { + type VacuumType = Value + val LITE = Value("LITE") + val FULL = Value("FULL") + } + /** * Additional check on retention duration to prevent people from shooting themselves in the foot. */ @@ -211,14 +217,17 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { * @return A Dataset containing the paths of the files/folders to delete in dryRun mode. Otherwise * returns the base path of the table. 
*/ + // scalastyle:off argcount def gc( spark: SparkSession, deltaLog: DeltaLog, dryRun: Boolean = true, retentionHours: Option[Double] = None, inventory: Option[DataFrame] = None, + vacuumTypeOpt: Option[String] = None, commandMetrics: Map[String, SQLMetric] = Map.empty, clock: Clock = new SystemClock): DataFrame = { + // scalastyle:on argcount recordDeltaOperation(deltaLog, "delta.gc") { val vacuumStartTime = System.currentTimeMillis() @@ -265,9 +274,12 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { val partitionColumns = snapshot.metadata.partitionSchema.fieldNames val parallelism = spark.sessionState.conf.parallelPartitionDiscoveryParallelism val shouldIcebergMetadataDirBeHidden = UniversalFormat.icebergEnabled(snapshot.metadata) + // By default, we will do full vacuum unless LITE vacuum conf is set val isLiteVacuumEnabled = spark.sessionState.conf.getConf(DeltaSQLConf.LITE_VACUUM_ENABLED) + val defaultType = if (isLiteVacuumEnabled) VacuumType.LITE else VacuumType.FULL + val vacuumType = vacuumTypeOpt.map(VacuumType.withName).getOrElse(defaultType) val latestCommitVersionOutsideOfRetentionWindowOpt: Option[Long] = - if (isLiteVacuumEnabled) { + if (vacuumType == VacuumType.LITE) { try { val timestamp = new Timestamp(deleteBeforeTimestamp) val commit = new DeltaHistoryManager(deltaLog).getActiveCommitAtTime( @@ -290,7 +302,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { val files = getFilesFromInventory( basePath, partitionColumns, inventoryDF, shouldIcebergMetadataDirBeHidden) (files, None, None) - case _ if isLiteVacuumEnabled => + case _ if vacuumType == VacuumType.LITE => getFilesFromDeltaLog(spark, snapshot, basePath, hadoopConf, latestCommitVersionOutsideOfRetentionWindowOpt) case _ => @@ -408,7 +420,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { latestCommitVersion = snapshot.version, eligibleStartCommitVersion = eligibleStartCommitVersionOpt, eligibleEndCommitVersion = eligibleEndCommitVersionOpt, - typeOfVacuum = if (isLiteVacuumEnabled) "Lite" else "Full" + typeOfVacuum = vacuumType.toString ) recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats) @@ -455,7 +467,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { latestCommitVersion = snapshot.version, eligibleStartCommitVersion = eligibleStartCommitVersionOpt, eligibleEndCommitVersion = eligibleEndCommitVersionOpt, - typeOfVacuum = if (isLiteVacuumEnabled) "Lite" else "Full") + typeOfVacuum = vacuumType.toString) recordDeltaEvent(deltaLog, "delta.gc.stats", data = stats) logVacuumEnd( deltaLog, @@ -552,16 +564,36 @@ object VacuumCommand extends VacuumCommandImpl with Serializable { eligibleStartCommitVersion: Long, eligibleEndCommitVersion: Long): Dataset[SerializableFileStatus] = { import org.apache.spark.sql.delta.implicits._ + // When coordinated commits are enabled, commit files could be found in _delta_log directory + // as well as in commit directory. We get the delta log files outside of the retention window + // from both the places. 
val prefix = listingPrefix(deltaLog.logPath, eligibleStartCommitVersion) - val eligibleDeltaLogFilesOutsideTheRetentionWindow = + val eligibleDeltaLogFilesFromDeltaLogDirectory = deltaLog.store.listFrom(prefix, deltaLog.newDeltaHadoopConf) .collect { case DeltaFile(f, deltaFileVersion) => (f, deltaFileVersion) } .takeWhile(_._2 <= eligibleEndCommitVersion) .toSeq - val deltaLogFileIndex = DeltaLogFileIndex( - DeltaLogFileIndex.COMMIT_FILE_FORMAT, - eligibleDeltaLogFilesOutsideTheRetentionWindow.map(_._1)).get + val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf()) + val commitDirPath = FileNames.commitDirPath(deltaLog.logPath) + val updatedStartCommitVersion = + eligibleDeltaLogFilesFromDeltaLogDirectory.lastOption.map(_._2) + .getOrElse(eligibleStartCommitVersion) + val eligibleDeltaLogFilesFromCommitDirectory = if (fs.exists(commitDirPath)) { + deltaLog.store + .listFrom(listingPrefix(commitDirPath, updatedStartCommitVersion), + deltaLog.newDeltaHadoopConf) + .collect { case UnbackfilledDeltaFile(f, deltaFileVersion, _) => (f, deltaFileVersion) } + .takeWhile(_._2 <= eligibleEndCommitVersion) + .toSeq + } else { + Seq.empty + } + + val allDeltaLogFilesOutsideTheRetentionWindow = eligibleDeltaLogFilesFromDeltaLogDirectory ++ + eligibleDeltaLogFilesFromCommitDirectory + val deltaLogFileIndex = DeltaLogFileIndex(DeltaLogFileIndex.COMMIT_FILE_FORMAT, + allDeltaLogFilesOutsideTheRetentionWindow.map(_._1)).get val allActions = deltaLog.loadIndex(deltaLogFileIndex).as[SingleAction] val nonCDFFiles = allActions @@ -733,8 +765,11 @@ trait VacuumCommandImpl extends DeltaCommand { // This is done to make sure that the commit timestamp reflects the one provided by the clock // object. if (Utils.isTesting) { - val f = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri) - f.setLastModified(deltaLog.clock.getTimeMillis()) + val fs = deltaLog.logPath.getFileSystem(deltaLog.newDeltaHadoopConf()) + val filePath = DeltaCommitFileProvider(deltaLog.update()).deltaFile(version) + if (fs.exists(filePath)) { + fs.setTimes(filePath, deltaLog.clock.getTimeMillis(), deltaLog.clock.getTimeMillis()) + } } } diff --git a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala index 1db2bbd911..f22e33e0d7 100644 --- a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala +++ b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala @@ -41,26 +41,29 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { // Setting `delegate` to `null` is fine. The following tests don't need to touch `delegate`. 
val parser = new DeltaSqlParser(null) assert(parser.parsePlan("vacuum 123_") === - VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM"), None, None, None, false)) + VacuumTableCommand(UnresolvedTable(Seq("123_"), "VACUUM"), None, None, None, false, None)) assert(parser.parsePlan("vacuum 1a.123_") === - VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM"), None, None, None, false)) + VacuumTableCommand(UnresolvedTable(Seq("1a", "123_"), "VACUUM"), None, None, None, false, + None)) assert(parser.parsePlan("vacuum a.123A") === - VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM"), None, None, None, false)) + VacuumTableCommand(UnresolvedTable(Seq("a", "123A"), "VACUUM"), None, None, None, false, + None)) assert(parser.parsePlan("vacuum a.123E3_column") === VacuumTableCommand(UnresolvedTable(Seq("a", "123E3_column"), "VACUUM"), - None, None, None, false)) + None, None, None, false, None)) assert(parser.parsePlan("vacuum a.123D_column") === VacuumTableCommand(UnresolvedTable(Seq("a", "123D_column"), "VACUUM"), - None, None, None, false)) + None, None, None, false, None)) assert(parser.parsePlan("vacuum a.123BD_column") === VacuumTableCommand(UnresolvedTable(Seq("a", "123BD_column"), "VACUUM"), - None, None, None, false)) + None, None, None, false, None)) assert(parser.parsePlan("vacuum delta.`/tmp/table`") === VacuumTableCommand(UnresolvedTable(Seq("delta", "/tmp/table"), "VACUUM"), - None, None, None, false)) + None, None, None, false, None)) assert(parser.parsePlan("vacuum \"/tmp/table\"") === VacuumTableCommand( - UnresolvedPathBasedDeltaTable("/tmp/table", Map.empty, "VACUUM"), None, None, None, false)) + UnresolvedPathBasedDeltaTable("/tmp/table", Map.empty, "VACUUM"), None, None, None, false, + None)) } test("Restore command is parsed as expected") { diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala index 71bf059f49..b2e1cd9c03 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaVacuumSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest import org.apache.spark.sql.delta.test.DeltaSQLTestUtils import org.apache.spark.sql.delta.test.DeltaTestImplicits._ -import org.apache.spark.sql.delta.util.{DeltaFileOperations, FileNames} +import org.apache.spark.sql.delta.util.{DeltaFileOperations, DeltaCommitFileProvider, FileNames} import org.apache.spark.sql.util.ScalaExtensions._ import org.apache.commons.io.FileUtils import org.apache.hadoop.fs.FileSystem @@ -359,7 +359,7 @@ trait DeltaVacuumSuiteBase extends QueryTest } protected def deleteCommitFile(deltaLog: DeltaLog, version: Long) = { - new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri).delete() + new File(DeltaCommitFileProvider(deltaLog.update()).deltaFile(version).toUri).delete() } /** @@ -371,7 +371,7 @@ trait DeltaVacuumSuiteBase extends QueryTest } protected def setCommitClock(deltaLog: DeltaLog, version: Long, clock: ManualClock) = { - val f = new File(FileNames.unsafeDeltaFile(deltaLog.logPath, version).toUri) + val f = new File(DeltaCommitFileProvider(deltaLog.update()).deltaFile(version).toUri) f.setLastModified(clock.getTimeMillis()) } @@ -1315,8 +1315,8 @@ class DeltaVacuumSuite test(s"vacuum event logging dryRun=$isDryRun loggingEnabled=$loggingEnabled" + s" retentionHours=$retentionHours 
timeGap=$timeGapHours") { withSQLConf(DeltaSQLConf.DELTA_VACUUM_LOGGING_ENABLED.key -> loggingEnabled.toString) { - withEnvironment { (dir, clock) => + clock.setTime(System.currentTimeMillis()) spark.range(2).write.format("delta").save(dir.getAbsolutePath) val deltaLog = DeltaLog.forTable(spark, dir, clock) setCommitClock(deltaLog, 0L, clock) @@ -1395,6 +1395,83 @@ class DeltaVacuumSuite retentionHours = 20, // vacuum will not delete any files timeGapHours = 10 ) + + test(s"vacuum sql syntax checks") { + val tableName = "testTable" + withTable(tableName) { + withSQLConf( + DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false", + DeltaSQLConf.LITE_VACUUM_ENABLED.key -> "false" + ) { + spark.range(0, 50, step = 1, numPartitions = 5).write.format("delta").saveAsTable(tableName) + var e = intercept[AnalysisException] { + spark.sql(s"Vacuum $tableName DRY RUN DRY RUN") + } + assert(e.getMessage.contains("Found duplicate clauses: DRY RUN")) + + e = intercept[AnalysisException] { + spark.sql(s"Vacuum $tableName RETAIN 200 HOURS RETAIN 200 HOURS") + } + assert(e.getMessage.contains("Found duplicate clauses: RETAIN")) + + e = intercept[AnalysisException] { + spark.sql(s"Vacuum $tableName FULL LITE") + } + assert(e.getMessage.contains("Found duplicate clauses: LITE/FULL")) + + e = intercept[AnalysisException] { + spark.sql(s"Vacuum $tableName USING INVENTORY $tableName INVENTORY $tableName") + } + assert(e.getMessage.contains("Syntax error at or near 'INVENTORY'")) + + e = intercept[AnalysisException] { + spark.sql(s"Vacuum $tableName USING INVENTORY $tableName LITE") + } + assert(e.getMessage.contains("Inventory option is not compatible with LITE")) + + // create an uncommitted file. Presence or lack of this file will help us + // validate that we ran the right type of Vacuum. + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(tableName)) + val basePath = deltaLog.dataPath.toString + val clock = new ManualClock() + val fs = new Path(basePath).getFileSystem(deltaLog.newDeltaHadoopConf()) + val sanitizedPath = new Path("UnCommittedFile.parquet").toUri.toString + val file = new File( + fs.makeQualified(DeltaFileOperations.absolutePath(basePath, sanitizedPath)).toUri) + createFile(basePath, sanitizedPath, file, clock) + + spark.sql(s"DELETE from $tableName WHERE ID % 2 = 0 and ID < 40") + assertNumFiles(deltaLog, addFiles = 5, addFilesWithDVs = 4, dvFiles = 1, dataFiles = 6) + purgeDVs(tableName) + + assertNumFiles(deltaLog, addFiles = 5, addFilesWithDVs = 0, dvFiles = 1, + dataFiles = 10) // 9 file actions + one uncommitted file + + spark.sql(s"Vacuum $tableName LITE DRY RUN RETAIN 0 HOURS") + // DRY RUN option doesn't change anything. + assertNumFiles(deltaLog, addFiles = 5, addFilesWithDVs = 0, dvFiles = 1, + dataFiles = 10) + + // LITE will be able to GC 4 files removed by DELETE. + spark.sql(s"Vacuum $tableName LITE RETAIN 0 HOURS") + assertNumFiles(deltaLog, addFiles = 5, addFilesWithDVs = 0, dvFiles = 0, + dataFiles = 6) + + // Default is full and it's able to delete the 'notCommittedFile.parquet' + spark.sql(s"Vacuum $tableName RETAIN 0 HOURS") + assertNumFiles(deltaLog, addFiles = 5, addFilesWithDVs = 0, dvFiles = 0, + dataFiles = 5) + // Create the uncommittedFile file again to make sure explicit vacuum full works as + // expected. 
+ createFile(basePath, sanitizedPath, file, clock) + assertNumFiles(deltaLog, addFiles = 5, addFilesWithDVs = 0, dvFiles = 0, + dataFiles = 6) + spark.sql(s"Vacuum $tableName FULL RETAIN 0 HOURS") + assertNumFiles(deltaLog, addFiles = 5, addFilesWithDVs = 0, dvFiles = 0, + dataFiles = 5) + } + } + } } class DeltaVacuumWithCoordinatedCommitsBatch100Suite extends DeltaVacuumSuite { @@ -1436,12 +1513,15 @@ class DeltaLiteVacuumSuite val deltaTable = io.delta.tables.DeltaTable.forPath(dir.getAbsolutePath) val deltaLog = DeltaLog.forTable(spark, dir.getAbsolutePath) deltaTable.delete() - // Checkpoint will allow us to construct the table snapshot - deltaLog.createCheckpointAtVersion(0L) + // Checkpoints will allow us to construct the table snapshot + deltaLog.createCheckpointAtVersion(2L) deleteCommitFile(deltaLog, 0L) // delete version 0 - intercept[DeltaIllegalStateException] { + + val e = intercept[DeltaIllegalStateException] { VacuumCommand.gc(spark, deltaLog, dryRun = true, retentionHours = Some(0)) } + assert(e.getMessage.contains("VACUUM LITE cannot delete all eligible files as some files" + + " are not referenced by the Delta log. Please run VACUUM FULL.")) } } } @@ -1478,9 +1558,12 @@ class DeltaLiteVacuumSuite for (i <- 1 to 2) { deleteCommitFile(deltaLog, i) } - intercept[DeltaIllegalStateException] { + + val e = intercept[DeltaIllegalStateException] { VacuumCommand.gc(spark, deltaLog, dryRun = true, retentionHours = Some(0)) } + assert(e.getMessage.contains("VACUUM LITE cannot delete all eligible files as some files" + + " are not referenced by the Delta log. Please run VACUUM FULL.")) } } } @@ -1512,3 +1595,7 @@ class DeltaLiteVacuumSuite } } } + +class DeltaLiteVacuumWithCoordinatedCommitsBatch100Suite extends DeltaLiteVacuumSuite { + override val coordinatedCommitsBackfillBatchSize: Option[Int] = Some(100) +}
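
Usage sketch: a minimal, illustrative example of the SQL surface added by this patch, assuming a SparkSession with the Delta SQL extensions enabled and an existing Delta table named `events` (the table name, inventory table name, and retention values are placeholders, not taken from the patch). The modifier ordering, the LITE/FULL default, and the LITE-vs-INVENTORY restriction follow the grammar, parser checks, and VacuumCommand changes above.

    // Illustrative sketch only; `events` and `inventory_table` are assumed names.
    import org.apache.spark.sql.SparkSession

    object VacuumLiteExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("vacuum-lite-example")
          .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
          .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
          .getOrCreate()

        // LITE vacuum: candidate files come from the Delta log commits outside
        // the retention window, so untracked files in the table directory are
        // left alone (see getFilesFromDeltaLog above).
        spark.sql("VACUUM events LITE RETAIN 168 HOURS")

        // FULL vacuum: lists the table directory and also removes untracked
        // files, matching the pre-existing default behavior.
        spark.sql("VACUUM events FULL RETAIN 168 HOURS DRY RUN")

        // With neither LITE nor FULL, the type falls back to the
        // LITE_VACUUM_ENABLED conf (FULL when the conf is false/unset).
        spark.sql("VACUUM events RETAIN 168 HOURS")

        // INVENTORY is still supported, but only with FULL; combining it with
        // LITE is rejected by the parser ("Inventory option is not compatible
        // with LITE").
        spark.sql("VACUUM events USING INVENTORY inventory_table FULL RETAIN 168 HOURS")
      }
    }

Because vacuumModifiers is an order-free repetition in the grammar, the clauses may appear in any order; checkDuplicateClauses in the parser rejects repeated clauses such as "FULL LITE" or a second RETAIN.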