
Commit 2dd4495

Karuppayya Rajendran authored and karuppayya committed
Distributed orphan file removal
1 parent 61e8ace commit 2dd4495

File tree

4 files changed (+143, -63 lines)

api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java

Lines changed: 2 additions & 0 deletions

@@ -142,6 +142,8 @@ default DeleteOrphanFiles equalAuthorities(Map<String, String> newEqualAuthoriti
   interface Result {
     /** Returns locations of orphan files. */
     Iterable<String> orphanFileLocations();
+
+    long orphanFileLocationsCount();
   }
 
   /**
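
A hedged usage sketch, not part of the commit: with the new accessor a caller can report how many orphan files were removed without iterating the location list. The entry points (SparkActions, deleteOrphanFiles, olderThan, execute) are the existing Iceberg APIs; the OrphanFileCleanup class name and the 3-day cutoff are hypothetical.

import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.SparkActions;

class OrphanFileCleanup {
  // Hypothetical caller: the count stays available even when the result omits individual paths.
  static long removeOrphans(Table table) {
    DeleteOrphanFiles.Result result =
        SparkActions.get()
            .deleteOrphanFiles(table)
            .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3))
            .execute();
    return result.orphanFileLocationsCount();
  }
}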
org/apache/iceberg/util/SerializableConsumer.java (new file)

Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.io.Serializable;
+import java.util.function.Consumer;
+
+public interface SerializableConsumer<T> extends Consumer<T>, Serializable {}
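
Why a dedicated interface: java.util.function.Consumer is not Serializable, so a lambda typed as plain Consumer cannot be captured by a Spark closure and shipped to executors. A minimal sketch under that assumption (class and method names are hypothetical):

import org.apache.iceberg.util.SerializableConsumer;

class SerializableConsumerExample {
  // Assigning the lambda to SerializableConsumer makes it Serializable,
  // so Spark can serialize it as part of a closure and run it on executors.
  static SerializableConsumer<String> printingConsumer() {
    return path -> System.out.println("visited " + path);
  }
}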

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/DeleteOrphanFilesSparkAction.java

Lines changed: 94 additions & 54 deletions
@@ -26,6 +26,7 @@
 import java.io.UncheckedIOException;
 import java.net.URI;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -49,6 +50,7 @@
 import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HiddenPathFilter;
 import org.apache.iceberg.io.BulkDeletionFailureException;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.io.SupportsBulkOperations;
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -60,9 +62,11 @@
 import org.apache.iceberg.spark.JobGroupInfo;
 import org.apache.iceberg.util.Pair;
 import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.SerializableConsumer;
 import org.apache.iceberg.util.Tasks;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.ForeachPartitionFunction;
 import org.apache.spark.api.java.function.MapPartitionsFunction;
 import org.apache.spark.broadcast.Broadcast;
 import org.apache.spark.sql.Column;
@@ -121,6 +125,9 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
   private Dataset<Row> compareToFileList;
   private Consumer<String> deleteFunc = null;
   private ExecutorService deleteExecutorService = null;
+  private SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+  private Dataset<String> orphanFileDF = null;
+  private boolean includeFilePathsInResults = true;
 
   DeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
     super(spark);
@@ -185,6 +192,10 @@ public DeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
     return this;
   }
 
+  public void includeFilePathsInResult(boolean include) {
+    this.includeFilePathsInResults = include;
+  }
+
   public DeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
     StructType schema = files.schema();
 
@@ -233,46 +244,86 @@ private String jobDesc() {
     return String.format("Deleting orphan files (%s) from %s", optionsAsString, table.name());
   }
 
-  private void deleteFiles(SupportsBulkOperations io, List<String> paths) {
-    try {
-      io.deleteFiles(paths);
-      LOG.info("Deleted {} files using bulk deletes", paths.size());
-    } catch (BulkDeletionFailureException e) {
-      int deletedFilesCount = paths.size() - e.numberFailedObjects();
-      LOG.warn("Deleted only {} of {} files using bulk deletes", deletedFilesCount, paths.size());
-    }
+  private SerializableConsumer<Iterator<String>> bulkDeleteFiles(SupportsBulkOperations io) {
+    return stringIterator -> {
+      ArrayList<String> paths = Lists.newArrayList(stringIterator);
+      try {
+        io.deleteFiles(paths);
+      } catch (BulkDeletionFailureException e) {
+        int deletedFilesCount = paths.size() - e.numberFailedObjects();
+        LOG.warn("Deleted only {} of {} files using bulk deletes", deletedFilesCount, paths.size());
+      }
+    };
   }
 
-  private DeleteOrphanFiles.Result doExecute() {
+  private Result doExecute() {
     Dataset<FileURI> actualFileIdentDS = actualFileIdentDS();
     Dataset<FileURI> validFileIdentDS = validFileIdentDS();
+    Dataset<String> orphanFiles = findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS).coalesce(10);
 
-    List<String> orphanFiles =
-        findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);
+    // to materialize the dataframe to populate the accumulator
+    orphanFiles.cache();
+    orphanFiles.count();
+    if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
+      throw new ValidationException(
+          "Unable to determine whether certain files are orphan. "
+              + "Metadata references files that match listed/provided files except for authority/scheme. "
+              + "Please, inspect the conflicting authorities/schemes and provide which of them are equal "
+              + "by further configuring the action via equalSchemes() and equalAuthorities() methods. "
+              + "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting "
+              + "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting "
+              + "authorities/schemes are different. It will be impossible to recover deleted files. "
+              + "Conflicting authorities/schemes: %s.",
+          conflicts.value());
+    }
 
-    if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
-      deleteFiles((SupportsBulkOperations) table.io(), orphanFiles);
+    FileIO io = table.io();
+    if (deleteFunc == null && io instanceof SupportsBulkOperations) {
+      Consumer<Iterator<String>> iteratorConsumer = bulkDeleteFiles((SupportsBulkOperations) io);
+      orphanFiles.foreachPartition((ForeachPartitionFunction<String>) iteratorConsumer::accept);
     } else {
-
-      Tasks.Builder<String> deleteTasks =
-          Tasks.foreach(orphanFiles)
-              .noRetry()
-              .executeWith(deleteExecutorService)
-              .suppressFailureWhenFinished()
-              .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));
-
-      if (deleteFunc == null) {
-        LOG.info(
-            "Table IO {} does not support bulk operations. Using non-bulk deletes.",
-            table.io().getClass().getName());
-        deleteTasks.run(table.io()::deleteFile);
+      Consumer<String> deleteFunction = nonBulkDeleteFiles();
+      if (deleteFunction instanceof SerializableConsumer && deleteExecutorService == null) {
+        orphanFiles.foreachPartition(
+            (ForeachPartitionFunction<String>)
+                iterator -> {
+                  while (iterator.hasNext()) {
+                    String next = iterator.next();
+                    deleteFunction.accept(next);
+                  }
+                });
       } else {
-        LOG.info("Custom delete function provided. Using non-bulk deletes");
-        deleteTasks.run(deleteFunc::accept);
+        List<String> filesToDelete = orphanFiles.collectAsList();
+        Tasks.Builder<String> deleteTasks =
+            Tasks.foreach(filesToDelete)
+                .noRetry()
+                .executeWith(deleteExecutorService)
+                .suppressFailureWhenFinished()
+                .onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));
+        deleteTasks.run(deleteFunction::accept);
      }
    }
+    return results(orphanFiles);
+  }
+
+  private Consumer<String> nonBulkDeleteFiles() {
+    FileIO io = table.io();
+    if (deleteFunc == null) {
+      LOG.info(
+          "Table IO {} does not support bulk operations. Using non-bulk deletes.",
+          io.getClass().getName());
+      return (SerializableConsumer<String>) io::deleteFile;
+    } else {
+      LOG.info("Custom delete function provided. Using non-bulk deletes");
+      return deleteFunc;
+    }
+  }
 
-    return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build();
+  Result results(Dataset<String> df) {
+    return ImmutableDeleteOrphanFiles.Result.builder()
+        .orphanFileLocations(includeFilePathsInResults ? df.collectAsList() : Lists.newArrayList())
+        .orphanFileLocationsCount(df.count())
+        .build();
   }
 
   private Dataset<FileURI> validFileIdentDS() {
@@ -387,38 +438,27 @@ private static void listDirRecursively(
     }
   }
 
-  @VisibleForTesting
-  static List<String> findOrphanFiles(
-      SparkSession spark,
-      Dataset<FileURI> actualFileIdentDS,
-      Dataset<FileURI> validFileIdentDS,
-      PrefixMismatchMode prefixMismatchMode) {
-
-    SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
+  Dataset<String> findOrphanFiles(
+      SparkSession spark, Dataset<FileURI> actualFileIdentDS, Dataset<FileURI> validFileIdentDS) {
+    if (orphanFileDF != null) {
+      return orphanFileDF;
+    }
+    SetAccumulator<Pair<String, String>> conflicts = accumulator();
     spark.sparkContext().register(conflicts);
-
     Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));
 
-    List<String> orphanFiles =
+    orphanFileDF =
         actualFileIdentDS
            .joinWith(validFileIdentDS, joinCond, "leftouter")
-            .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
-            .collectAsList();
+            .mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING());
+    return orphanFileDF;
+  }
 
-    if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
-      throw new ValidationException(
-          "Unable to determine whether certain files are orphan. "
-              + "Metadata references files that match listed/provided files except for authority/scheme. "
-              + "Please, inspect the conflicting authorities/schemes and provide which of them are equal "
-              + "by further configuring the action via equalSchemes() and equalAuthorities() methods. "
-              + "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting "
-              + "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting "
-              + "authorities/schemes are different. It will be impossible to recover deleted files. "
-              + "Conflicting authorities/schemes: %s.",
-          conflicts.value());
+  private SetAccumulator<Pair<String, String>> accumulator() {
+    if (conflicts == null) {
+      conflicts = new SetAccumulator<>();
     }
-
-    return orphanFiles;
+    return conflicts;
   }
 
   private static Map<String, String> flattenMap(Map<String, String> map) {
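
A hedged configuration sketch, not part of the commit: with the distributed path, orphan paths no longer need to be collected on the driver, and disabling includeFilePathsInResult keeps the returned result small while the count added above still reports how many files were removed. The Spark action entry points are the existing APIs; the LargeTableCleanup class name is hypothetical.

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;

class LargeTableCleanup {
  static long clean(Table table) {
    DeleteOrphanFilesSparkAction action = SparkActions.get().deleteOrphanFiles(table);
    // Not chained: includeFilePathsInResult(boolean) returns void in this commit.
    action.includeFilePathsInResult(false);
    DeleteOrphanFiles.Result result = action.olderThan(System.currentTimeMillis()).execute();
    return result.orphanFileLocationsCount();
  }
}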

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java

Lines changed: 23 additions & 9 deletions
@@ -82,6 +82,7 @@
 import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -157,8 +158,7 @@ public void testDryRun() throws IOException {
 
     SparkActions actions = SparkActions.get();
 
-    DeleteOrphanFiles.Result result1 =
-        actions.deleteOrphanFiles(table).deleteWith(s -> {}).execute();
+    DeleteOrphanFiles.Result result1 = actions.deleteOrphanFiles(table).execute();
     assertThat(result1.orphanFileLocations())
         .as("Default olderThan interval should be safe")
         .isEmpty();
@@ -381,7 +381,7 @@ public void testMetadataFolderIsIntact() {
     DeleteOrphanFiles.Result result =
         actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    assertThat(result.orphanFileLocations()).as("Should delete 1 file").hasSize(1);
+    assertThat(result.orphanFileLocationsCount()).isEqualTo(1);
 
     Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
     List<ThreeColumnRecord> actualRecords =
@@ -536,7 +536,7 @@ public void testHiddenPartitionPaths() {
     DeleteOrphanFiles.Result result =
         actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();
 
-    assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
+    assertThat(result.orphanFileLocationsCount()).as("Should delete 2 files").isEqualTo(2);
   }
 
   @TestTemplate
@@ -1002,6 +1002,7 @@ public void testPathsWithEqualSchemes() {
         .hasMessageStartingWith("Unable to determine whether certain files are orphan")
         .hasMessageEndingWith("Conflicting authorities/schemes: [(scheme1, scheme2)].");
 
+    TABLES.dropTable(tableLocation);
     Map<String, String> equalSchemes = Maps.newHashMap();
     equalSchemes.put("scheme1", "scheme");
     equalSchemes.put("scheme2", "scheme");
@@ -1031,6 +1032,7 @@ public void testPathsWithEqualAuthorities() {
         .hasMessageStartingWith("Unable to determine whether certain files are orphan")
         .hasMessageEndingWith("Conflicting authorities/schemes: [(servicename1, servicename2)].");
 
+    TABLES.dropTable(tableLocation);
     Map<String, String> equalAuthorities = Maps.newHashMap();
     equalAuthorities.put("servicename1", "servicename");
     equalAuthorities.put("servicename2", "servicename");
@@ -1079,15 +1081,27 @@ private void executeTest(
       Map<String, String> equalSchemes,
       Map<String, String> equalAuthorities,
       DeleteOrphanFiles.PrefixMismatchMode mode) {
-
     StringToFileURI toFileUri = new StringToFileURI(equalSchemes, equalAuthorities);
 
     Dataset<String> validFileDS = spark.createDataset(validFiles, Encoders.STRING());
     Dataset<String> actualFileDS = spark.createDataset(actualFiles, Encoders.STRING());
+    Table table =
+        TABLES.create(
+            SCHEMA,
+            PartitionSpec.unpartitioned(),
+            properties,
+            tableLocation);
+    DeleteOrphanFilesSparkAction deleteOrphanFilesSparkAction =
+        SparkActions.get().deleteOrphanFiles(table).prefixMismatchMode(mode);
+    deleteOrphanFilesSparkAction.findOrphanFiles(
+        spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS));
+
+    DeleteOrphanFiles.Result result = deleteOrphanFilesSparkAction.execute();
+    assertThat(Lists.newArrayList(result.orphanFileLocations())).isEqualTo(expectedOrphanFiles);
+  }
 
-    List<String> orphanFiles =
-        DeleteOrphanFilesSparkAction.findOrphanFiles(
-            spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode);
-    assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
+  @AfterEach
+  public void after() throws IOException {
+    TABLES.dropTable(tableLocation);
   }
 }
