
Commit 3d77fc7
Flink: Add dynamic writer and committer
This adds the dynamic version of the writer and committer for the Flink Dynamic Iceberg Sink. Conceptually, they work similarly to the IcebergSink's writer and committer, but they support writing to multiple tables. Write results from the DynamicWriter are aggregated per table in the DynamicWriteResultAggregator, which then sends them to the DynamicCommitter. Broken out of apache#12424; depends on apache#13032.
1 parent 924182b · commit 3d77fc7

21 files changed: +1412 -47 lines
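The flow described in the commit message (per-table write results collected before commit) can be sketched in a few lines of Java. This is an illustration only, not the committed DynamicWriteResultAggregator; the class and method names below are assumptions:

import java.util.List;
import java.util.Map;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/** Hypothetical helper: groups WriteResults by target table before committing. */
class PerTableWriteResults {
  private final Map<String, List<WriteResult>> resultsByTable = Maps.newHashMap();

  void add(String tableName, WriteResult result) {
    resultsByTable.computeIfAbsent(tableName, name -> Lists.newArrayList()).add(result);
  }

  Map<String, List<WriteResult>> perTable() {
    return resultsByTable;
  }
}

Grouping by table is what allows one committer to produce a separate Iceberg commit per table for each checkpoint.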

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java

Lines changed: 28 additions & 22 deletions
@@ -19,13 +19,14 @@
 package org.apache.iceberg.flink.sink;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.util.ScanTaskUtil;
 
-class CommitSummary {
+public class CommitSummary {
 
   private final AtomicLong dataFilesCount = new AtomicLong();
   private final AtomicLong dataFilesRecordCount = new AtomicLong();
@@ -34,30 +35,35 @@ class CommitSummary {
   private final AtomicLong deleteFilesRecordCount = new AtomicLong();
   private final AtomicLong deleteFilesByteCount = new AtomicLong();
 
-  CommitSummary(NavigableMap<Long, WriteResult> pendingResults) {
-    pendingResults
-        .values()
+  public CommitSummary() {}
+
+  public CommitSummary(NavigableMap<Long, WriteResult> pendingResults) {
+    pendingResults.values().forEach(this::addWriteResult);
+  }
+
+  public void addAll(NavigableMap<Long, List<WriteResult>> pendingResults) {
+    pendingResults.values().forEach(writeResults -> writeResults.forEach(this::addWriteResult));
+  }
+
+  private void addWriteResult(WriteResult writeResult) {
+    dataFilesCount.addAndGet(writeResult.dataFiles().length);
+    Arrays.stream(writeResult.dataFiles())
+        .forEach(
+            dataFile -> {
+              dataFilesRecordCount.addAndGet(dataFile.recordCount());
+              dataFilesByteCount.addAndGet(dataFile.fileSizeInBytes());
+            });
+    deleteFilesCount.addAndGet(writeResult.deleteFiles().length);
+    Arrays.stream(writeResult.deleteFiles())
         .forEach(
-            writeResult -> {
-              dataFilesCount.addAndGet(writeResult.dataFiles().length);
-              Arrays.stream(writeResult.dataFiles())
-                  .forEach(
-                      dataFile -> {
-                        dataFilesRecordCount.addAndGet(dataFile.recordCount());
-                        dataFilesByteCount.addAndGet(dataFile.fileSizeInBytes());
-                      });
-              deleteFilesCount.addAndGet(writeResult.deleteFiles().length);
-              Arrays.stream(writeResult.deleteFiles())
-                  .forEach(
-                      deleteFile -> {
-                        deleteFilesRecordCount.addAndGet(deleteFile.recordCount());
-                        long deleteBytes = ScanTaskUtil.contentSizeInBytes(deleteFile);
-                        deleteFilesByteCount.addAndGet(deleteBytes);
-                      });
+            deleteFile -> {
+              deleteFilesRecordCount.addAndGet(deleteFile.recordCount());
+              long deleteBytes = ScanTaskUtil.contentSizeInBytes(deleteFile);
+              deleteFilesByteCount.addAndGet(deleteBytes);
             });
   }
 
-  long dataFilesCount() {
+  public long dataFilesCount() {
     return dataFilesCount.get();
   }
 
@@ -69,7 +75,7 @@ long dataFilesByteCount() {
     return dataFilesByteCount.get();
   }
 
-  long deleteFilesCount() {
+  public long deleteFilesCount() {
     return deleteFilesCount.get();
  }

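The new no-arg constructor plus addAll() let a committer build a summary incrementally from results that arrive as a list per checkpoint, which is how the dynamic sink delivers them. A minimal usage sketch; checkpointId and resultsForCheckpoint are assumed inputs from the surrounding committer:

import java.util.List;
import java.util.NavigableMap;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

// checkpointId -> all WriteResults collected for one table at that checkpoint
NavigableMap<Long, List<WriteResult>> pendingResults = Maps.newTreeMap();
pendingResults.put(checkpointId, resultsForCheckpoint); // assumed inputs

CommitSummary summary = new CommitSummary(); // new public no-arg constructor
summary.addAll(pendingResults);              // folds every WriteResult into the counters
long committedDataFiles = summary.dataFilesCount();     // accessors are now public
long committedDeleteFiles = summary.deleteFilesCount();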
flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java

Lines changed: 2 additions & 2 deletions
@@ -23,7 +23,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class DeltaManifests {
+public class DeltaManifests {
 
   private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0];
 
@@ -56,7 +56,7 @@ CharSequence[] referencedDataFiles() {
     return referencedDataFiles;
   }
 
-  List<ManifestFile> manifests() {
+  public List<ManifestFile> manifests() {
     List<ManifestFile> manifests = Lists.newArrayListWithCapacity(2);
     if (dataManifest != null) {
       manifests.add(dataManifest);

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java

Lines changed: 2 additions & 2 deletions
@@ -28,12 +28,12 @@
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
-class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
+public class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
   private static final int VERSION_1 = 1;
   private static final int VERSION_2 = 2;
   private static final byte[] EMPTY_BINARY = new byte[0];
 
-  static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer();
+  public static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer();
 
   @Override
   public int getVersion() {
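Making INSTANCE public lets code outside this package, such as the dynamic committer, round-trip DeltaManifests through Flink's SimpleVersionedSerializer contract. A sketch, assuming deltaManifests already exists (both calls can throw IOException):

byte[] serialized = DeltaManifestsSerializer.INSTANCE.serialize(deltaManifests);
DeltaManifests restored =
    DeltaManifestsSerializer.INSTANCE.deserialize(
        DeltaManifestsSerializer.INSTANCE.getVersion(), serialized);
List<ManifestFile> manifests = restored.manifests(); // manifests() is public now too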

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java

Lines changed: 5 additions & 5 deletions
@@ -38,7 +38,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class FlinkManifestUtil {
+public class FlinkManifestUtil {
 
   private static final Logger LOG = LoggerFactory.getLogger(FlinkManifestUtil.class);
   private static final int FORMAT_V2 = 2;
@@ -66,7 +66,7 @@ static List<DataFile> readDataFiles(
     }
   }
 
-  static ManifestOutputFileFactory createOutputFileFactory(
+  public static ManifestOutputFileFactory createOutputFileFactory(
       Supplier<Table> tableSupplier,
       Map<String, String> tableProps,
       String flinkJobId,
@@ -83,7 +83,7 @@ static ManifestOutputFileFactory createOutputFileFactory(
   * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
   *     partition spec
   */
-  static DeltaManifests writeCompletedFiles(
+  public static DeltaManifests writeCompletedFiles(
      WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
      throws IOException {
 
@@ -114,7 +114,7 @@ static DeltaManifests writeCompletedFiles(
    return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
  }
 
-  static WriteResult readCompletedFiles(
+  public static WriteResult readCompletedFiles(
      DeltaManifests deltaManifests, FileIO io, Map<Integer, PartitionSpec> specsById)
      throws IOException {
    WriteResult.Builder builder = WriteResult.builder();
@@ -135,7 +135,7 @@ static WriteResult readCompletedFiles(
    return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build();
  }
 
-  static void deleteCommittedManifests(
+  public static void deleteCommittedManifests(
      Table table, List<ManifestFile> manifests, String newFlinkJobId, long checkpointId) {
    for (ManifestFile manifest : manifests) {
      try {
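Together, the now-public helpers cover the manifest round trip a committer performs around a checkpoint. An illustrative sketch; writeResult, outputFileFactory, table, and checkpointId are assumed to be supplied by the surrounding operator:

// Persist one checkpoint's completed data/delete files as Flink delta manifests.
DeltaManifests deltaManifests =
    FlinkManifestUtil.writeCompletedFiles(
        writeResult, () -> outputFileFactory.create(checkpointId), table.spec());

// On commit (or after restore), read the same files back from the manifests.
WriteResult restored =
    FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs());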

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@
  * <p>{@link IcebergCommittableSerializer} is used for serializing the objects between the Writer
  * and the Aggregator operator and between the Aggregator and the Committer as well.
  */
-class IcebergCommittable implements Serializable {
+public class IcebergCommittable implements Serializable {
   private final byte[] manifest;
   private final String jobId;
   private final String operatorId;

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java

Lines changed: 1 addition & 1 deletion
@@ -30,7 +30,7 @@
  *
  * <p>In both cases only the respective part is serialized.
  */
-class IcebergCommittableSerializer implements SimpleVersionedSerializer<IcebergCommittable> {
+public class IcebergCommittableSerializer implements SimpleVersionedSerializer<IcebergCommittable> {
   private static final int VERSION = 1;
 
   @Override

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java

Lines changed: 5 additions & 5 deletions
@@ -24,7 +24,7 @@
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.iceberg.flink.util.ElapsedTimeGauge;
 
-class IcebergFilesCommitterMetrics {
+public class IcebergFilesCommitterMetrics {
   private final AtomicLong lastCheckpointDurationMs = new AtomicLong();
   private final AtomicLong lastCommitDurationMs = new AtomicLong();
   private final ElapsedTimeGauge elapsedSecondsSinceLastSuccessfulCommit;
@@ -35,7 +35,7 @@ class IcebergFilesCommitterMetrics {
   private final Counter committedDeleteFilesRecordCount;
   private final Counter committedDeleteFilesByteCount;
 
-  IcebergFilesCommitterMetrics(MetricGroup metrics, String fullTableName) {
+  public IcebergFilesCommitterMetrics(MetricGroup metrics, String fullTableName) {
     MetricGroup committerMetrics =
         metrics.addGroup("IcebergFilesCommitter").addGroup("table", fullTableName);
     committerMetrics.gauge("lastCheckpointDurationMs", lastCheckpointDurationMs::get);
@@ -52,16 +52,16 @@ class IcebergFilesCommitterMetrics {
     this.committedDeleteFilesByteCount = committerMetrics.counter("committedDeleteFilesByteCount");
   }
 
-  void checkpointDuration(long checkpointDurationMs) {
+  public void checkpointDuration(long checkpointDurationMs) {
     lastCheckpointDurationMs.set(checkpointDurationMs);
   }
 
-  void commitDuration(long commitDurationMs) {
+  public void commitDuration(long commitDurationMs) {
     lastCommitDurationMs.set(commitDurationMs);
   }
 
   /** This is called upon a successful commit. */
-  void updateCommitSummary(CommitSummary stats) {
+  public void updateCommitSummary(CommitSummary stats) {
     elapsedSecondsSinceLastSuccessfulCommit.refreshLastRecordedTime();
     committedDataFilesCount.inc(stats.dataFilesCount());
     committedDataFilesRecordCount.inc(stats.dataFilesRecordCount());

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriterMetrics.java

Lines changed: 4 additions & 4 deletions
@@ -28,7 +28,7 @@
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.util.ScanTaskUtil;
 
-class IcebergStreamWriterMetrics {
+public class IcebergStreamWriterMetrics {
   // 1,024 reservoir size should cost about 8KB, which is quite small.
   // It should also produce good accuracy for histogram distribution (like percentiles).
   private static final int HISTOGRAM_RESERVOIR_SIZE = 1024;
@@ -40,7 +40,7 @@ class IcebergStreamWriterMetrics {
   private final Histogram dataFilesSizeHistogram;
   private final Histogram deleteFilesSizeHistogram;
 
-  IcebergStreamWriterMetrics(MetricGroup metrics, String fullTableName) {
+  public IcebergStreamWriterMetrics(MetricGroup metrics, String fullTableName) {
     MetricGroup writerMetrics =
         metrics.addGroup("IcebergStreamWriter").addGroup("table", fullTableName);
     this.flushedDataFiles = writerMetrics.counter("flushedDataFiles");
@@ -63,7 +63,7 @@ class IcebergStreamWriterMetrics {
         new DropwizardHistogramWrapper(dropwizardDeleteFilesSizeHistogram));
   }
 
-  void updateFlushResult(WriteResult result) {
+  public void updateFlushResult(WriteResult result) {
     flushedDataFiles.inc(result.dataFiles().length);
     flushedDeleteFiles.inc(result.deleteFiles().length);
     flushedReferencedDataFiles.inc(result.referencedDataFiles().length);
@@ -84,7 +84,7 @@ void updateFlushResult(WriteResult result) {
         });
   }
 
-  void flushDuration(long flushDurationMs) {
+  public void flushDuration(long flushDurationMs) {
     lastFlushDurationMs.set(flushDurationMs);
   }
 }
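With both metrics constructors public, a writer that serves many tables could keep one metrics instance per target table rather than a single hard-wired one. A hedged sketch; metricGroup, fullTableName, result, and flushMs are assumed to exist:

Map<String, IcebergStreamWriterMetrics> metricsByTable = Maps.newHashMap();

IcebergStreamWriterMetrics writerMetrics =
    metricsByTable.computeIfAbsent(
        fullTableName, name -> new IcebergStreamWriterMetrics(metricGroup, name));
writerMetrics.updateFlushResult(result); // counts flushed data/delete files
writerMetrics.flushDuration(flushMs);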

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/ManifestOutputFileFactory.java

Lines changed: 2 additions & 2 deletions
@@ -30,7 +30,7 @@
 import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
 import org.apache.iceberg.relocated.com.google.common.base.Strings;
 
-class ManifestOutputFileFactory {
+public class ManifestOutputFileFactory {
   // Users could define their own flink manifests directory by setting this value in table
   // properties.
   @VisibleForTesting static final String FLINK_MANIFEST_LOCATION = "flink.manifests.location";
@@ -70,7 +70,7 @@ private String generatePath(long checkpointId) {
             fileCount.incrementAndGet()));
   }
 
-  OutputFile create(long checkpointId) {
+  public OutputFile create(long checkpointId) {
     String flinkManifestDir = props.get(FLINK_MANIFEST_LOCATION);
     TableOperations ops = ((HasTableOperations) tableSupplier.get()).operations();

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java

Lines changed: 25 additions & 2 deletions
@@ -80,6 +80,28 @@ public RowDataTaskWriterFactory(
       Map<String, String> writeProperties,
       List<Integer> equalityFieldIds,
       boolean upsert) {
+    this(
+        tableSupplier,
+        flinkSchema,
+        targetFileSizeBytes,
+        format,
+        writeProperties,
+        equalityFieldIds,
+        upsert,
+        tableSupplier.get().schema(),
+        tableSupplier.get().spec());
+  }
+
+  public RowDataTaskWriterFactory(
+      SerializableSupplier<Table> tableSupplier,
+      RowType flinkSchema,
+      long targetFileSizeBytes,
+      FileFormat format,
+      Map<String, String> writeProperties,
+      List<Integer> equalityFieldIds,
+      boolean upsert,
+      Schema schema,
+      PartitionSpec spec) {
     this.tableSupplier = tableSupplier;
 
     Table table;
@@ -90,9 +112,9 @@ public RowDataTaskWriterFactory(
       table = tableSupplier.get();
     }
 
-    this.schema = table.schema();
+    this.schema = schema;
     this.flinkSchema = flinkSchema;
-    this.spec = table.spec();
+    this.spec = spec;
     this.targetFileSizeBytes = targetFileSizeBytes;
     this.format = format;
     this.equalityFieldIds = equalityFieldIds;
@@ -148,6 +170,7 @@ public void initialize(int taskId, int attemptId) {
         OutputFileFactory.builderFor(table, taskId, attemptId)
             .format(format)
            .ioSupplier(() -> tableSupplier.get().io())
+            .defaultSpec(spec)
            .build();
  }
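The new nine-argument constructor is the hook the dynamic writer needs: it pins the Schema and PartitionSpec the writer should use instead of re-reading them from the table, so files are written against the spec in effect when the writer was created (note the matching .defaultSpec(spec) in initialize). A hedged construction example; every variable is assumed to be defined by the caller:

RowDataTaskWriterFactory taskWriterFactory =
    new RowDataTaskWriterFactory(
        tableSupplier,                        // SerializableSupplier<Table>
        FlinkSchemaUtil.convert(writeSchema), // RowType for the pinned schema
        targetFileSizeBytes,
        FileFormat.PARQUET,
        writeProperties,
        equalityFieldIds,
        false,       // upsert disabled in this sketch
        writeSchema, // Schema pinned for this writer
        writeSpec);  // PartitionSpec pinned for this writer
taskWriterFactory.initialize(taskId, attemptId);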

flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/WriteResultSerializer.java

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.iceberg.io.WriteResult;
 
-class WriteResultSerializer implements SimpleVersionedSerializer<WriteResult> {
+public class WriteResultSerializer implements SimpleVersionedSerializer<WriteResult> {
   private static final int VERSION = 1;
 
   @Override
