diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml
index 1b9ef4604c91..905ef171a27f 100644
--- a/.palantir/revapi.yml
+++ b/.palantir/revapi.yml
@@ -1177,6 +1177,10 @@ acceptedBreaks:
       old: "class org.apache.iceberg.Metrics"
       new: "class org.apache.iceberg.Metrics"
       justification: "Java serialization across versions is not guaranteed"
+    - code: "java.method.addedToInterface"
+      new: "method long org.apache.iceberg.actions.DeleteOrphanFiles.Result::orphanFileLocationsCount()"
+      justification: "Introducing orphan file count instead of orphaned files to reduce\
+        \ memory overhead."
     org.apache.iceberg:iceberg-core:
     - code: "java.method.removed"
       old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\
diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java
index 4e8f80fa833f..a1b76823c098 100644
--- a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java
@@ -142,6 +142,8 @@ default DeleteOrphanFiles equalAuthorities(Map<String, String> newEqualAuthoriti
   interface Result {
     /** Returns locations of orphan files. */
     Iterable<String> orphanFileLocations();
+
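+    /** Returns the count of orphan file locations. */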
+    long orphanFileLocationsCount();
   }
 
   /**
diff --git a/core/src/main/java/org/apache/iceberg/util/SerializableConsumer.java b/core/src/main/java/org/apache/iceberg/util/SerializableConsumer.java
new file mode 100644
index 000000000000..a0e71cf8ebc1
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/util/SerializableConsumer.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.io.Serializable;
+import java.util.function.Consumer;
+
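+/** A {@link Consumer} that is also {@link Serializable}, for use in Spark closures. */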
+public interface SerializableConsumer<T> extends Consumer<T>, Serializable {}
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 5fbb4117feb8..e9b82b87b996 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -272,7 +272,10 @@ private DeleteOrphanFiles.Result doExecute() {
       }
     }
 
-    return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build();
+    return ImmutableDeleteOrphanFiles.Result.builder()
+        .orphanFileLocations(orphanFiles)
+        .orphanFileLocationsCount(orphanFiles.size())
+        .build();
   }
 
   private Dataset<FileURI> validFileIdentDS() {
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 5fbb4117feb8..a69c21eab647 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -49,6 +49,7 @@
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
 import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -60,9 +61,11 @@
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SerializableConsumer;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.ForeachPartitionFunction;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Column;
@@ -121,6 +124,9 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFilesSparkAction>
   private Dataset<Row> compareToFileList;
   private Consumer<String> deleteFunc = null;
   private ExecutorService deleteExecutorService = null;
+  private SetAccumulator<Pair<String, String>> conflicts;
+  private Dataset<String> orphanFileDF = null;
+  private boolean includeFilePathsInResults = true;
 
   DeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
     super(spark);
@@ -185,6 +191,10 @@ public DeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
     return this;
   }
 
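+  /**
+   * Controls whether orphan file paths are collected into the result. Disable to reduce driver
+   * memory overhead when only the count of orphan files is needed.
+   */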
Using non-bulk deletes"); - deleteTasks.run(deleteFunc::accept); + List filesToDelete = orphanFiles.collectAsList(); + Tasks.Builder deleteTasks = + Tasks.foreach(filesToDelete) + .noRetry() + .executeWith(deleteExecutorService) + .suppressFailureWhenFinished() + .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc)); + deleteTasks.run(deleteFunction::accept); } } + return results(orphanFiles); + } - return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build(); + private Consumer nonBulkDeleteFiles() { + FileIO io = table.io(); + if (deleteFunc == null) { + LOG.info( + "Table IO {} does not support bulk operations. Using non-bulk deletes.", + io.getClass().getName()); + return (SerializableConsumer) io::deleteFile; + } else { + LOG.info("Custom delete function provided. Using non-bulk deletes"); + return deleteFunc; + } + } + + Result results(Dataset df) { + return ImmutableDeleteOrphanFiles.Result.builder() + .orphanFileLocations(includeFilePathsInResults ? df.collectAsList() : Lists.newArrayList()) + .orphanFileLocationsCount(df.count()) + .build(); } private Dataset validFileIdentDS() { @@ -387,38 +434,28 @@ private static void listDirRecursively( } } - @VisibleForTesting - static List findOrphanFiles( - SparkSession spark, - Dataset actualFileIdentDS, - Dataset validFileIdentDS, - PrefixMismatchMode prefixMismatchMode) { - - SetAccumulator> conflicts = new SetAccumulator<>(); - spark.sparkContext().register(conflicts); - + Dataset findOrphanFiles( + SparkSession spark, Dataset actualFileIdentDS, Dataset validFileIdentDS) { + if (orphanFileDF != null) { + return orphanFileDF; + } + SetAccumulator> conflictsAccumulator = conflictsAccumulator(); + spark.sparkContext().register(conflictsAccumulator); Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path")); - List orphanFiles = + orphanFileDF = actualFileIdentDS .joinWith(validFileIdentDS, joinCond, "leftouter") - .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING()) - .collectAsList(); + .mapPartitions( + new FindOrphanFiles(prefixMismatchMode, conflictsAccumulator), Encoders.STRING()); + return orphanFileDF; + } - if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) { - throw new ValidationException( - "Unable to determine whether certain files are orphan. " - + "Metadata references files that match listed/provided files except for authority/scheme. " - + "Please, inspect the conflicting authorities/schemes and provide which of them are equal " - + "by further configuring the action via equalSchemes() and equalAuthorities() methods. " - + "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting " - + "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting " - + "authorities/schemes are different. It will be impossible to recover deleted files. 
" - + "Conflicting authorities/schemes: %s.", - conflicts.value()); + private SetAccumulator> conflictsAccumulator() { + if (conflicts == null) { + conflicts = new SetAccumulator<>(); } - - return orphanFiles; + return conflicts; } private static Map flattenMap(Map map) { diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index d3508283dd8c..9da0849c8fd6 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -82,6 +82,7 @@ import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -157,8 +158,7 @@ public void testDryRun() throws IOException { SparkActions actions = SparkActions.get(); - DeleteOrphanFiles.Result result1 = - actions.deleteOrphanFiles(table).deleteWith(s -> {}).execute(); + DeleteOrphanFiles.Result result1 = actions.deleteOrphanFiles(table).execute(); assertThat(result1.orphanFileLocations()) .as("Default olderThan interval should be safe") .isEmpty(); @@ -381,7 +381,7 @@ public void testMetadataFolderIsIntact() { DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute(); - assertThat(result.orphanFileLocations()).as("Should delete 1 file").hasSize(1); + assertThat(result.orphanFileLocationsCount()).isEqualTo(1); Dataset resultDF = spark.read().format("iceberg").load(tableLocation); List actualRecords = @@ -536,7 +536,7 @@ public void testHiddenPartitionPaths() { DeleteOrphanFiles.Result result = actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute(); - assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2); + assertThat(result.orphanFileLocationsCount()).as("Should delete 2 files").isEqualTo(2); } @TestTemplate @@ -1002,6 +1002,7 @@ public void testPathsWithEqualSchemes() { .hasMessageStartingWith("Unable to determine whether certain files are orphan") .hasMessageEndingWith("Conflicting authorities/schemes: [(scheme1, scheme2)]."); + TABLES.dropTable(tableLocation); Map equalSchemes = Maps.newHashMap(); equalSchemes.put("scheme1", "scheme"); equalSchemes.put("scheme2", "scheme"); @@ -1031,6 +1032,7 @@ public void testPathsWithEqualAuthorities() { .hasMessageStartingWith("Unable to determine whether certain files are orphan") .hasMessageEndingWith("Conflicting authorities/schemes: [(servicename1, servicename2)]."); + TABLES.dropTable(tableLocation); Map equalAuthorities = Maps.newHashMap(); equalAuthorities.put("servicename1", "servicename"); equalAuthorities.put("servicename2", "servicename"); @@ -1079,15 +1081,22 @@ private void executeTest( Map equalSchemes, Map equalAuthorities, DeleteOrphanFiles.PrefixMismatchMode mode) { - StringToFileURI toFileUri = new StringToFileURI(equalSchemes, equalAuthorities); Dataset validFileDS = spark.createDataset(validFiles, Encoders.STRING()); Dataset actualFileDS = spark.createDataset(actualFiles, Encoders.STRING()); + Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, tableLocation); + DeleteOrphanFilesSparkAction deleteOrphanFilesSparkAction = 
+    if (conflicts == null) {
+      conflicts = new SetAccumulator<>();
     }
-
-    return orphanFiles;
+    return conflicts;
   }
 
   private static Map<String, String> flattenMap(Map<String, String> map) {
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
index d3508283dd8c..9da0849c8fd6 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java
@@ -82,6 +82,7 @@
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -157,8 +158,7 @@ public void testDryRun() throws IOException {
 
     SparkActions actions = SparkActions.get();
 
-    DeleteOrphanFiles.Result result1 =
-        actions.deleteOrphanFiles(table).deleteWith(s -> {}).execute();
+    DeleteOrphanFiles.Result result1 = actions.deleteOrphanFiles(table).execute();
     assertThat(result1.orphanFileLocations())
         .as("Default olderThan interval should be safe")
         .isEmpty();
@@ -381,7 +381,7 @@ public void testMetadataFolderIsIntact() {
     DeleteOrphanFiles.Result result =
         actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    assertThat(result.orphanFileLocations()).as("Should delete 1 file").hasSize(1);
+    assertThat(result.orphanFileLocationsCount()).isEqualTo(1);
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
     List<ThreeColumnRecord> actualRecords =
@@ -536,7 +536,7 @@ public void testHiddenPartitionPaths() {
     DeleteOrphanFiles.Result result =
         actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
+    assertThat(result.orphanFileLocationsCount()).as("Should delete 2 files").isEqualTo(2);
   }
 
   @TestTemplate
@@ -1002,6 +1002,7 @@ public void testPathsWithEqualSchemes() {
         .hasMessageStartingWith("Unable to determine whether certain files are orphan")
         .hasMessageEndingWith("Conflicting authorities/schemes: [(scheme1, scheme2)].");
 
+    TABLES.dropTable(tableLocation);
     Map<String, String> equalSchemes = Maps.newHashMap();
     equalSchemes.put("scheme1", "scheme");
     equalSchemes.put("scheme2", "scheme");
@@ -1031,6 +1032,7 @@ public void testPathsWithEqualAuthorities() {
         .hasMessageStartingWith("Unable to determine whether certain files are orphan")
         .hasMessageEndingWith("Conflicting authorities/schemes: [(servicename1, servicename2)].");
 
+    TABLES.dropTable(tableLocation);
     Map<String, String> equalAuthorities = Maps.newHashMap();
     equalAuthorities.put("servicename1", "servicename");
     equalAuthorities.put("servicename2", "servicename");
@@ -1079,15 +1081,22 @@ private void executeTest(
       Map<String, String> equalSchemes,
       Map<String, String> equalAuthorities,
       DeleteOrphanFiles.PrefixMismatchMode mode) {
-
     StringToFileURI toFileUri = new StringToFileURI(equalSchemes, equalAuthorities);
 
     Dataset<String> validFileDS = spark.createDataset(validFiles, Encoders.STRING());
     Dataset<String> actualFileDS = spark.createDataset(actualFiles, Encoders.STRING());
+    Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, tableLocation);
+    DeleteOrphanFilesSparkAction deleteOrphanFilesSparkAction =
+        SparkActions.get().deleteOrphanFiles(table).prefixMismatchMode(mode);
+    deleteOrphanFilesSparkAction.findOrphanFiles(
+        spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS));
+
+    DeleteOrphanFiles.Result result = deleteOrphanFilesSparkAction.execute();
+    assertThat(Lists.newArrayList(result.orphanFileLocations())).isEqualTo(expectedOrphanFiles);
+  }
 
-    List<String> orphanFiles =
-        DeleteOrphanFilesSparkAction.findOrphanFiles(
-            spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode);
-    assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
+  @AfterEach
+  public void after() throws IOException {
+    TABLES.dropTable(tableLocation);
   }
 }
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
index 5fbb4117feb8..e9b82b87b996 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java
@@ -272,7 +272,10 @@ private DeleteOrphanFiles.Result doExecute() {
       }
     }
 
-    return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build();
+    return ImmutableDeleteOrphanFiles.Result.builder()
+        .orphanFileLocations(orphanFiles)
+        .orphanFileLocationsCount(orphanFiles.size())
+        .build();
   }
 
   private Dataset<FileURI> validFileIdentDS() {