
Commit b560a2b

Karuppayya Rajendran authored and committed
Distributed orphan file removal
1 parent 61e8ace commit b560a2b

File tree

5 files changed: 144 additions & 64 deletions


.palantir/revapi.yml

Lines changed: 4 additions & 0 deletions
@@ -1177,6 +1177,10 @@ acceptedBreaks:
       old: "class org.apache.iceberg.Metrics"
       new: "class org.apache.iceberg.Metrics"
       justification: "Java serialization across versions is not guaranteed"
+    - code: "java.method.addedToInterface"
+      new: "method long org.apache.iceberg.actions.DeleteOrphanFiles.Result::orphanFileLocationsCount()"
+      justification: "Introducing orphan file count instead of orphaned files to reduce\
+        \ memory overhead."
     org.apache.iceberg:iceberg-core:
     - code: "java.method.removed"
       old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\

api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java

Lines changed: 2 additions & 0 deletions
@@ -142,6 +142,8 @@ default DeleteOrphanFiles equalAuthorities(Map<String, String> newEqualAuthoriti
   interface Result {
     /** Returns locations of orphan files. */
     Iterable<String> orphanFileLocations();
+
+    long orphanFileLocationsCount();
   }

   /**
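The added `orphanFileLocationsCount()` lets callers read how many orphan files were found without materializing every path through `orphanFileLocations()`. A minimal sketch of consuming the extended `Result`; the `result` value and the logger are assumptions for illustration, not part of this commit:

```java
// Sketch only: `result` is assumed to come from a DeleteOrphanFiles action.
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OrphanResultExample {
  private static final Logger LOG = LoggerFactory.getLogger(OrphanResultExample.class);

  static void report(DeleteOrphanFiles.Result result) {
    // The count is available even when the action is configured not to
    // collect individual file paths back to the driver.
    LOG.info("Removed {} orphan files", result.orphanFileLocationsCount());
  }
}
```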
org/apache/iceberg/util/SerializableConsumer.java (new file)

Lines changed: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.io.Serializable;
+import java.util.function.Consumer;
+
+public interface SerializableConsumer<T> extends Consumer<T>, Serializable {}
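`Dataset.foreachPartition` ships its function to executors, so anything it captures must be serializable; a plain `java.util.function.Consumer` gives no such guarantee, which is all this marker interface adds. A minimal sketch of the pattern, with a hypothetical dataset and a placeholder delete step:

```java
// Sketch only: the dataset and the "delete" step are illustrative.
import org.apache.iceberg.util.SerializableConsumer;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;

public class SerializableConsumerExample {
  static void deleteDistributed(Dataset<String> paths) {
    // Captured by the partition function below and shipped to executors.
    SerializableConsumer<String> deleteOne =
        path -> System.out.println("would delete " + path); // placeholder

    paths.foreachPartition(
        (ForeachPartitionFunction<String>)
            iterator -> {
              while (iterator.hasNext()) {
                deleteOne.accept(iterator.next());
              }
            });
  }
}
```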

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java

Lines changed: 96 additions & 55 deletions
@@ -49,6 +49,7 @@
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
 import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -60,9 +61,11 @@
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SerializableConsumer;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.ForeachPartitionFunction;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Column;
@@ -121,6 +124,9 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
   private Dataset<Row> compareToFileList;
   private Consumer<String> deleteFunc = null;
   private ExecutorService deleteExecutorService = null;
+  private SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+  private Dataset<String> orphanFileDF = null;
+  private boolean includeFilePathsInResults = true;

   DeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
     super(spark);
@@ -185,6 +191,10 @@ public DeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
     return this;
   }

+  public void includeFilePathsInResult(boolean include) {
+    this.includeFilePathsInResults = include;
+  }
+
   public DeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
     StructType schema = files.schema();

@@ -233,46 +243,87 @@ private String jobDesc() {
     return String.format("Deleting orphan files (%s) from %s", optionsAsString, table.name());
   }

-  private void deleteFiles(SupportsBulkOperations io, List<String> paths) {
-    try {
-      io.deleteFiles(paths);
-      LOG.info("Deleted {} files using bulk deletes", paths.size());
-    } catch (BulkDeletionFailureException e) {
-      int deletedFilesCount = paths.size() - e.numberFailedObjects();
-      LOG.warn("Deleted only {} of {} files using bulk deletes", deletedFilesCount, paths.size());
-    }
+  private SerializableConsumer<Iterator<String>> bulkDeleteFiles(SupportsBulkOperations io) {
+    return stringIterator -> {
+      List<String> paths = Lists.newArrayList(stringIterator);
+      try {
+        io.deleteFiles(paths);
+      } catch (BulkDeletionFailureException e) {
+        int deletedFilesCount = paths.size() - e.numberFailedObjects();
+        LOG.warn("Deleted only {} of {} files using bulk deletes", deletedFilesCount, paths.size());
+      }
+    };
   }

-  private DeleteOrphanFiles.Result doExecute() {
+  private Result doExecute() {
     Dataset<FileURI> actualFileIdentDS = actualFileIdentDS();
     Dataset<FileURI> validFileIdentDS = validFileIdentDS();
+    Dataset<String> orphanFiles =
+        findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS).coalesce(10);

-    List<String> orphanFiles =
-        findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
+    // to materialize the dataframe to populate the accumulator
+    orphanFiles.cache();
+    orphanFiles.count();
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
+      throw new ValidationException(
+          "Unable to determine whether certain files are orphan. "
+              + "Metadata references files that match listed/provided files except for authority/scheme. "
+              + "Please, inspect the conflicting authorities/schemes and provide which of them are equal "
+              + "by further configuring the action via equalSchemes() and equalAuthorities() methods. "
+              + "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting "
+              + "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting "
+              + "authorities/schemes are different. It will be impossible to recover deleted files. "
+              + "Conflicting authorities/schemes: %s.",
+          conflicts.value());
+    }

-    if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
-      deleteFiles((SupportsBulkOperations) table.io(), orphanFiles);
+    FileIO io = table.io();
+    if (deleteFunc == null && io instanceof SupportsBulkOperations) {
+      Consumer<Iterator<String>> iteratorConsumer = bulkDeleteFiles((SupportsBulkOperations) io);
+      orphanFiles.foreachPartition((ForeachPartitionFunction<String>) iteratorConsumer::accept);
     } else {
-
-      Tasks.Builder<String> deleteTasks =
-          Tasks.foreach(orphanFiles)
-              .noRetry()
-              .executeWith(deleteExecutorService)
-              .suppressFailureWhenFinished()
-              .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));
-
-      if (deleteFunc == null) {
-        LOG.info(
-            "Table IO {} does not support bulk operations. Using non-bulk deletes.",
-            table.io().getClass().getName());
-        deleteTasks.run(table.io()::deleteFile);
+      Consumer<String> deleteFunction = nonBulkDeleteFiles();
+      if (deleteFunction instanceof SerializableConsumer && deleteExecutorService == null) {
+        orphanFiles.foreachPartition(
+            (ForeachPartitionFunction<String>)
+                iterator -> {
+                  while (iterator.hasNext()) {
+                    String next = iterator.next();
+                    deleteFunction.accept(next);
+                  }
+                });
       } else {
-        LOG.info("Custom delete function provided. Using non-bulk deletes");
-        deleteTasks.run(deleteFunc::accept);
+        List<String> filesToDelete = orphanFiles.collectAsList();
+        Tasks.Builder<String> deleteTasks =
+            Tasks.foreach(filesToDelete)
+                .noRetry()
+                .executeWith(deleteExecutorService)
+                .suppressFailureWhenFinished()
+                .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));
+        deleteTasks.run(deleteFunction::accept);
       }
     }
+    return results(orphanFiles);
+  }
+
+  private Consumer<String> nonBulkDeleteFiles() {
+    FileIO io = table.io();
+    if (deleteFunc == null) {
+      LOG.info(
+          "Table IO {} does not support bulk operations. Using non-bulk deletes.",
+          io.getClass().getName());
+      return (SerializableConsumer<String>) io::deleteFile;
+    } else {
+      LOG.info("Custom delete function provided. Using non-bulk deletes");
+      return deleteFunc;
+    }
+  }

-    return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build();
+  Result results(Dataset<String> df) {
+    return ImmutableDeleteOrphanFiles.Result.builder()
+        .orphanFileLocations(includeFilePathsInResults ? df.collectAsList() : Lists.newArrayList())
+        .orphanFileLocationsCount(df.count())
+        .build();
   }

   private Dataset<FileURI> validFileIdentDS() {
@@ -387,38 +438,28 @@ private static void listDirRecursively(
     }
   }

-  @VisibleForTesting
-  static List<String> findOrphanFiles(
-      SparkSession spark,
-      Dataset<FileURI> actualFileIdentDS,
-      Dataset<FileURI> validFileIdentDS,
-      PrefixMismatchMode prefixMismatchMode) {
-
-    SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
-    spark.sparkContext().register(conflicts);
-
+  Dataset<String> findOrphanFiles(
+      SparkSession spark, Dataset<FileURI> actualFileIdentDS, Dataset<FileURI> validFileIdentDS) {
+    if (orphanFileDF != null) {
+      return orphanFileDF;
+    }
+    SetAccumulator<Pair<String, String>> conflictsAccumulator = accumulator();
+    spark.sparkContext().register(conflictsAccumulator);
     Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));

-    List<String> orphanFiles =
+    orphanFileDF =
         actualFileIdentDS
             .joinWith(validFileIdentDS, joinCond, "leftouter")
-            .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
-            .collectAsList();
+            .mapPartitions(
+                new FindOrphanFiles(prefixMismatchMode, conflictsAccumulator), Encoders.STRING());
+    return orphanFileDF;
+  }

-    if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
-      throw new ValidationException(
-          "Unable to determine whether certain files are orphan. "
-              + "Metadata references files that match listed/provided files except for authority/scheme. "
-              + "Please, inspect the conflicting authorities/schemes and provide which of them are equal "
-              + "by further configuring the action via equalSchemes() and equalAuthorities() methods. "
-              + "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting "
-              + "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting "
-              + "authorities/schemes are different. It will be impossible to recover deleted files. "
-              + "Conflicting authorities/schemes: %s.",
-          conflicts.value());
+  private SetAccumulator<Pair<String, String>> accumulator() {
+    if (conflicts == null) {
+      conflicts = new SetAccumulator<>();
     }
-
-    return orphanFiles;
+    return conflicts;
   }

   private static Map<String, String> flattenMap(Map<String, String> map) {
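With the delete pushed into `foreachPartition` and the result reduced to a count, the orphan-file list no longer has to be collected on the driver unless `includeFilePathsInResults` remains true. A minimal sketch of driving the action that way, assuming a resolved `table`; `includeFilePathsInResult(false)` and `orphanFileLocationsCount()` are the additions from this commit:

```java
// Sketch only: table resolution and Spark session setup are assumed.
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;

public class DistributedOrphanRemovalExample {
  static long removeWithoutCollectingPaths(Table table) {
    DeleteOrphanFilesSparkAction action = SparkActions.get().deleteOrphanFiles(table);
    action.includeFilePathsInResult(false); // results() then skips collectAsList()
    DeleteOrphanFiles.Result result = action.olderThan(System.currentTimeMillis()).execute();
    return result.orphanFileLocationsCount();
  }
}
```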

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java

Lines changed: 18 additions & 9 deletions
@@ -82,6 +82,7 @@
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -157,8 +158,7 @@ public void testDryRun() throws IOException {

     SparkActions actions = SparkActions.get();

-    DeleteOrphanFiles.Result result1 =
-        actions.deleteOrphanFiles(table).deleteWith(s -> {}).execute();
+    DeleteOrphanFiles.Result result1 = actions.deleteOrphanFiles(table).execute();
     assertThat(result1.orphanFileLocations())
         .as("Default olderThan interval should be safe")
         .isEmpty();
@@ -381,7 +381,7 @@ public void testMetadataFolderIsIntact() {
     DeleteOrphanFiles.Result result =
         actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();

-    assertThat(result.orphanFileLocations()).as("Should delete 1 file").hasSize(1);
+    assertThat(result.orphanFileLocationsCount()).isEqualTo(1);

     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
     List<ThreeColumnRecord> actualRecords =
@@ -536,7 +536,7 @@ public void testHiddenPartitionPaths() {
     DeleteOrphanFiles.Result result =
         actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();

-    assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
+    assertThat(result.orphanFileLocationsCount()).as("Should delete 2 files").isEqualTo(2);
   }

   @TestTemplate
@@ -1002,6 +1002,7 @@ public void testPathsWithEqualSchemes() {
         .hasMessageStartingWith("Unable to determine whether certain files are orphan")
         .hasMessageEndingWith("Conflicting authorities/schemes: [(scheme1, scheme2)].");

+    TABLES.dropTable(tableLocation);
     Map<String, String> equalSchemes = Maps.newHashMap();
     equalSchemes.put("scheme1", "scheme");
     equalSchemes.put("scheme2", "scheme");
@@ -1031,6 +1032,7 @@ public void testPathsWithEqualAuthorities() {
         .hasMessageStartingWith("Unable to determine whether certain files are orphan")
         .hasMessageEndingWith("Conflicting authorities/schemes: [(servicename1, servicename2)].");

+    TABLES.dropTable(tableLocation);
     Map<String, String> equalAuthorities = Maps.newHashMap();
     equalAuthorities.put("servicename1", "servicename");
     equalAuthorities.put("servicename2", "servicename");
@@ -1079,15 +1081,22 @@ private void executeTest(
       Map<String, String> equalSchemes,
       Map<String, String> equalAuthorities,
       DeleteOrphanFiles.PrefixMismatchMode mode) {
-
     StringToFileURI toFileUri = new StringToFileURI(equalSchemes, equalAuthorities);

     Dataset<String> validFileDS = spark.createDataset(validFiles, Encoders.STRING());
     Dataset<String> actualFileDS = spark.createDataset(actualFiles, Encoders.STRING());
+    Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, tableLocation);
+    DeleteOrphanFilesSparkAction deleteOrphanFilesSparkAction =
+        SparkActions.get().deleteOrphanFiles(table).prefixMismatchMode(mode);
+    deleteOrphanFilesSparkAction.findOrphanFiles(
+        spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS));
+
+    DeleteOrphanFiles.Result result = deleteOrphanFilesSparkAction.execute();
+    assertThat(Lists.newArrayList(result.orphanFileLocations())).isEqualTo(expectedOrphanFiles);
+  }

-    List<String> orphanFiles =
-        DeleteOrphanFilesSparkAction.findOrphanFiles(
-            spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode);
-    assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
+  @AfterEach
+  public void after() throws IOException {
+    TABLES.dropTable(tableLocation);
   }
 }
