
Prevent driver from being overwhelmed during orphan file removal #13084

Open · wants to merge 2 commits into main
4 changes: 4 additions & 0 deletions .palantir/revapi.yml
@@ -1177,6 +1177,10 @@ acceptedBreaks:
old: "class org.apache.iceberg.Metrics"
new: "class org.apache.iceberg.Metrics"
justification: "Java serialization across versions is not guaranteed"
- code: "java.method.addedToInterface"
new: "method long org.apache.iceberg.actions.DeleteOrphanFiles.Result::orphanFileLocationsCount()"
justification: "Introducing orphan file count instead orphaned files to reduce\
\ memory overhead."
org.apache.iceberg:iceberg-core:
- code: "java.method.removed"
old: "method java.lang.String[] org.apache.iceberg.hadoop.Util::blockLocations(org.apache.iceberg.CombinedScanTask,\
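For context on the new interface method accepted above, a caller can read the count without iterating (and therefore materializing) every orphan location. A minimal sketch, assuming an already-loaded Iceberg table and the Spark action entry point; the wrapper class name is illustrative and not part of this PR:

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.SparkActions;

// Illustrative helper, not part of this PR.
public class OrphanFileCountSketch {
  static long removeOrphans(Table table) {
    DeleteOrphanFiles.Result result =
        SparkActions.get()
            .deleteOrphanFiles(table)
            .olderThan(System.currentTimeMillis())
            .execute();

    // New accessor from this PR: reports how many orphan locations were found
    // without requiring the driver to keep every path for the result object.
    return result.orphanFileLocationsCount();
  }
}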
@@ -142,6 +142,8 @@ default DeleteOrphanFiles equalAuthorities(Map<String, String> newEqualAuthoriti
interface Result {
/** Returns locations of orphan files. */
Iterable<String> orphanFileLocations();

/** Returns the number of orphan file locations. */
long orphanFileLocationsCount();
}

/**
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.util;

import java.io.Serializable;
import java.util.function.Consumer;

public interface SerializableConsumer<T> extends Consumer<T>, Serializable {}
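The marker interface above matters because Spark serializes the function handed to foreachPartition, so any Consumer captured by that closure must itself be java.io.Serializable. A minimal sketch of the pattern; the class and method names are illustrative, not part of this PR:

import org.apache.iceberg.util.SerializableConsumer;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;

// Illustrative helper, not part of this PR.
public class PartitionedDeleteSketch {
  static void deleteEachPartition(Dataset<String> paths, SerializableConsumer<String> deleteFn) {
    // deleteFn is captured by the lambda below; because it is Serializable,
    // the whole closure can be shipped to executors instead of collecting
    // all paths back to the driver.
    paths.foreachPartition(
        (ForeachPartitionFunction<String>) iterator -> iterator.forEachRemaining(deleteFn));
  }
}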
@@ -272,7 +272,10 @@ private DeleteOrphanFiles.Result doExecute() {
}
}

return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build();
return ImmutableDeleteOrphanFiles.Result.builder()
.orphanFileLocations(orphanFiles)
.orphanFileLocationsCount(orphanFiles.size())
.build();
}

private Dataset<FileURI> validFileIdentDS() {
@@ -49,6 +49,7 @@
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.SupportsBulkOperations;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -60,9 +61,11 @@
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SerializableConsumer;
import org.apache.iceberg.util.Tasks;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Column;
@@ -121,6 +124,9 @@ public class DeleteOrphanFilesSparkAction extends BaseSparkAction<DeleteOrphanFi
private Dataset<Row> compareToFileList;
private Consumer<String> deleteFunc = null;
private ExecutorService deleteExecutorService = null;
private SetAccumulator<Pair<String, String>> conflicts;
private Dataset<String> orphanFileDF = null;
private boolean includeFilePathsInResults = true;

DeleteOrphanFilesSparkAction(SparkSession spark, Table table) {
super(spark);
@@ -185,6 +191,10 @@ public DeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFunc) {
return this;
}

public DeleteOrphanFilesSparkAction includeFilePathsInResult(boolean include) {
this.includeFilePathsInResults = include;
return this;
}

public DeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
StructType schema = files.schema();

@@ -233,46 +243,83 @@ private String jobDesc() {
return String.format("Deleting orphan files (%s) from %s", optionsAsString, table.name());
}

private void deleteFiles(SupportsBulkOperations io, List<String> paths) {
try {
io.deleteFiles(paths);
LOG.info("Deleted {} files using bulk deletes", paths.size());
} catch (BulkDeletionFailureException e) {
int deletedFilesCount = paths.size() - e.numberFailedObjects();
LOG.warn("Deleted only {} of {} files using bulk deletes", deletedFilesCount, paths.size());
}
private SerializableConsumer<Iterator<String>> bulkDeleteFiles(SupportsBulkOperations io) {
return stringIterator -> {
List<String> paths = Lists.newArrayList(stringIterator);
try {
io.deleteFiles(paths);
} catch (BulkDeletionFailureException e) {
int deletedFilesCount = paths.size() - e.numberFailedObjects();
LOG.warn("Deleted only {} of {} files using bulk deletes", deletedFilesCount, paths.size());
}
};
}

private DeleteOrphanFiles.Result doExecute() {
private Result doExecute() {
Dataset<FileURI> actualFileIdentDS = actualFileIdentDS();
Dataset<FileURI> validFileIdentDS = validFileIdentDS();
Dataset<String> orphanFiles =
findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS).coalesce(10);

orphanFiles.cache();
// materialize the dataframe to populate the accumulator
orphanFiles.count();
SetAccumulator<Pair<String, String>> conflictsAccumulator = conflictsAccumulator();
if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflictsAccumulator.value().isEmpty()) {
throw new ValidationException(
"Unable to determine whether certain files are orphan. "
+ "Metadata references files that match listed/provided files except for authority/scheme. "
+ "Please, inspect the conflicting authorities/schemes and provide which of them are equal "
+ "by further configuring the action via equalSchemes() and equalAuthorities() methods. "
+ "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting "
+ "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting "
+ "authorities/schemes are different. It will be impossible to recover deleted files. "
+ "Conflicting authorities/schemes: %s.",
conflictsAccumulator.value());
}

List<String> orphanFiles =
findOrphanFiles(spark(), actualFileIdentDS, validFileIdentDS, prefixMismatchMode);

if (deleteFunc == null && table.io() instanceof SupportsBulkOperations) {
deleteFiles((SupportsBulkOperations) table.io(), orphanFiles);
FileIO io = table.io();
if (deleteFunc == null && io instanceof SupportsBulkOperations) {
Consumer<Iterator<String>> iteratorConsumer = bulkDeleteFiles((SupportsBulkOperations) io);
orphanFiles.foreachPartition((ForeachPartitionFunction<String>) iteratorConsumer::accept);
} else {

Tasks.Builder<String> deleteTasks =
Tasks.foreach(orphanFiles)
.noRetry()
.executeWith(deleteExecutorService)
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));

if (deleteFunc == null) {
LOG.info(
"Table IO {} does not support bulk operations. Using non-bulk deletes.",
table.io().getClass().getName());
deleteTasks.run(table.io()::deleteFile);
Consumer<String> deleteFunction = nonBulkDeleteFiles();
if (deleteFunction instanceof SerializableConsumer && deleteExecutorService == null) {
orphanFiles.foreachPartition(
(ForeachPartitionFunction<String>)
iterator -> iterator.forEachRemaining(deleteFunction));
} else {
LOG.info("Custom delete function provided. Using non-bulk deletes");
deleteTasks.run(deleteFunc::accept);
List<String> filesToDelete = orphanFiles.collectAsList();
Tasks.Builder<String> deleteTasks =
Tasks.foreach(filesToDelete)
.noRetry()
.executeWith(deleteExecutorService)
.suppressFailureWhenFinished()
.onFailure((file, exc) -> LOG.warn("Failed to delete file: {}", file, exc));
deleteTasks.run(deleteFunction::accept);
}
}
return results(orphanFiles);
}

return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build();
private Consumer<String> nonBulkDeleteFiles() {
FileIO io = table.io();
if (deleteFunc == null) {
LOG.info(
"Table IO {} does not support bulk operations. Using non-bulk deletes.",
io.getClass().getName());
return (SerializableConsumer<String>) io::deleteFile;
} else {
LOG.info("Custom delete function provided. Using non-bulk deletes");
return deleteFunc;
}
}

Result results(Dataset<String> df) {
return ImmutableDeleteOrphanFiles.Result.builder()
.orphanFileLocations(includeFilePathsInResults ? df.collectAsList() : Lists.newArrayList())
.orphanFileLocationsCount(df.count())
.build();
}

private Dataset<FileURI> validFileIdentDS() {
@@ -387,38 +434,28 @@ private static void listDirRecursively(
}
}

@VisibleForTesting
static List<String> findOrphanFiles(
SparkSession spark,
Dataset<FileURI> actualFileIdentDS,
Dataset<FileURI> validFileIdentDS,
PrefixMismatchMode prefixMismatchMode) {

SetAccumulator<Pair<String, String>> conflicts = new SetAccumulator<>();
spark.sparkContext().register(conflicts);

Dataset<String> findOrphanFiles(
SparkSession spark, Dataset<FileURI> actualFileIdentDS, Dataset<FileURI> validFileIdentDS) {
if (orphanFileDF != null) {
return orphanFileDF;
}
SetAccumulator<Pair<String, String>> conflictsAccumulator = conflictsAccumulator();
spark.sparkContext().register(conflictsAccumulator);
Column joinCond = actualFileIdentDS.col("path").equalTo(validFileIdentDS.col("path"));

List<String> orphanFiles =
orphanFileDF =
actualFileIdentDS
.joinWith(validFileIdentDS, joinCond, "leftouter")
.mapPartitions(new FindOrphanFiles(prefixMismatchMode, conflicts), Encoders.STRING())
.collectAsList();
.mapPartitions(
new FindOrphanFiles(prefixMismatchMode, conflictsAccumulator), Encoders.STRING());
return orphanFileDF;
}

if (prefixMismatchMode == PrefixMismatchMode.ERROR && !conflicts.value().isEmpty()) {
throw new ValidationException(
"Unable to determine whether certain files are orphan. "
+ "Metadata references files that match listed/provided files except for authority/scheme. "
+ "Please, inspect the conflicting authorities/schemes and provide which of them are equal "
+ "by further configuring the action via equalSchemes() and equalAuthorities() methods. "
+ "Set the prefix mismatch mode to 'NONE' to ignore remaining locations with conflicting "
+ "authorities/schemes or to 'DELETE' iff you are ABSOLUTELY confident that remaining conflicting "
+ "authorities/schemes are different. It will be impossible to recover deleted files. "
+ "Conflicting authorities/schemes: %s.",
conflicts.value());
private SetAccumulator<Pair<String, String>> conflictsAccumulator() {
if (conflicts == null) {
conflicts = new SetAccumulator<>();
}

return orphanFiles;
return conflicts;
}

private static Map<String, String> flattenMap(Map<String, String> map) {
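Putting the action changes above together, a caller that cleans up a very large table can opt out of collecting the orphan paths on the driver and rely on the count alone. A minimal sketch, assuming an already-loaded table; the wrapper class name is illustrative and not part of this PR:

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.DeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;

// Illustrative helper, not part of this PR.
public class LargeTableCleanupSketch {
  static long cleanUp(Table table) {
    DeleteOrphanFilesSparkAction action =
        SparkActions.get().deleteOrphanFiles(table).olderThan(System.currentTimeMillis());

    // Skip materializing every orphan path in the result; only the count is kept.
    action.includeFilePathsInResult(false);

    DeleteOrphanFiles.Result result = action.execute();
    return result.orphanFileLocationsCount();
  }
}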
@@ -82,6 +82,7 @@
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -157,8 +158,7 @@ public void testDryRun() throws IOException {

SparkActions actions = SparkActions.get();

DeleteOrphanFiles.Result result1 =
actions.deleteOrphanFiles(table).deleteWith(s -> {}).execute();
DeleteOrphanFiles.Result result1 = actions.deleteOrphanFiles(table).execute();
assertThat(result1.orphanFileLocations())
.as("Default olderThan interval should be safe")
.isEmpty();
@@ -381,7 +381,7 @@ public void testMetadataFolderIsIntact() {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();

assertThat(result.orphanFileLocations()).as("Should delete 1 file").hasSize(1);
assertThat(result.orphanFileLocationsCount()).isEqualTo(1);

Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
@@ -536,7 +536,7 @@ public void testHiddenPartitionPaths() {
DeleteOrphanFiles.Result result =
actions.deleteOrphanFiles(table).olderThan(System.currentTimeMillis()).execute();

assertThat(result.orphanFileLocations()).as("Should delete 2 files").hasSize(2);
assertThat(result.orphanFileLocationsCount()).as("Should delete 2 files").isEqualTo(2);
}

@TestTemplate
@@ -1002,6 +1002,7 @@ public void testPathsWithEqualSchemes() {
.hasMessageStartingWith("Unable to determine whether certain files are orphan")
.hasMessageEndingWith("Conflicting authorities/schemes: [(scheme1, scheme2)].");

TABLES.dropTable(tableLocation);
Map<String, String> equalSchemes = Maps.newHashMap();
equalSchemes.put("scheme1", "scheme");
equalSchemes.put("scheme2", "scheme");
@@ -1031,6 +1032,7 @@ public void testPathsWithEqualAuthorities() {
.hasMessageStartingWith("Unable to determine whether certain files are orphan")
.hasMessageEndingWith("Conflicting authorities/schemes: [(servicename1, servicename2)].");

TABLES.dropTable(tableLocation);
Map<String, String> equalAuthorities = Maps.newHashMap();
equalAuthorities.put("servicename1", "servicename");
equalAuthorities.put("servicename2", "servicename");
@@ -1079,15 +1081,22 @@ private void executeTest(
Map<String, String> equalSchemes,
Map<String, String> equalAuthorities,
DeleteOrphanFiles.PrefixMismatchMode mode) {

StringToFileURI toFileUri = new StringToFileURI(equalSchemes, equalAuthorities);

Dataset<String> validFileDS = spark.createDataset(validFiles, Encoders.STRING());
Dataset<String> actualFileDS = spark.createDataset(actualFiles, Encoders.STRING());
Table table = TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), properties, tableLocation);
DeleteOrphanFilesSparkAction deleteOrphanFilesSparkAction =
SparkActions.get().deleteOrphanFiles(table).prefixMismatchMode(mode);
deleteOrphanFilesSparkAction.findOrphanFiles(
spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS));

DeleteOrphanFiles.Result result = deleteOrphanFilesSparkAction.execute();
assertThat(Lists.newArrayList(result.orphanFileLocations())).isEqualTo(expectedOrphanFiles);
}

List<String> orphanFiles =
DeleteOrphanFilesSparkAction.findOrphanFiles(
spark, toFileUri.apply(actualFileDS), toFileUri.apply(validFileDS), mode);
assertThat(orphanFiles).isEqualTo(expectedOrphanFiles);
@AfterEach
public void after() throws IOException {
TABLES.dropTable(tableLocation);
}
}
@@ -272,7 +272,10 @@ private DeleteOrphanFiles.Result doExecute() {
}
}

return ImmutableDeleteOrphanFiles.Result.builder().orphanFileLocations(orphanFiles).build();
return ImmutableDeleteOrphanFiles.Result.builder()
.orphanFileLocations(orphanFiles)
.orphanFileLocationsCount(orphanFiles.size())
.build();
}

private Dataset<FileURI> validFileIdentDS() {