Skip to content

Commit 09a5317

Browse files
authored
Spark, Flink: Backport add max files rewrite option for RewriteAction (#13082)
backports #12824
1 parent dfa5a97 commit 09a5317

File tree

6 files changed

+36
-3
lines changed

6 files changed

+36
-3
lines changed

docs/docs/spark-procedures.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,7 @@ Dangling deletes are always filtered out during rewriting.
533533
| `min-input-files` | 5 | Any file group exceeding this number of files will be rewritten regardless of other criteria |
534534
| `rewrite-all` | false | Force rewriting of all provided files overriding other options |
535535
| `max-file-group-size-bytes` | 107374182400 (100GB) | Largest amount of data that should be rewritten in a single file group. The entire rewrite operation is broken down into pieces based on partitioning and within partitions based on size into file-groups. This helps with breaking down the rewriting of very large partitions which may not be rewritable otherwise due to the resource constraints of the cluster. |
536+
| `max-files-to-rewrite` | null | This option sets an upper limit on the number of eligible files that will be rewritten. If this option is not specified, all eligible files will be rewritten. |
536537

537538
#### Output
538539

@@ -1055,4 +1056,4 @@ metadata files and data files to the target location.
10551056
Lastly, the [register_table](#register_table) procedure can be used to register the copied table in the target location with a catalog.
10561057
10571058
!!! warning
1058-
Iceberg tables with partition statistics files are not currently supported for path rewrite.
1059+
Iceberg tables with partition statistics files are not currently supported for path rewrite.

flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,18 @@ public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) {
170170
return this;
171171
}
172172

173+
/**
174+
* Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
175+
* for more details.
176+
*
177+
* @param maxFilesToRewrite maximum files to rewrite
178+
*/
179+
public Builder maxFilesToRewrite(int maxFilesToRewrite) {
180+
this.rewriteOptions.put(
181+
BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, String.valueOf(maxFilesToRewrite));
182+
return this;
183+
}
184+
173185
/**
174186
* The input is a {@link DataStream} with {@link Trigger} events and every event should be
175187
* immediately followed by a {@link Watermark} with the same timestamp as the event.

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,12 @@ public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) {
170170
return this;
171171
}
172172

173+
/**
174+
* Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
175+
* for more details.
176+
*
177+
* @param maxFilesToRewrite maximum files to rewrite
178+
*/
173179
public Builder maxFilesToRewrite(int maxFilesToRewrite) {
174180
this.rewriteOptions.put(
175181
BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, String.valueOf(maxFilesToRewrite));

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/maintenance/api/RewriteDataFiles.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,18 @@ public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) {
170170
return this;
171171
}
172172

173+
/**
174+
* Configures max files to rewrite. See {@link BinPackRewriteFilePlanner#MAX_FILES_TO_REWRITE}
175+
* for more details.
176+
*
177+
* @param maxFilesToRewrite maximum files to rewrite
178+
*/
179+
public Builder maxFilesToRewrite(int maxFilesToRewrite) {
180+
this.rewriteOptions.put(
181+
BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE, String.valueOf(maxFilesToRewrite));
182+
return this;
183+
}
184+
173185
/**
174186
* The input is a {@link DataStream} with {@link Trigger} events and every event should be
175187
* immediately followed by a {@link Watermark} with the same timestamp as the event.

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public class RewriteDataFilesSparkAction
7878
USE_STARTING_SEQUENCE_NUMBER,
7979
REWRITE_JOB_ORDER,
8080
OUTPUT_SPEC_ID,
81-
REMOVE_DANGLING_DELETES);
81+
REMOVE_DANGLING_DELETES,
82+
BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE);
8283

8384
private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
8485
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();

spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,8 @@ public class RewriteDataFilesSparkAction
7878
USE_STARTING_SEQUENCE_NUMBER,
7979
REWRITE_JOB_ORDER,
8080
OUTPUT_SPEC_ID,
81-
REMOVE_DANGLING_DELETES);
81+
REMOVE_DANGLING_DELETES,
82+
BinPackRewriteFilePlanner.MAX_FILES_TO_REWRITE);
8283

8384
private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
8485
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();

0 commit comments

Comments
 (0)