Commit 8a539c8

Flink: Add table update code for schema comparison and evolution
This adds the classes around schema / spec comparison and evolution. A breakdown of the classes follows:

# CompareSchemasVisitor
Compares the user-provided schema against the current table schema.

# EvolveSchemaVisitor
Computes the changes required to make the table schema compatible with the user-provided schema.

# PartitionSpecEvolution
Code for checking compatibility with the user-provided PartitionSpec and computing a set of changes to rewrite the PartitionSpec.

# TableDataCache
Cache which holds all relevant metadata of a table, such as its name, branch, schema, and partition spec. Also holds a cache of past comparison results between a given table schema and the user-provided input schema.

# TableUpdater
Core logic to compare and create/update a table given a user-provided input schema.

Broken out of apache#12424, depends on apache#12996.
1 parent 1804b0a commit 8a539c8
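A minimal usage sketch (not part of this commit) of how CompareSchemasVisitor is intended to be used when deciding whether incoming data can be written to the current table. The schemas, field ids, and the example class name are illustrative assumptions; only CompareSchemasVisitor.visit and the Result values come from the code added here.

import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.sink.dynamic.CompareSchemasVisitor;
import org.apache.iceberg.types.Types;

public class CompareSchemasVisitorSketch {
  public static void main(String[] args) {
    // Current table schema: "id" is a long, "data" is an optional string.
    Schema tableSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    // Incoming record schema: "id" arrives as an int instead of a long.
    Schema dataSchema =
        new Schema(
            Types.NestedField.required(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    // int -> long is an allowed promotion, so the records can be written after
    // conversion without changing the table schema.
    CompareSchemasVisitor.Result result =
        CompareSchemasVisitor.visit(dataSchema, tableSchema);

    System.out.println(result); // CONVERSION_NEEDED
  }
}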

File tree

10 files changed: +2231 -0 lines changed

CompareSchemasVisitor.java: 254 additions & 0 deletions

@@ -0,0 +1,254 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.iceberg.flink.sink.dynamic;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.schema.SchemaWithPartnerVisitor;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

/** Visitor class which compares two schemas and decides whether they are compatible. */
public class CompareSchemasVisitor
    extends SchemaWithPartnerVisitor<Integer, CompareSchemasVisitor.Result> {

  private final Schema tableSchema;

  private CompareSchemasVisitor(Schema tableSchema) {
    this.tableSchema = tableSchema;
  }

  public static Result visit(Schema dataSchema, Schema tableSchema) {
    return visit(dataSchema, tableSchema, true);
  }

  public static Result visit(Schema dataSchema, Schema tableSchema, boolean caseSensitive) {
    return visit(
        dataSchema,
        -1,
        new CompareSchemasVisitor(tableSchema),
        new PartnerIdByNameAccessors(tableSchema, caseSensitive));
  }

  @Override
  public Result schema(Schema dataSchema, Integer tableSchemaId, Result downstream) {
    if (tableSchemaId == null) {
      return Result.INCOMPATIBLE;
    }

    return downstream;
  }

  @Override
  public Result struct(Types.StructType struct, Integer tableSchemaId, List<Result> fields) {
    if (tableSchemaId == null) {
      return Result.INCOMPATIBLE;
    }

    Result result = fields.stream().reduce(Result::merge).orElse(Result.INCOMPATIBLE);

    if (result == Result.INCOMPATIBLE) {
      return Result.INCOMPATIBLE;
    }

    Type tableSchemaType =
        tableSchemaId == -1 ? tableSchema.asStruct() : tableSchema.findField(tableSchemaId).type();
    if (!tableSchemaType.isStructType()) {
      return Result.INCOMPATIBLE;
    }

    if (struct.fields().size() != tableSchemaType.asStructType().fields().size()) {
      return Result.CONVERSION_NEEDED;
    }

    for (int i = 0; i < struct.fields().size(); ++i) {
      if (!struct
          .fields()
          .get(i)
          .name()
          .equals(tableSchemaType.asStructType().fields().get(i).name())) {
        return Result.CONVERSION_NEEDED;
      }
    }

    return result;
  }

  @Override
  public Result field(Types.NestedField field, Integer tableSchemaId, Result typeResult) {
    if (tableSchemaId == null) {
      return Result.INCOMPATIBLE;
    }

    if (typeResult != Result.SAME) {
      return typeResult;
    }

    if (tableSchema.findField(tableSchemaId).isRequired() && field.isOptional()) {
      return Result.INCOMPATIBLE;
    } else {
      return Result.SAME;
    }
  }

  @Override
  public Result list(Types.ListType list, Integer tableSchemaId, Result elementsResult) {
    if (tableSchemaId == null) {
      return Result.INCOMPATIBLE;
    }

    return elementsResult;
  }

  @Override
  public Result map(
      Types.MapType map, Integer tableSchemaId, Result keyResult, Result valueResult) {
    if (tableSchemaId == null) {
      return Result.INCOMPATIBLE;
    }

    return keyResult.merge(valueResult);
  }

  @Override
  @SuppressWarnings("checkstyle:CyclomaticComplexity")
  public Result primitive(Type.PrimitiveType primitive, Integer tableSchemaId) {
    if (tableSchemaId == null) {
      return Result.INCOMPATIBLE;
    }

    Type tableSchemaType = tableSchema.findField(tableSchemaId).type();
    if (!tableSchemaType.isPrimitiveType()) {
      return Result.INCOMPATIBLE;
    }

    Type.PrimitiveType tableSchemaPrimitiveType = tableSchemaType.asPrimitiveType();
    if (primitive.equals(tableSchemaPrimitiveType)) {
      return Result.SAME;
    } else if (primitive.equals(Types.IntegerType.get())
        && tableSchemaPrimitiveType.equals(Types.LongType.get())) {
      return Result.CONVERSION_NEEDED;
    } else if (primitive.equals(Types.FloatType.get())
        && tableSchemaPrimitiveType.equals(Types.DoubleType.get())) {
      return Result.CONVERSION_NEEDED;
    } else if (primitive.equals(Types.DateType.get())
        && tableSchemaPrimitiveType.equals(Types.TimestampType.withoutZone())) {
      return Result.CONVERSION_NEEDED;
    } else if (primitive.typeId() == Type.TypeID.DECIMAL
        && tableSchemaPrimitiveType.typeId() == Type.TypeID.DECIMAL) {
      Types.DecimalType dataType = (Types.DecimalType) primitive;
      Types.DecimalType tableType = (Types.DecimalType) tableSchemaPrimitiveType;
      return dataType.scale() == tableType.scale() && dataType.precision() < tableType.precision()
          ? Result.CONVERSION_NEEDED
          : Result.INCOMPATIBLE;
    } else {
      return Result.INCOMPATIBLE;
    }
  }

  static class PartnerIdByNameAccessors implements PartnerAccessors<Integer> {
    private final Schema tableSchema;
    private boolean caseSensitive = true;

    PartnerIdByNameAccessors(Schema tableSchema) {
      this.tableSchema = tableSchema;
    }

    private PartnerIdByNameAccessors(Schema tableSchema, boolean caseSensitive) {
      this(tableSchema);
      this.caseSensitive = caseSensitive;
    }

    @Override
    public Integer fieldPartner(Integer tableSchemaFieldId, int fieldId, String name) {
      Types.StructType struct;
      if (tableSchemaFieldId == -1) {
        struct = tableSchema.asStruct();
      } else {
        struct = tableSchema.findField(tableSchemaFieldId).type().asStructType();
      }

      Types.NestedField field =
          caseSensitive ? struct.field(name) : struct.caseInsensitiveField(name);
      if (field != null) {
        return field.fieldId();
      }

      return null;
    }

    @Override
    public Integer mapKeyPartner(Integer tableSchemaMapId) {
      Types.NestedField mapField = tableSchema.findField(tableSchemaMapId);
      if (mapField != null) {
        return mapField.type().asMapType().fields().get(0).fieldId();
      }

      return null;
    }

    @Override
    public Integer mapValuePartner(Integer tableSchemaMapId) {
      Types.NestedField mapField = tableSchema.findField(tableSchemaMapId);
      if (mapField != null) {
        return mapField.type().asMapType().fields().get(1).fieldId();
      }

      return null;
    }

    @Override
    public Integer listElementPartner(Integer tableSchemaListId) {
      Types.NestedField listField = tableSchema.findField(tableSchemaListId);
      if (listField != null) {
        return listField.type().asListType().fields().get(0).fieldId();
      }

      return null;
    }
  }

  public enum Result {
    SAME(0),
    CONVERSION_NEEDED(1),
    INCOMPATIBLE(2);

    private static final Map<Integer, Result> BY_ID = Maps.newHashMap();

    static {
      for (Result e : Result.values()) {
        if (BY_ID.put(e.id, e) != null) {
          throw new IllegalArgumentException("Duplicate id: " + e.id);
        }
      }
    }

    private final int id;

    Result(int id) {
      this.id = id;
    }

    private Result merge(Result other) {
      return BY_ID.get(Math.max(this.id, other.id));
    }
  }
}
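For contrast with the type promotions handled in primitive(), here is a short sketch (hypothetical schemas and class name) of the required/optional rule in field(): an optional input field cannot be written into a required table column, so the comparison reports INCOMPATIBLE and the caller has to evolve the table first (per the commit message, that is the job of EvolveSchemaVisitor and TableUpdater).

import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.sink.dynamic.CompareSchemasVisitor;
import org.apache.iceberg.types.Types;

public class RequiredVsOptionalSketch {
  public static void main(String[] args) {
    // Table column "id" is required.
    Schema tableSchema =
        new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

    // The incoming data declares "id" as optional, i.e. it may contain nulls,
    // which a required column cannot accept.
    Schema dataSchema =
        new Schema(Types.NestedField.optional(1, "id", Types.LongType.get()));

    CompareSchemasVisitor.Result result =
        CompareSchemasVisitor.visit(dataSchema, tableSchema);

    System.out.println(result); // INCOMPATIBLE
  }
}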
