Skip to content

Commit ec7d036

Browse files
committed
Flink: Add DynamicRecord / DynamicRecordInternal / DynamicRecordInternalSerializer
This adds the user-facing type DynamicRecord, along with its internal representation DynamicRecordInternal and its type information and serializer. Broken out of github.com/apache/pull/12424.
1 parent 951ba96 commit ec7d036

10 files changed

+1102
-0
lines changed

flink/v2.0/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
6868

6969
implementation libs.datasketches
7070

71+
// for caching in DynamicSink
72+
implementation libs.caffeine
73+
7174
testImplementation libs.flink20.connector.test.utils
7275
testImplementation libs.flink20.core
7376
testImplementation libs.flink20.runtime
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.flink.sink.dynamic;
20+
21+
import java.util.List;
22+
import javax.annotation.Nullable;
23+
import org.apache.flink.table.data.RowData;
24+
import org.apache.iceberg.DistributionMode;
25+
import org.apache.iceberg.PartitionSpec;
26+
import org.apache.iceberg.Schema;
27+
import org.apache.iceberg.catalog.TableIdentifier;
28+
29+
/** A DynamicRecord contains RowData alongside with the Iceberg table metadata. */
30+
public class DynamicRecord {
31+
32+
private TableIdentifier tableIdentifier;
33+
private String branch;
34+
private Schema schema;
35+
private RowData rowData;
36+
private PartitionSpec partitionSpec;
37+
private DistributionMode distributionMode;
38+
private int writeParallelism;
39+
private boolean upsertMode;
40+
@Nullable private List<String> equalityFields;
41+
42+
public DynamicRecord(
43+
TableIdentifier tableIdentifier,
44+
String branch,
45+
Schema schema,
46+
RowData rowData,
47+
PartitionSpec partitionSpec,
48+
DistributionMode distributionMode,
49+
int writeParallelism) {
50+
this.tableIdentifier = tableIdentifier;
51+
this.branch = branch;
52+
this.schema = schema;
53+
this.partitionSpec = partitionSpec;
54+
this.rowData = rowData;
55+
this.distributionMode = distributionMode;
56+
this.writeParallelism = writeParallelism;
57+
}
58+
59+
public TableIdentifier tableIdentifier() {
60+
return tableIdentifier;
61+
}
62+
63+
public void setTableIdentifier(TableIdentifier tableIdentifier) {
64+
this.tableIdentifier = tableIdentifier;
65+
}
66+
67+
public String branch() {
68+
return branch;
69+
}
70+
71+
public void setBranch(String branch) {
72+
this.branch = branch;
73+
}
74+
75+
public Schema schema() {
76+
return schema;
77+
}
78+
79+
public void setSchema(Schema schema) {
80+
this.schema = schema;
81+
}
82+
83+
public PartitionSpec spec() {
84+
return partitionSpec;
85+
}
86+
87+
public void setPartitionSpec(PartitionSpec partitionSpec) {
88+
this.partitionSpec = partitionSpec;
89+
}
90+
91+
public RowData rowData() {
92+
return rowData;
93+
}
94+
95+
public void setRowData(RowData rowData) {
96+
this.rowData = rowData;
97+
}
98+
99+
public DistributionMode distributionMode() {
100+
return distributionMode;
101+
}
102+
103+
public void setDistributionMode(DistributionMode distributionMode) {
104+
this.distributionMode = distributionMode;
105+
}
106+
107+
public int writeParallelism() {
108+
return writeParallelism;
109+
}
110+
111+
public void writeParallelism(int parallelism) {
112+
this.writeParallelism = parallelism;
113+
}
114+
115+
public boolean upsertMode() {
116+
return upsertMode;
117+
}
118+
119+
public void setUpsertMode(boolean upsertMode) {
120+
this.upsertMode = upsertMode;
121+
}
122+
123+
public List<String> equalityFields() {
124+
return equalityFields;
125+
}
126+
127+
public void setEqualityFields(List<String> equalityFields) {
128+
this.equalityFields = equalityFields;
129+
}
130+
}
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.flink.sink.dynamic;
20+
21+
import java.util.List;
22+
import java.util.Objects;
23+
import org.apache.flink.annotation.Internal;
24+
import org.apache.flink.table.data.RowData;
25+
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
26+
import org.apache.iceberg.PartitionSpec;
27+
import org.apache.iceberg.Schema;
28+
import org.apache.iceberg.flink.FlinkSchemaUtil;
29+
30+
@Internal
31+
class DynamicRecordInternal {
32+
33+
private String tableName;
34+
private String branch;
35+
private Schema schema;
36+
private PartitionSpec spec;
37+
private int writerKey;
38+
private RowData rowData;
39+
private boolean upsertMode;
40+
private List<Integer> equalityFieldIds;
41+
42+
// Required for serialization instantiation
43+
DynamicRecordInternal() {}
44+
45+
DynamicRecordInternal(
46+
String tableName,
47+
String branch,
48+
Schema schema,
49+
RowData rowData,
50+
PartitionSpec spec,
51+
int writerKey,
52+
boolean upsertMode,
53+
List<Integer> equalityFieldsIds) {
54+
this.tableName = tableName;
55+
this.branch = branch;
56+
this.schema = schema;
57+
this.spec = spec;
58+
this.writerKey = writerKey;
59+
this.rowData = rowData;
60+
this.upsertMode = upsertMode;
61+
this.equalityFieldIds = equalityFieldsIds;
62+
}
63+
64+
public String tableName() {
65+
return tableName;
66+
}
67+
68+
public void setTableName(String tableName) {
69+
this.tableName = tableName;
70+
}
71+
72+
public String branch() {
73+
return branch;
74+
}
75+
76+
public void setBranch(String branch) {
77+
this.branch = branch;
78+
}
79+
80+
public Schema schema() {
81+
return schema;
82+
}
83+
84+
public void setSchema(Schema schema) {
85+
this.schema = schema;
86+
}
87+
88+
public RowData rowData() {
89+
return rowData;
90+
}
91+
92+
public void setRowData(RowData rowData) {
93+
this.rowData = rowData;
94+
}
95+
96+
public PartitionSpec spec() {
97+
return spec;
98+
}
99+
100+
public void setSpec(PartitionSpec spec) {
101+
this.spec = spec;
102+
}
103+
104+
public int writerKey() {
105+
return writerKey;
106+
}
107+
108+
public void setWriterKey(int writerKey) {
109+
this.writerKey = writerKey;
110+
}
111+
112+
public boolean upsertMode() {
113+
return upsertMode;
114+
}
115+
116+
public void setUpsertMode(boolean upsertMode) {
117+
this.upsertMode = upsertMode;
118+
}
119+
120+
public List<Integer> equalityFields() {
121+
return equalityFieldIds;
122+
}
123+
124+
public void setEqualityFieldIds(List<Integer> equalityFieldIds) {
125+
this.equalityFieldIds = equalityFieldIds;
126+
}
127+
128+
@Override
129+
public int hashCode() {
130+
return Objects.hash(
131+
tableName, branch, schema, spec, writerKey, rowData, upsertMode, equalityFieldIds);
132+
}
133+
134+
@Override
135+
public boolean equals(Object other) {
136+
if (this == other) {
137+
return true;
138+
}
139+
140+
if (other == null || getClass() != other.getClass()) {
141+
return false;
142+
}
143+
144+
DynamicRecordInternal that = (DynamicRecordInternal) other;
145+
boolean tableFieldsMatch =
146+
Objects.equals(tableName, that.tableName)
147+
&& Objects.equals(branch, that.branch)
148+
&& schema.schemaId() == that.schema.schemaId()
149+
&& Objects.equals(spec, that.spec)
150+
&& writerKey == that.writerKey
151+
&& upsertMode == that.upsertMode
152+
&& Objects.equals(equalityFieldIds, that.equalityFieldIds);
153+
if (!tableFieldsMatch) {
154+
return false;
155+
}
156+
157+
if (rowData.getClass().equals(that.rowData.getClass())) {
158+
return Objects.equals(rowData, that.rowData);
159+
} else {
160+
RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema));
161+
return rowDataSerializer
162+
.toBinaryRow(rowData)
163+
.equals(rowDataSerializer.toBinaryRow(that.rowData));
164+
}
165+
}
166+
}

0 commit comments

Comments
 (0)