Commit eb5ba7e

mxm and pvary committed
Flink 1.20: Dynamic Iceberg Sink
Co-authored-by: Peter Vary <[email protected]>
1 parent c056915 commit eb5ba7e

File tree: 65 files changed, +7145 −67 lines changed


flink/v1.20/build.gradle

Lines changed: 3 additions & 0 deletions

@@ -68,6 +68,9 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
 
   implementation libs.datasketches
 
+  // for caching in DynamicSink
+  implementation libs.caffeine
+
   testImplementation libs.flink120.connector.test.utils
   testImplementation libs.flink120.core
   testImplementation libs.flink120.runtime
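The Caffeine dependency backs the caches inside the new dynamic sink. For context, a minimal sketch of the Caffeine API it pulls in (the cache contents and the loadMetadataFor helper below are hypothetical, not taken from this commit):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

// A bounded cache, e.g. for per-table metadata looked up by table name.
Cache<String, String> tableMetadataCache =
    Caffeine.newBuilder().maximumSize(100).build();

// Compute-if-absent lookup; the loader runs only on a cache miss.
String metadata = tableMetadataCache.get("db.table", name -> loadMetadataFor(name));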
flink/v1.20/flink/src/jmh/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordSerializerDeserializerBenchmark.java

Lines changed: 137 additions & 0 deletions

@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.sink.dynamic;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.serialization.SerializerConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.CatalogLoader;
+import org.apache.iceberg.flink.RowDataConverter;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.types.Types;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@BenchmarkMode(Mode.SingleShotTime)
+public class DynamicRecordSerializerDeserializerBenchmark {
+  private static final int SAMPLE_SIZE = 100_000;
+  private static final Schema SCHEMA =
+      new Schema(
+          Types.NestedField.required(1, "id", Types.IntegerType.get()),
+          Types.NestedField.required(2, "name2", Types.StringType.get()),
+          Types.NestedField.required(3, "name3", Types.StringType.get()),
+          Types.NestedField.required(4, "name4", Types.StringType.get()),
+          Types.NestedField.required(5, "name5", Types.StringType.get()),
+          Types.NestedField.required(6, "name6", Types.StringType.get()),
+          Types.NestedField.required(7, "name7", Types.StringType.get()),
+          Types.NestedField.required(8, "name8", Types.StringType.get()),
+          Types.NestedField.required(9, "name9", Types.StringType.get()));
+
+  private List<DynamicRecordInternal> rows = Lists.newArrayListWithExpectedSize(SAMPLE_SIZE);
+  private DynamicRecordInternalType type;
+
+  public static void main(String[] args) throws RunnerException {
+    Options options =
+        new OptionsBuilder()
+            .include(DynamicRecordSerializerDeserializerBenchmark.class.getSimpleName())
+            .build();
+    new Runner(options).run();
+  }
+
+  @Setup
+  public void setupBenchmark() throws IOException {
+    List<Record> records = RandomGenericData.generate(SCHEMA, SAMPLE_SIZE, 1L);
+    this.rows =
+        records.stream()
+            .map(
+                r ->
+                    new DynamicRecordInternal(
+                        "t",
+                        "main",
+                        SCHEMA,
+                        PartitionSpec.unpartitioned(),
+                        1,
+                        RowDataConverter.convert(SCHEMA, r),
+                        false,
+                        List.of()))
+            .collect(Collectors.toList());
+
+    File warehouse = Files.createTempFile("perf-bench", null).toFile();
+    CatalogLoader catalogLoader =
+        CatalogLoader.hadoop(
+            "hadoop",
+            new Configuration(),
+            ImmutableMap.of(CatalogProperties.WAREHOUSE_LOCATION, warehouse.getPath()));
+    this.type = new DynamicRecordInternalType(catalogLoader, true, 100);
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void testSerialize(Blackhole blackhole) throws IOException {
+    TypeSerializer<DynamicRecordInternal> serializer =
+        type.createSerializer((SerializerConfig) null);
+    DataOutputSerializer outputView = new DataOutputSerializer(1024);
+    for (int i = 0; i < SAMPLE_SIZE; ++i) {
+      serializer.serialize(rows.get(i), outputView);
+    }
+  }
+
+  @Benchmark
+  @Threads(1)
+  public void testSerializeAndDeserialize(Blackhole blackhole) throws IOException {
+    TypeSerializer<DynamicRecordInternal> serializer =
+        type.createSerializer((SerializerConfig) null);
+
+    DataOutputSerializer outputView = new DataOutputSerializer(1024);
+    for (int i = 0; i < SAMPLE_SIZE; ++i) {
+      serializer.serialize(rows.get(i), outputView);
+      serializer.deserialize(new DataInputDeserializer(outputView.getSharedBuffer()));
+    }
+  }
+}
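The benchmark measures the cost of a TypeSerializer round trip for DynamicRecordInternal. For context, the same Flink serialization pattern with a built-in serializer (a self-contained sketch, not part of this commit):

import java.io.IOException;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class SerializerRoundTripSketch {
  public static void main(String[] args) throws IOException {
    StringSerializer serializer = StringSerializer.INSTANCE;

    // Serialize into a growable output buffer, as the benchmark does.
    DataOutputSerializer outputView = new DataOutputSerializer(1024);
    serializer.serialize("hello", outputView);

    // Deserialize from the buffer shared with the output view.
    String copy = serializer.deserialize(new DataInputDeserializer(outputView.getSharedBuffer()));
    System.out.println(copy); // prints: hello
  }
}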

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkConfParser.java

Lines changed: 7 additions & 0 deletions

@@ -27,6 +27,7 @@
 import org.apache.flink.util.TimeUtils;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
 class FlinkConfParser {
@@ -41,6 +42,12 @@ class FlinkConfParser {
     this.readableConfig = readableConfig;
   }
 
+  FlinkConfParser(Map<String, String> options, ReadableConfig readableConfig) {
+    this.tableProperties = ImmutableMap.of();
+    this.options = options;
+    this.readableConfig = readableConfig;
+  }
+
   public BooleanConfParser booleanConf() {
     return new BooleanConfParser();
   }

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/FlinkWriteConf.java

Lines changed: 4 additions & 0 deletions

@@ -55,6 +55,10 @@ public FlinkWriteConf(
     this.confParser = new FlinkConfParser(table, writeOptions, readableConfig);
   }
 
+  public FlinkWriteConf(Map<String, String> writeOptions, ReadableConfig readableConfig) {
+    this.confParser = new FlinkConfParser(writeOptions, readableConfig);
+  }
+
   public boolean overwriteMode() {
     return confParser
         .booleanConf()
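With the new constructor, a FlinkWriteConf can be resolved without a Table, from per-sink write options plus the job's ReadableConfig alone; that is what a dynamic sink needs, since the target tables are not known when the job graph is built. A hypothetical usage sketch (option key and values illustrative, not from this commit):

import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.iceberg.flink.FlinkWriteConf;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

Map<String, String> writeOptions = ImmutableMap.of("overwrite-enabled", "true");
FlinkWriteConf writeConf = new FlinkWriteConf(writeOptions, new Configuration());
boolean overwrite = writeConf.overwriteMode(); // table properties fall back to empty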

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/BucketPartitionerUtil.java

Lines changed: 1 addition & 1 deletion

@@ -26,7 +26,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.transforms.PartitionSpecVisitor;
 
-final class BucketPartitionerUtil {
+public final class BucketPartitionerUtil {
   static final String BAD_NUMBER_OF_BUCKETS_ERROR_MESSAGE =
       "Invalid number of buckets: %s (must be 1)";

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java

Lines changed: 28 additions & 22 deletions

@@ -19,13 +19,14 @@
 package org.apache.iceberg.flink.sink;
 
 import java.util.Arrays;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.iceberg.io.WriteResult;
 import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
 import org.apache.iceberg.util.ScanTaskUtil;
 
-class CommitSummary {
+public class CommitSummary {
 
   private final AtomicLong dataFilesCount = new AtomicLong();
   private final AtomicLong dataFilesRecordCount = new AtomicLong();
@@ -34,30 +35,35 @@ class CommitSummary {
   private final AtomicLong deleteFilesRecordCount = new AtomicLong();
   private final AtomicLong deleteFilesByteCount = new AtomicLong();
 
-  CommitSummary(NavigableMap<Long, WriteResult> pendingResults) {
-    pendingResults
-        .values()
+  public CommitSummary() {}
+
+  public CommitSummary(NavigableMap<Long, WriteResult> pendingResults) {
+    pendingResults.values().forEach(this::addWriteResult);
+  }
+
+  public void addAll(NavigableMap<Long, List<WriteResult>> pendingResults) {
+    pendingResults.values().forEach(writeResults -> writeResults.forEach(this::addWriteResult));
+  }
+
+  private void addWriteResult(WriteResult writeResult) {
+    dataFilesCount.addAndGet(writeResult.dataFiles().length);
+    Arrays.stream(writeResult.dataFiles())
+        .forEach(
+            dataFile -> {
+              dataFilesRecordCount.addAndGet(dataFile.recordCount());
+              dataFilesByteCount.addAndGet(dataFile.fileSizeInBytes());
+            });
+    deleteFilesCount.addAndGet(writeResult.deleteFiles().length);
+    Arrays.stream(writeResult.deleteFiles())
         .forEach(
-            writeResult -> {
-              dataFilesCount.addAndGet(writeResult.dataFiles().length);
-              Arrays.stream(writeResult.dataFiles())
-                  .forEach(
-                      dataFile -> {
-                        dataFilesRecordCount.addAndGet(dataFile.recordCount());
-                        dataFilesByteCount.addAndGet(dataFile.fileSizeInBytes());
-                      });
-              deleteFilesCount.addAndGet(writeResult.deleteFiles().length);
-              Arrays.stream(writeResult.deleteFiles())
-                  .forEach(
-                      deleteFile -> {
-                        deleteFilesRecordCount.addAndGet(deleteFile.recordCount());
-                        long deleteBytes = ScanTaskUtil.contentSizeInBytes(deleteFile);
-                        deleteFilesByteCount.addAndGet(deleteBytes);
-                      });
+            deleteFile -> {
+              deleteFilesRecordCount.addAndGet(deleteFile.recordCount());
+              long deleteBytes = ScanTaskUtil.contentSizeInBytes(deleteFile);
+              deleteFilesByteCount.addAndGet(deleteBytes);
             });
   }
 
-  long dataFilesCount() {
+  public long dataFilesCount() {
     return dataFilesCount.get();
   }
@@ -69,7 +75,7 @@ long dataFilesByteCount() {
     return dataFilesByteCount.get();
   }
 
-  long deleteFilesCount() {
+  public long deleteFilesCount() {
     return deleteFilesCount.get();
   }
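The refactor splits the old constructor's loop into a reusable addWriteResult, so a summary can now also be built incrementally: the new no-arg constructor plus addAll aggregate checkpoints that each carry a list of WriteResults, as the dynamic sink produces (one per target table). A hypothetical usage sketch (not from this commit):

import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.io.WriteResult;

NavigableMap<Long, List<WriteResult>> pendingResults = new TreeMap<>();
pendingResults.put(1L, List.of(WriteResult.builder().build()));

CommitSummary summary = new CommitSummary();
summary.addAll(pendingResults);
long committedDataFiles = summary.dataFilesCount(); // 0 for the empty WriteResult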

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java

Lines changed: 2 additions & 2 deletions

@@ -23,7 +23,7 @@
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 
-class DeltaManifests {
+public class DeltaManifests {
 
   private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0];
 
@@ -56,7 +56,7 @@ CharSequence[] referencedDataFiles() {
     return referencedDataFiles;
   }
 
-  List<ManifestFile> manifests() {
+  public List<ManifestFile> manifests() {
     List<ManifestFile> manifests = Lists.newArrayListWithCapacity(2);
     if (dataManifest != null) {
       manifests.add(dataManifest);

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java

Lines changed: 2 additions & 2 deletions

@@ -28,12 +28,12 @@
 import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
-class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
+public class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
   private static final int VERSION_1 = 1;
   private static final int VERSION_2 = 2;
   private static final byte[] EMPTY_BINARY = new byte[0];
 
-  static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer();
+  public static final DeltaManifestsSerializer INSTANCE = new DeltaManifestsSerializer();
 
   @Override
   public int getVersion() {
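Making INSTANCE public lets code outside the package reuse the same versioned wire format. The SimpleVersionedSerializer round trip looks roughly like this (a sketch; the deltaManifests variable is assumed to exist):

byte[] bytes = DeltaManifestsSerializer.INSTANCE.serialize(deltaManifests);
DeltaManifests restored =
    DeltaManifestsSerializer.INSTANCE.deserialize(
        DeltaManifestsSerializer.INSTANCE.getVersion(), bytes);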

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/EqualityFieldKeySelector.java

Lines changed: 5 additions & 5 deletions

@@ -19,7 +19,6 @@
 package org.apache.iceberg.flink.sink;
 
 import java.util.List;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.Schema;
@@ -30,10 +29,10 @@
 import org.apache.iceberg.util.StructProjection;
 
 /**
- * Create a {@link KeySelector} to shuffle by equality fields, to ensure same equality fields record
- * will be emitted to same writer in order.
+ * Create a {@link NonThrowingKeySelector} to shuffle by equality fields, to ensure same equality
+ * fields record will be emitted to same writer in order.
  */
-class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
+public class EqualityFieldKeySelector implements NonThrowingKeySelector<RowData, Integer> {
 
   private final Schema schema;
   private final RowType flinkSchema;
@@ -43,7 +42,8 @@ class EqualityFieldKeySelector implements KeySelector<RowData, Integer> {
   private transient StructProjection structProjection;
   private transient StructLikeWrapper structLikeWrapper;
 
-  EqualityFieldKeySelector(Schema schema, RowType flinkSchema, List<Integer> equalityFieldIds) {
+  public EqualityFieldKeySelector(
+      Schema schema, RowType flinkSchema, List<Integer> equalityFieldIds) {
     this.schema = schema;
     this.flinkSchema = flinkSchema;
    this.deleteSchema = TypeUtil.select(schema, Sets.newHashSet(equalityFieldIds));
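NonThrowingKeySelector is introduced elsewhere in this commit; presumably it narrows KeySelector#getKey so that it no longer declares a checked exception, roughly:

import org.apache.flink.api.java.functions.KeySelector;

// Assumed shape (sketch): callers can invoke getKey without try/catch, which
// matters when one key selector delegates to another outside Flink's runtime.
public interface NonThrowingKeySelector<I, K> extends KeySelector<I, K> {
  @Override
  K getKey(I value);
}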

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java

Lines changed: 5 additions & 5 deletions

@@ -38,7 +38,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class FlinkManifestUtil {
+public class FlinkManifestUtil {
 
   private static final Logger LOG = LoggerFactory.getLogger(FlinkManifestUtil.class);
   private static final int FORMAT_V2 = 2;
@@ -66,7 +66,7 @@ static List<DataFile> readDataFiles(
     }
   }
 
-  static ManifestOutputFileFactory createOutputFileFactory(
+  public static ManifestOutputFileFactory createOutputFileFactory(
       Supplier<Table> tableSupplier,
       Map<String, String> tableProps,
       String flinkJobId,
@@ -83,7 +83,7 @@ static ManifestOutputFileFactory createOutputFileFactory(
    * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
    *     partition spec
    */
-  static DeltaManifests writeCompletedFiles(
+  public static DeltaManifests writeCompletedFiles(
       WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
       throws IOException {
 
@@ -114,7 +114,7 @@ static DeltaManifests writeCompletedFiles(
     return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
   }
 
-  static WriteResult readCompletedFiles(
+  public static WriteResult readCompletedFiles(
       DeltaManifests deltaManifests, FileIO io, Map<Integer, PartitionSpec> specsById)
       throws IOException {
     WriteResult.Builder builder = WriteResult.builder();
@@ -135,7 +135,7 @@ static WriteResult readCompletedFiles(
     return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build();
   }
 
-  static void deleteCommittedManifests(
+  public static void deleteCommittedManifests(
       Table table, List<ManifestFile> manifests, String newFlinkJobId, long checkpointId) {
     for (ManifestFile manifest : manifests) {
       try {
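These helpers, now public, form the manifest round trip used at checkpoint and commit time: writeCompletedFiles persists a WriteResult as data/delete manifests, and readCompletedFiles restores it. A sketch (writeResult, outputFileFactory, checkpointId, and table are assumed to exist; the create call is hypothetical):

DeltaManifests deltaManifests =
    FlinkManifestUtil.writeCompletedFiles(
        writeResult, () -> outputFileFactory.create(checkpointId), table.spec());
WriteResult restored =
    FlinkManifestUtil.readCompletedFiles(deltaManifests, table.io(), table.specs());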

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittable.java

Lines changed: 1 addition & 1 deletion

@@ -31,7 +31,7 @@
  * <p>{@link IcebergCommittableSerializer} is used for serializing the objects between the Writer
  * and the Aggregator operator and between the Aggregator and the Committer as well.
  */
-class IcebergCommittable implements Serializable {
+public class IcebergCommittable implements Serializable {
   private final byte[] manifest;
   private final String jobId;
   private final String operatorId;

flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommittableSerializer.java

Lines changed: 1 addition & 1 deletion

@@ -30,7 +30,7 @@
  *
  * <p>In both cases only the respective part is serialized.
  */
-class IcebergCommittableSerializer implements SimpleVersionedSerializer<IcebergCommittable> {
+public class IcebergCommittableSerializer implements SimpleVersionedSerializer<IcebergCommittable> {
   private static final int VERSION = 1;
 
   @Override
