diff --git a/docs/docs/spark-procedures.md b/docs/docs/spark-procedures.md index 026d256199a7..c1205d513aee 100644 --- a/docs/docs/spark-procedures.md +++ b/docs/docs/spark-procedures.md @@ -533,6 +533,7 @@ Dangling deletes are always filtered out during rewriting. | `min-input-files` | 5 | Any file group exceeding this number of files will be rewritten regardless of other criteria | | `rewrite-all` | false | Force rewriting of all provided files overriding other options | | `max-file-group-size-bytes` | 107374182400 (100GB) | Largest amount of data that should be rewritten in a single file group. The entire rewrite operation is broken down into pieces based on partitioning and within partitions based on size into file-groups. This helps with breaking down the rewriting of very large partitions which may not be rewritable otherwise due to the resource constraints of the cluster. | +| `max-files-to-rewrite` | null | This option sets an upper limit on the number of eligible files that will be rewritten. If this option is not specified, all eligible files will be rewritten. | #### Output @@ -1055,4 +1056,4 @@ metadata files and data files to the target location. Lastly, the [register_table](#register_table) procedure can be used to register the copied table in the target location with a catalog. !!! warning - Iceberg tables with partition statistics files are not currently supported for path rewrite. \ No newline at end of file + Iceberg tables with partition statistics files are not currently supported for path rewrite. diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java index b96635e6abc1..3668e8a6c6f9 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -170,6 +170,18 @@ public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) { return this; } + /** + * Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE} + * for more details. + * + * @param maxFilesToRewrite maximum files to rewrite + */ + public Builder maxFilesToRewrite(int maxFilesToRewrite) { + this.rewriteOptions.put( + BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, String.valueOf(maxFilesToRewrite)); + return this; + } + /** * The input is a {@link DataStream} with {@link Trigger} events and every event should be * immediately followed by a {@link Watermark} with the same timestamp as the event. diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java index 5b1c2c8eeee9..3668e8a6c6f9 100644 --- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java +++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -170,6 +170,12 @@ public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) { return this; } + /** + * Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE} + * for more details. + * + * @param maxFilesToRewrite maximum files to rewrite + */ public Builder maxFilesToRewrite(int maxFilesToRewrite) { this.rewriteOptions.put( BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, String.valueOf(maxFilesToRewrite)); diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java index b96635e6abc1..3668e8a6c6f9 100644 --- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java @@ -170,6 +170,18 @@ public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) { return this; } + /** + * Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE} + * for more details. + * + * @param maxFilesToRewrite maximum files to rewrite + */ + public Builder maxFilesToRewrite(int maxFilesToRewrite) { + this.rewriteOptions.put( + BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, String.valueOf(maxFilesToRewrite)); + return this; + } + /** * The input is a {@link DataStream} with {@link Trigger} events and every event should be * immediately followed by a {@link Watermark} with the same timestamp as the event. diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index df53c1ff1da9..48a213f878cd 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -78,7 +78,8 @@ public class RewriteDataFilesSparkAction USE_STARTING_SEQUENCE_NUMBER, REWRITE_JOB_ORDER, OUTPUT_SPEC_ID, - REMOVE_DANGLING_DELETES); + REMOVE_DANGLING_DELETES, + BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE); private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT = ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build(); diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index 775d260ece8e..ffee1fec132f 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -78,7 +78,8 @@ public class RewriteDataFilesSparkAction USE_STARTING_SEQUENCE_NUMBER, REWRITE_JOB_ORDER, OUTPUT_SPEC_ID, - REMOVE_DANGLING_DELETES); + REMOVE_DANGLING_DELETES, + BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE); private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT = ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();