From ec7d036174a65ac96934dfde30a883e1aa7145ed Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Wed, 7 May 2025 11:10:34 +0200 Subject: [PATCH] Flink: Add DynamicRecord / DynamicRecordInternal / DynamicRecordInternalSerializer This adds the user-facing type DynamicRecord, along with its internal representation DynamicRecordInternal and its type information and serializer. Broken out of github.com/apache/iceberg/pull/12424. --- flink/v2.0/build.gradle | 3 + .../flink/sink/dynamic/DynamicRecord.java | 130 ++++++++ .../sink/dynamic/DynamicRecordInternal.java | 166 ++++++++++ .../DynamicRecordInternalSerializer.java | 296 ++++++++++++++++++ .../dynamic/DynamicRecordInternalType.java | 96 ++++++ .../sink/dynamic/TableSerializerCache.java | 135 ++++++++ ...namicRecordInternalSerializerTestBase.java | 96 ++++++ ...icRecordInternalSerializerWriteSchema.java | 28 ++ ...RecordInternalSerializerWriteSchemaId.java | 28 ++ .../dynamic/TestTableSerializerCache.java | 124 ++++++++ 10 files changed, 1102 insertions(+) create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalType.java create mode 100644 flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaId.java create mode 100644 flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java diff --git a/flink/v2.0/build.gradle b/flink/v2.0/build.gradle index 05d9fb44023f..fbe206f1b3ca 100644 --- a/flink/v2.0/build.gradle +++ b/flink/v2.0/build.gradle @@ -68,6 +68,9 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") { implementation libs.datasketches + // for caching in DynamicSink + implementation libs.caffeine + testImplementation libs.flink20.connector.test.utils testImplementation libs.flink20.core testImplementation libs.flink20.runtime diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java new file mode 100644 index 000000000000..994f7a0865f6 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.List; +import javax.annotation.Nullable; +import org.apache.flink.table.data.RowData; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.TableIdentifier; + +/** A DynamicRecord contains RowData along with the Iceberg table metadata. */ +public class DynamicRecord { + + private TableIdentifier tableIdentifier; + private String branch; + private Schema schema; + private RowData rowData; + private PartitionSpec partitionSpec; + private DistributionMode distributionMode; + private int writeParallelism; + private boolean upsertMode; + @Nullable private List<String> equalityFields; + + public DynamicRecord( + TableIdentifier tableIdentifier, + String branch, + Schema schema, + RowData rowData, + PartitionSpec partitionSpec, + DistributionMode distributionMode, + int writeParallelism) { + this.tableIdentifier = tableIdentifier; + this.branch = branch; + this.schema = schema; + this.partitionSpec = partitionSpec; + this.rowData = rowData; + this.distributionMode = distributionMode; + this.writeParallelism = writeParallelism; + } + + public TableIdentifier tableIdentifier() { + return tableIdentifier; + } + + public void setTableIdentifier(TableIdentifier tableIdentifier) { + this.tableIdentifier = tableIdentifier; + } + + public String branch() { + return branch; + } + + public void setBranch(String branch) { + this.branch = branch; + } + + public Schema schema() { + return schema; + } + + public void setSchema(Schema schema) { + this.schema = schema; + } + + public PartitionSpec spec() { + return partitionSpec; + } + + public void setPartitionSpec(PartitionSpec partitionSpec) { + this.partitionSpec = partitionSpec; + } + + public RowData rowData() { + return rowData; + } + + public void setRowData(RowData rowData) { + this.rowData = rowData; + } + + public DistributionMode distributionMode() { + return distributionMode; + } + + public void setDistributionMode(DistributionMode distributionMode) { + this.distributionMode = distributionMode; + } + + public int writeParallelism() { + return writeParallelism; + } + + public void setWriteParallelism(int parallelism) { + this.writeParallelism = parallelism; + } + + public boolean upsertMode() { + return upsertMode; + } + + public void setUpsertMode(boolean upsertMode) { + this.upsertMode = upsertMode; + } + + public List<String> equalityFields() { + return equalityFields; + } + + public void setEqualityFields(List<String> equalityFields) { + this.equalityFields = equalityFields; + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java new file mode 100644 index 000000000000..958a1e853983 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternal.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.util.List; +import java.util.Objects; +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.flink.FlinkSchemaUtil; + +@Internal +class DynamicRecordInternal { + + private String tableName; + private String branch; + private Schema schema; + private PartitionSpec spec; + private int writerKey; + private RowData rowData; + private boolean upsertMode; + private List<Integer> equalityFieldIds; + + // Required for serialization instantiation + DynamicRecordInternal() {} + + DynamicRecordInternal( + String tableName, + String branch, + Schema schema, + RowData rowData, + PartitionSpec spec, + int writerKey, + boolean upsertMode, + List<Integer> equalityFieldIds) { + this.tableName = tableName; + this.branch = branch; + this.schema = schema; + this.spec = spec; + this.writerKey = writerKey; + this.rowData = rowData; + this.upsertMode = upsertMode; + this.equalityFieldIds = equalityFieldIds; + } + + public String tableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public String branch() { + return branch; + } + + public void setBranch(String branch) { + this.branch = branch; + } + + public Schema schema() { + return schema; + } + + public void setSchema(Schema schema) { + this.schema = schema; + } + + public RowData rowData() { + return rowData; + } + + public void setRowData(RowData rowData) { + this.rowData = rowData; + } + + public PartitionSpec spec() { + return spec; + } + + public void setSpec(PartitionSpec spec) { + this.spec = spec; + } + + public int writerKey() { + return writerKey; + } + + public void setWriterKey(int writerKey) { + this.writerKey = writerKey; + } + + public boolean upsertMode() { + return upsertMode; + } + + public void setUpsertMode(boolean upsertMode) { + this.upsertMode = upsertMode; + } + + public List<Integer> equalityFields() { + return equalityFieldIds; + } + + public void setEqualityFieldIds(List<Integer> equalityFieldIds) { + this.equalityFieldIds = equalityFieldIds; + } + + @Override + public int hashCode() { + return Objects.hash( + tableName, branch, schema, spec, writerKey, rowData, upsertMode, equalityFieldIds); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + DynamicRecordInternal that = (DynamicRecordInternal) other; + boolean tableFieldsMatch = + Objects.equals(tableName, that.tableName) + && Objects.equals(branch, that.branch) + && schema.schemaId() == that.schema.schemaId() + && Objects.equals(spec, that.spec) + &&
writerKey == that.writerKey + && upsertMode == that.upsertMode + && Objects.equals(equalityFieldIds, that.equalityFieldIds); + if (!tableFieldsMatch) { + return false; + } + + if (rowData.getClass().equals(that.rowData.getClass())) { + return Objects.equals(rowData, that.rowData); + } else { + RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema)); + return rowDataSerializer + .toBinaryRow(rowData) + .equals(rowDataSerializer.toBinaryRow(that.rowData)); + } + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java new file mode 100644 index 000000000000..a6f38f7d6bc9 --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializer.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.PartitionSpecParser; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +@Internal +class DynamicRecordInternalSerializer extends TypeSerializer<DynamicRecordInternal> { + + private static final long serialVersionUID = 1L; + + private final TableSerializerCache serializerCache; + private final boolean writeSchemaAndSpec; + + DynamicRecordInternalSerializer( + TableSerializerCache serializerCache, boolean writeSchemaAndSpec) { + this.serializerCache = serializerCache; + this.writeSchemaAndSpec = writeSchemaAndSpec; + } + + @Override + public TypeSerializer<DynamicRecordInternal> duplicate() { + return new DynamicRecordInternalSerializer( + new TableSerializerCache(serializerCache.catalogLoader(), serializerCache.maximumSize()), + writeSchemaAndSpec); + } + + @Override + public DynamicRecordInternal createInstance() { + return new DynamicRecordInternal(); + } + + @Override + public void serialize(DynamicRecordInternal toSerialize, DataOutputView dataOutputView) + throws IOException {
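+ // Wire format: table name, branch, schema and spec (full JSON or just their ids, depending + // on writeSchemaAndSpec), writer key, row data, upsert flag, and the equality field ids.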
+ dataOutputView.writeUTF(toSerialize.tableName()); + dataOutputView.writeUTF(toSerialize.branch()); + if (writeSchemaAndSpec) { + dataOutputView.writeUTF(SchemaParser.toJson(toSerialize.schema())); + dataOutputView.writeUTF(PartitionSpecParser.toJson(toSerialize.spec())); + } else { + dataOutputView.writeInt(toSerialize.schema().schemaId()); + dataOutputView.writeInt(toSerialize.spec().specId()); + } + + dataOutputView.writeInt(toSerialize.writerKey()); + final RowDataSerializer rowDataSerializer; + if (writeSchemaAndSpec) { + rowDataSerializer = + serializerCache.serializer( + toSerialize.tableName(), toSerialize.schema(), toSerialize.spec()); + } else { + // Check that the schema id can be resolved. Not strictly necessary for serialization. + Tuple3<RowDataSerializer, Schema, PartitionSpec> serializer = + serializerCache.serializerWithSchemaAndSpec( + toSerialize.tableName(), + toSerialize.schema().schemaId(), + toSerialize.spec().specId()); + rowDataSerializer = serializer.f0; + } + + rowDataSerializer.serialize(toSerialize.rowData(), dataOutputView); + dataOutputView.writeBoolean(toSerialize.upsertMode()); + dataOutputView.writeInt(toSerialize.equalityFields().size()); + for (Integer equalityField : toSerialize.equalityFields()) { + dataOutputView.writeInt(equalityField); + } + } + + @Override + public DynamicRecordInternal deserialize(DataInputView dataInputView) throws IOException { + String tableName = dataInputView.readUTF(); + String branch = dataInputView.readUTF(); + + final Schema schema; + final PartitionSpec spec; + final RowDataSerializer rowDataSerializer; + if (writeSchemaAndSpec) { + schema = SchemaParser.fromJson(dataInputView.readUTF()); + spec = PartitionSpecParser.fromJson(schema, dataInputView.readUTF()); + rowDataSerializer = serializerCache.serializer(tableName, schema, spec); + } else { + Integer schemaId = dataInputView.readInt(); + Integer specId = dataInputView.readInt(); + Tuple3<RowDataSerializer, Schema, PartitionSpec> serializerWithSchemaAndSpec = + serializerCache.serializerWithSchemaAndSpec(tableName, schemaId, specId); + schema = serializerWithSchemaAndSpec.f1; + spec = serializerWithSchemaAndSpec.f2; + rowDataSerializer = serializerWithSchemaAndSpec.f0; + } + + int writerKey = dataInputView.readInt(); + RowData rowData = rowDataSerializer.deserialize(dataInputView); + boolean upsertMode = dataInputView.readBoolean(); + int numEqualityFields = dataInputView.readInt(); + final List<Integer> equalityFieldIds; + if (numEqualityFields > 0) { + equalityFieldIds = Lists.newArrayList(); + } else { + equalityFieldIds = Collections.emptyList(); + } + + for (int i = 0; i < numEqualityFields; i++) { + equalityFieldIds.add(dataInputView.readInt()); + } + + return new DynamicRecordInternal( + tableName, branch, schema, rowData, spec, writerKey, upsertMode, equalityFieldIds); + } + + @Override + public DynamicRecordInternal deserialize(DynamicRecordInternal reuse, DataInputView dataInputView) + throws IOException { + String tableName = dataInputView.readUTF(); + reuse.setTableName(tableName); + String branch = dataInputView.readUTF(); + reuse.setBranch(branch); + + final Schema schema; + final PartitionSpec spec; + final RowDataSerializer rowDataSerializer; + if (writeSchemaAndSpec) { + schema = SchemaParser.fromJson(dataInputView.readUTF()); + spec = PartitionSpecParser.fromJson(schema, dataInputView.readUTF()); + reuse.setSchema(schema); + reuse.setSpec(spec); + rowDataSerializer = serializerCache.serializer(tableName, schema, spec); + } else { + Integer schemaId = dataInputView.readInt(); + Integer specId = dataInputView.readInt(); + Tuple3<RowDataSerializer, Schema, PartitionSpec>
serializerWithSchemaAndSpec = + serializerCache.serializerWithSchemaAndSpec(tableName, schemaId, specId); + schema = serializerWithSchemaAndSpec.f1; + spec = serializerWithSchemaAndSpec.f2; + rowDataSerializer = serializerWithSchemaAndSpec.f0; + } + + int writerKey = dataInputView.readInt(); + reuse.setWriterKey(writerKey); + RowData rowData = rowDataSerializer.deserialize(dataInputView); + boolean upsertMode = dataInputView.readBoolean(); + int numEqualityFields = dataInputView.readInt(); + final List<Integer> equalityFieldIds; + if (numEqualityFields > 0) { + equalityFieldIds = Lists.newArrayList(); + } else { + equalityFieldIds = Collections.emptyList(); + } + for (int i = 0; i < numEqualityFields; i++) { + equalityFieldIds.add(dataInputView.readInt()); + } + return new DynamicRecordInternal( + tableName, branch, schema, rowData, spec, writerKey, upsertMode, equalityFieldIds); + } + + @Override + public DynamicRecordInternal copy(DynamicRecordInternal from) { + return new DynamicRecordInternal( + from.tableName(), + from.branch(), + from.schema(), + from.rowData(), + from.spec(), + from.writerKey(), + from.upsertMode(), + from.equalityFields()); + } + + @Override + public DynamicRecordInternal copy(DynamicRecordInternal from, DynamicRecordInternal reuse) { + reuse.setTableName(from.tableName()); + reuse.setBranch(from.branch()); + reuse.setSchema(from.schema()); + reuse.setSpec(from.spec()); + reuse.setWriterKey(from.writerKey()); + reuse.setRowData(from.rowData()); + reuse.setUpsertMode(from.upsertMode()); + reuse.setEqualityFieldIds(from.equalityFields()); + return reuse; + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + serialize(deserialize(source), target); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj instanceof DynamicRecordInternalSerializer) { + DynamicRecordInternalSerializer other = (DynamicRecordInternalSerializer) obj; + return writeSchemaAndSpec == other.writeSchemaAndSpec; + } + return false; + } + + @Override + public int hashCode() { + return Objects.hashCode(writeSchemaAndSpec); + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public int getLength() { + return -1; + } + + @Override + public TypeSerializerSnapshot<DynamicRecordInternal> snapshotConfiguration() { + return new DynamicRecordInternalTypeSerializerSnapshot(writeSchemaAndSpec); + } + + public static class DynamicRecordInternalTypeSerializerSnapshot + implements TypeSerializerSnapshot<DynamicRecordInternal> { + + private boolean writeSchemaAndSpec; + + // Zero args constructor is required to instantiate this class on restore + @SuppressWarnings({"unused", "checkstyle:RedundantModifier"}) + public DynamicRecordInternalTypeSerializerSnapshot() {} + + DynamicRecordInternalTypeSerializerSnapshot(boolean writeSchemaAndSpec) { + this.writeSchemaAndSpec = writeSchemaAndSpec; + } + + @Override + public int getCurrentVersion() { + return 0; + } + + @Override + public void writeSnapshot(DataOutputView out) throws IOException { + out.writeBoolean(writeSchemaAndSpec); + } + + @Override + public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) + throws IOException { + this.writeSchemaAndSpec = in.readBoolean(); + } + + @Override + public TypeSerializerSchemaCompatibility<DynamicRecordInternal> resolveSchemaCompatibility( + TypeSerializerSnapshot<DynamicRecordInternal> oldSerializerSnapshot) { + return TypeSerializerSchemaCompatibility.compatibleAsIs(); + } + + @Override + public TypeSerializer<DynamicRecordInternal> restoreSerializer() { + // Note: We
pass in a null serializer cache which would create issues if we tried to use this + // restored serializer, but since we are using {@code + // TypeSerializerSchemaCompatibility.compatibleAsIs()} above, this serializer will never be + // used. A new one will be created via {@code DynamicRecordInternalType}. + return new DynamicRecordInternalSerializer(null, writeSchemaAndSpec); + } + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalType.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalType.java new file mode 100644 index 000000000000..c18c8f670daf --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalType.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.SerializerConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.iceberg.flink.CatalogLoader; + +@Internal +class DynamicRecordInternalType extends TypeInformation<DynamicRecordInternal> { + + private final CatalogLoader catalogLoader; + private final boolean writeSchemaAndSpec; + private final int cacheSize; + + DynamicRecordInternalType( + CatalogLoader catalogLoader, boolean writeSchemaAndSpec, int cacheSize) { + this.catalogLoader = catalogLoader; + this.writeSchemaAndSpec = writeSchemaAndSpec; + this.cacheSize = cacheSize; + } + + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 0; + } + + @Override + public int getTotalFields() { + return 1; + } + + @Override + public Class<DynamicRecordInternal> getTypeClass() { + return DynamicRecordInternal.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer<DynamicRecordInternal> createSerializer(SerializerConfig serializerConfig) { + return new DynamicRecordInternalSerializer( + new TableSerializerCache(catalogLoader, cacheSize), writeSchemaAndSpec); + } + + @Override + public String toString() { + return getClass().getName(); + } + + @Override + public boolean equals(Object o) { + return canEqual(o); + } + + @Override + public int hashCode() { + return getClass().getName().hashCode(); + } + + @Override + public boolean canEqual(Object o) { + return o instanceof DynamicRecordInternalType; + } +} diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java new
file mode 100644 index 000000000000..6acf873a94dc --- /dev/null +++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/TableSerializerCache.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import java.io.Serializable; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +/** + * A Cache which holds Flink's {@link RowDataSerializer} for a given table name and schema. This + * avoids re-creating the serializer for a given table schema for every incoming record. + * + *

<p>There is an additional optimization built into this class: Users do not have to supply the + * full schema / spec, but can also provide their id. This avoids transferring the schema / spec for + * every record. If the id is unknown, the schema / spec will be retrieved from the catalog. + * + *

<p>Note that the caller must ensure that ids are only used for known schemas / specs. The id + * optimization must not be used in the update path. + */ @Internal class TableSerializerCache implements Serializable { + + private final CatalogLoader catalogLoader; + private final int maximumSize; + private transient Cache<String, SerializerInfo> serializers; + + TableSerializerCache(CatalogLoader catalogLoader, int maximumSize) { + this.catalogLoader = catalogLoader; + this.maximumSize = maximumSize; + } + + RowDataSerializer serializer(String tableName, Schema schema, PartitionSpec spec) { + return serializer(tableName, schema, spec, null, null).f0; + } + + Tuple3<RowDataSerializer, Schema, PartitionSpec> serializerWithSchemaAndSpec( + String tableName, Integer schemaId, Integer specId) { + return serializer(tableName, null, null, schemaId, specId); + } + + private Tuple3<RowDataSerializer, Schema, PartitionSpec> serializer( + String tableName, + @Nullable Schema unknownSchema, + @Nullable PartitionSpec unknownSpec, + @Nullable Integer schemaId, + @Nullable Integer specId) { + Preconditions.checkState( + (unknownSchema == null && unknownSpec == null) ^ (schemaId == null && specId == null), + "Either the full schema/spec or their ids must be provided."); + + if (serializers == null) { + // Lazily initialize the cache on first use; the field is transient + this.serializers = Caffeine.newBuilder().maximumSize(maximumSize).build(); + } + + SerializerInfo info = serializers.get(tableName, SerializerInfo::new); + Schema schema = unknownSchema != null ? unknownSchema : info.schemas.get(schemaId); + PartitionSpec spec = unknownSpec != null ? unknownSpec : info.specs.get(specId); + + if (schema == null || spec == null) { + info.update(); + schema = info.schemas.get(schemaId); + spec = info.specs.get(specId); + } + + RowDataSerializer serializer = + info.serializers.computeIfAbsent( + schema, s -> new RowDataSerializer(FlinkSchemaUtil.convert(s))); + + return Tuple3.of(serializer, schema, spec); + } + + CatalogLoader catalogLoader() { + return catalogLoader; + } + + int maximumSize() { + return maximumSize; + } + + private class SerializerInfo { + private final String tableName; + private final Map<Schema, RowDataSerializer> serializers; + private Map<Integer, Schema> schemas; + private Map<Integer, PartitionSpec> specs; + + SerializerInfo(String tableName) { + this.tableName = tableName; + this.serializers = Maps.newHashMapWithExpectedSize(2); + this.schemas = Maps.newHashMapWithExpectedSize(1); + this.specs = Maps.newHashMapWithExpectedSize(0); + } + + private void update() { + Table table = catalogLoader.loadCatalog().loadTable(TableIdentifier.parse(tableName)); + schemas = table.schemas(); + specs = table.specs(); + } + } + + @VisibleForTesting + Cache<String, SerializerInfo> getCache() { + return serializers; + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java new file mode 100644 index 000000000000..243b7f90959b --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecordInternalSerializerTestBase.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.types.Types.NestedField.required; + +import java.util.Collections; +import org.apache.flink.api.common.typeutils.SerializerTestBase; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.StringData; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; + +/** + * Test base for DynamicRecordInternalSerializer which allows instantiating different serializer + * versions, e.g. writing the full schema or just the schema id. + */ +abstract class DynamicRecordInternalSerializerTestBase + extends SerializerTestBase<DynamicRecordInternal> { + + static final String TABLE = "myTable"; + static final String BRANCH = "myBranch"; + + @RegisterExtension + static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", TABLE); + + static final Schema SCHEMA = + new Schema( + required(1, "id", Types.LongType.get()), + required(2, "data", Types.StringType.get()), + required(3, "number", Types.FloatType.get())); + + static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA).bucket("id", 10).build(); + + private final boolean writeFullSchemaAndSpec; + + DynamicRecordInternalSerializerTestBase(boolean writeFullSchemaAndSpec) { + this.writeFullSchemaAndSpec = writeFullSchemaAndSpec; + } + + @Override + protected TypeSerializer<DynamicRecordInternal> createSerializer() { + return new DynamicRecordInternalSerializer( + new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1), writeFullSchemaAndSpec); + } + + @BeforeEach + void before() { + CATALOG_EXTENSION.catalog().createTable(TableIdentifier.parse(TABLE), SCHEMA, SPEC); + } + + @Override + protected DynamicRecordInternal[] getTestData() { + GenericRowData rowData = new GenericRowData(3); + rowData.setField(0, 123L); + rowData.setField(1, StringData.fromString("test")); + rowData.setField(2, 1.23f); + + return new DynamicRecordInternal[] { + new DynamicRecordInternal( + TABLE, BRANCH, SCHEMA, rowData, SPEC, 42, false, Collections.emptyList()) + }; + } + + @Override + protected Class<DynamicRecordInternal> getTypeClass() { + return DynamicRecordInternal.class; + } + + @Override + protected int getLength() { + return -1; + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java new file mode 100644 index 000000000000..ab8ce98c3594 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchema.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +/** Test writing DynamicRecord with the full schema. */ +class TestDynamicRecordInternalSerializerWriteSchema + extends DynamicRecordInternalSerializerTestBase { + + TestDynamicRecordInternalSerializerWriteSchema() { + super(true); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaId.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaId.java new file mode 100644 index 000000000000..1d8890546214 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestDynamicRecordInternalSerializerWriteSchemaId.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +/** Test writing DynamicRecord with only the schema id. */ +class TestDynamicRecordInternalSerializerWriteSchemaId + extends DynamicRecordInternalSerializerTestBase { + + TestDynamicRecordInternalSerializerWriteSchemaId() { + super(false); + } +} diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java new file mode 100644 index 000000000000..720a0a582e94 --- /dev/null +++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestTableSerializerCache.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.sink.dynamic; + +import static org.apache.iceberg.types.Types.DoubleType; +import static org.apache.iceberg.types.Types.LongType; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.apache.iceberg.types.Types.StringType; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.function.Supplier; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.table.runtime.typeutils.RowDataSerializer; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.flink.CatalogLoader; +import org.apache.iceberg.flink.HadoopCatalogExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class TestTableSerializerCache { + + @RegisterExtension + static final HadoopCatalogExtension CATALOG_EXTENSION = new HadoopCatalogExtension("db", "table"); + + Schema schema1 = new Schema(23, required(1, "id", LongType.get())); + + Schema schema2 = + new Schema( + 42, + required(1, "id", LongType.get()), + optional(2, "data", StringType.get()), + optional(3, "double", DoubleType.get())); + + TableSerializerCache cache = new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 10); + + @Test + void testFullSchemaCaching() { + Supplier<RowDataSerializer> creator1a = + () -> cache.serializer("table", schema1, PartitionSpec.unpartitioned()); + Supplier<RowDataSerializer> creator1b = + () -> cache.serializer("table", schema2, PartitionSpec.unpartitioned()); + Supplier<RowDataSerializer> creator2 = + () -> cache.serializer("table2", schema2, PartitionSpec.unpartitioned()); + + RowDataSerializer serializer1a = creator1a.get(); + RowDataSerializer serializer1b = creator1b.get(); + RowDataSerializer serializer2 = creator2.get(); + assertThat(serializer1a).isNotSameAs(serializer1b).isNotSameAs(serializer2); + + assertThat(serializer1a).isSameAs(creator1a.get()); + assertThat(serializer1b).isSameAs(creator1b.get()); + assertThat(serializer2).isSameAs(creator2.get()); + } + + @Test + void testCachingWithSchemaLookup() { + CatalogLoader catalogLoader = CATALOG_EXTENSION.catalogLoader(); + cache = new TableSerializerCache(catalogLoader, 10); + + Catalog catalog = catalogLoader.loadCatalog(); + Table table = catalog.createTable(TableIdentifier.of("table"), schema1); + + Tuple3<RowDataSerializer, Schema, PartitionSpec> serializerWithSchemaAndSpec = + cache.serializerWithSchemaAndSpec( + "table", table.schema().schemaId(), PartitionSpec.unpartitioned().specId()); + assertThat(serializerWithSchemaAndSpec).isNotNull(); + assertThat(serializerWithSchemaAndSpec.f0).isNotNull(); + assertThat(serializerWithSchemaAndSpec.f1.sameSchema(table.schema())).isTrue(); + assertThat(serializerWithSchemaAndSpec.f2).isEqualTo(table.spec()); + + Tuple3<RowDataSerializer, Schema, PartitionSpec> serializerWithSchemaAndSpec2 = + cache.serializerWithSchemaAndSpec( + "table", table.schema().schemaId(), PartitionSpec.unpartitioned().specId()); +
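+ // The second lookup by schema/spec id must be served from the cache and return the exact + // same serializer, schema, and spec instances.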
assertThat(serializerWithSchemaAndSpec.f0).isSameAs(serializerWithSchemaAndSpec2.f0); + assertThat(serializerWithSchemaAndSpec.f1).isSameAs(serializerWithSchemaAndSpec2.f1); + assertThat(serializerWithSchemaAndSpec.f2).isSameAs(serializerWithSchemaAndSpec2.f2); + } + + @Test + void testCacheEviction() { + cache = new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 0); + assertThat(cache.maximumSize()).isEqualTo(0); + + Supplier<RowDataSerializer> creator1 = + () -> cache.serializer("table", schema1, PartitionSpec.unpartitioned()); + Supplier<RowDataSerializer> creator2 = + () -> cache.serializer("table2", schema2, PartitionSpec.unpartitioned()); + + RowDataSerializer serializer1 = creator1.get(); + RowDataSerializer serializer2 = creator2.get(); + + cache.getCache().cleanUp(); + assertThat(serializer1).isNotSameAs(creator1.get()); + assertThat(serializer2).isNotSameAs(creator2.get()); + } + + @Test + void testCacheSize() { + cache = new TableSerializerCache(CATALOG_EXTENSION.catalogLoader(), 1000); + assertThat(cache.maximumSize()).isEqualTo(1000); + } +}
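A minimal usage sketch (not part of the patch): how a caller might construct a DynamicRecord for the dynamic sink. The table identifier "db.events", the branch "main", and the field values are illustrative assumptions; the constructor order and setters follow DynamicRecord.java above.

import static org.apache.iceberg.types.Types.NestedField.required;

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.sink.dynamic.DynamicRecord;
import org.apache.iceberg.types.Types;

public class DynamicRecordSketch {

  public static DynamicRecord exampleRecord() {
    Schema schema =
        new Schema(
            required(1, "id", Types.LongType.get()),
            required(2, "data", Types.StringType.get()));

    GenericRowData row = new GenericRowData(2);
    row.setField(0, 1L);
    row.setField(1, StringData.fromString("value"));

    // Constructor order: identifier, branch, schema, row data, partition spec,
    // distribution mode, write parallelism. Upsert mode and equality fields use setters.
    DynamicRecord record =
        new DynamicRecord(
            TableIdentifier.parse("db.events"), // illustrative table name
            "main", // illustrative branch
            schema,
            row,
            PartitionSpec.unpartitioned(),
            DistributionMode.NONE,
            1);
    record.setUpsertMode(false);
    return record;
  }
}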