Skip to content

Commit

Permalink
1
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Sep 19, 2024
1 parent 2c45ac0 commit bba79fa
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 137 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.table;

import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** System fields. */
public class SystemFields {

public static final int SYSTEM_FIELD_ID_START = Integer.MAX_VALUE / 2;

public static final String KEY_FIELD_PREFIX = "_KEY_";
public static final int KEY_FIELD_ID_START = SYSTEM_FIELD_ID_START;

public static final DataField SEQUENCE_NUMBER =
new DataField(Integer.MAX_VALUE - 1, "_SEQUENCE_NUMBER", DataTypes.BIGINT().notNull());

public static final DataField VALUE_KIND =
new DataField(Integer.MAX_VALUE - 2, "_VALUE_KIND", DataTypes.TINYINT().notNull());

public static final DataField LEVEL =
new DataField(Integer.MAX_VALUE - 3, "_LEVEL", DataTypes.INT().notNull());

public static final Set<String> SYSTEM_FIELD_NAMES =
Stream.of(SEQUENCE_NUMBER.name(), VALUE_KIND.name(), LEVEL.name())
.collect(Collectors.toSet());

public static boolean isSystemField(int fieldId) {
return fieldId >= SYSTEM_FIELD_ID_START;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.table.SystemFields;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

Expand Down Expand Up @@ -256,7 +257,10 @@ public static RowType of(DataType[] types, String[] names) {
public static int currentHighestFieldId(List<DataField> fields) {
Set<Integer> fieldIds = new HashSet<>();
new RowType(fields).collectFieldIds(fieldIds);
return fieldIds.stream().max(Integer::compareTo).orElse(-1);
return fieldIds.stream()
.filter(i -> !SystemFields.isSystemField(i))
.max(Integer::compareTo)
.orElse(-1);
}

public static Builder builder() {
Expand Down
69 changes: 12 additions & 57 deletions paimon-core/src/main/java/org/apache/paimon/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,19 @@
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.utils.InternalRowUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.paimon.schema.SystemColumns.LEVEL;
import static org.apache.paimon.schema.SystemColumns.SEQUENCE_NUMBER;
import static org.apache.paimon.schema.SystemColumns.VALUE_KIND;
import static org.apache.paimon.utils.Preconditions.checkState;
import static org.apache.paimon.table.SystemFields.LEVEL;
import static org.apache.paimon.table.SystemFields.SEQUENCE_NUMBER;
import static org.apache.paimon.table.SystemFields.VALUE_KIND;

/**
* A key value, including user key, sequence number, value kind and value. This object can be
Expand Down Expand Up @@ -115,70 +111,29 @@ public KeyValue setLevel(int level) {
}

public static RowType schema(RowType keyType, RowType valueType) {
List<DataField> fields = new ArrayList<>(keyType.getFields());
fields.add(new DataField(0, SEQUENCE_NUMBER, new BigIntType(false)));
fields.add(new DataField(1, VALUE_KIND, new TinyIntType(false)));
fields.addAll(valueType.getFields());
return new RowType(fields);
return new RowType(createKeyValueFields(keyType.getFields(), valueType.getFields()));
}

public static RowType schemaWithLevel(RowType keyType, RowType valueType) {
RowType.Builder builder = RowType.builder();
schema(keyType, valueType)
.getFields()
.forEach(f -> builder.field(f.name(), f.type(), f.description()));
builder.field(LEVEL, DataTypes.INT().notNull());
return builder.build();
List<DataField> fields = new ArrayList<>(schema(keyType, valueType).getFields());
fields.add(LEVEL);
return new RowType(fields);
}

/**
* Create key-value fields, we need to add a const value to the id of value field to ensure that
* they are consistent when compared by field id. For example, there are two table with key
* value fields as follows
*
* <ul>
* <li>Table1 key fields: 1->a, 2->b, 3->c; value fields: 0->value_count
* <li>Table2 key fields: 1->c, 3->d, 4->a, 5->b; value fields: 0->value_count
* </ul>
*
* <p>We will use 5 as maxKeyId, and create fields for Table1/Table2 as follows
*
* <ul>
* <li>Table1 fields: 1->a, 2->b, 3->c, 6->seq, 7->kind, 8->value_count
* <li>Table2 fields: 1->c, 3->d, 4->a, 5->b, 6->seq, 7->kind, 8->value_count
* </ul>
*
* <p>Then we can compare these two table fields with the field id.
* Create key-value fields.
*
* @param keyFields the key fields
* @param valueFields the value fields
* @param maxKeyId the max key id
* @return the table fields
*/
public static List<DataField> createKeyValueFields(
List<DataField> keyFields, List<DataField> valueFields, final int maxKeyId) {
checkState(maxKeyId >= keyFields.stream().mapToInt(DataField::id).max().orElse(0));

List<DataField> keyFields, List<DataField> valueFields) {
List<DataField> fields = new ArrayList<>(keyFields.size() + valueFields.size() + 2);
fields.addAll(keyFields);
fields.add(
new DataField(
maxKeyId + 1,
SEQUENCE_NUMBER,
new org.apache.paimon.types.BigIntType(false)));
fields.add(
new DataField(
maxKeyId + 2, VALUE_KIND, new org.apache.paimon.types.TinyIntType(false)));
for (DataField valueField : valueFields) {
DataField newValueField =
new DataField(
valueField.id() + maxKeyId + 3,
valueField.name(),
valueField.type(),
valueField.description());
fields.add(newValueField);
}

fields.add(SEQUENCE_NUMBER);
fields.add(VALUE_KIND);
fields.addAll(valueFields);
return fields;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,14 +217,9 @@ public static IndexCastMapping createIndexCastMapping(
int[] dataProjection,
List<DataField> dataKeyFields,
List<DataField> dataValueFields) {
int maxKeyId =
Math.max(
tableKeyFields.stream().mapToInt(DataField::id).max().orElse(0),
dataKeyFields.stream().mapToInt(DataField::id).max().orElse(0));
List<DataField> tableFields =
KeyValue.createKeyValueFields(tableKeyFields, tableValueFields, maxKeyId);
List<DataField> dataFields =
KeyValue.createKeyValueFields(dataKeyFields, dataValueFields, maxKeyId);
KeyValue.createKeyValueFields(tableKeyFields, tableValueFields);
List<DataField> dataFields = KeyValue.createKeyValueFields(dataKeyFields, dataValueFields);
return createIndexCastMapping(tableProjection, tableFields, dataProjection, dataFields);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.table.SystemFields.KEY_FIELD_PREFIX;
import static org.apache.paimon.table.SystemFields.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.types.DataTypeRoot.ARRAY;
import static org.apache.paimon.types.DataTypeRoot.MAP;
import static org.apache.paimon.types.DataTypeRoot.MULTISET;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
import static org.apache.paimon.table.SystemFields.KEY_FIELD_ID_START;
import static org.apache.paimon.table.SystemFields.KEY_FIELD_PREFIX;

/** Utils for creating changelog table with primary keys. */
public class PrimaryKeyTableUtils {
Expand All @@ -45,7 +46,7 @@ public static RowType addKeyNamePrefix(RowType type) {

public static List<DataField> addKeyNamePrefix(List<DataField> keyFields) {
return keyFields.stream()
.map(f -> f.newName(KEY_FIELD_PREFIX + f.name()))
.map(f -> f.newName(KEY_FIELD_PREFIX + f.name()).newId(f.id() + KEY_FIELD_ID_START))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private void recreateMergeTree(long targetFileSize) {
options.get(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER) + 1);
this.options = new CoreOptions(options);
RowType keyType = new RowType(singletonList(new DataField(0, "k", new IntType())));
RowType valueType = new RowType(singletonList(new DataField(0, "v", new IntType())));
RowType valueType = new RowType(singletonList(new DataField(1, "v", new IntType())));

String identifier = "avro";
FileFormat flushingAvro = new FlushingFileFormat(identifier);
Expand All @@ -161,20 +161,12 @@ private void recreateMergeTree(long targetFileSize) {
new KeyValueFieldsExtractor() {
@Override
public List<DataField> keyFields(TableSchema schema) {
return Collections.singletonList(
new DataField(
0,
"k",
new org.apache.paimon.types.IntType(false)));
return keyType.getFields();
}

@Override
public List<DataField> valueFields(TableSchema schema) {
return Collections.singletonList(
new DataField(
0,
"v",
new org.apache.paimon.types.IntType(false)));
return valueType.getFields();
}
},
new CoreOptions(new HashMap<>()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public List<DataField> keyFields(TableSchema schema) {
public List<DataField> valueFields(TableSchema schema) {
return Collections.singletonList(
new DataField(
0,
3,
"count",
new org.apache.paimon.types.BigIntType()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
import java.util.UUID;
import java.util.function.Consumer;

import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
import static org.apache.paimon.schema.SystemColumns.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.table.SystemFields.KEY_FIELD_PREFIX;
import static org.apache.paimon.table.SystemFields.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -424,23 +424,6 @@ public void testDropAllFields() throws Exception {

@Test
public void testCreateAlterSystemField() throws Exception {
Schema schema1 =
new Schema(
RowType.of(
new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
new String[] {"f0", "_VALUE_COUNT"})
.getFields(),
Collections.emptyList(),
Collections.emptyList(),
new HashMap<>(),
"");
assertThatThrownBy(() -> schemaManager.createTable(schema1))
.isInstanceOf(IllegalStateException.class)
.hasMessage(
String.format(
"Field name[%s] in schema cannot be exist in %s",
"_VALUE_COUNT", SYSTEM_FIELD_NAMES));

Schema schema2 =
new Schema(
RowType.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@

import java.util.List;

import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;

/**
* {@link AbstractStreamOperator} which buffer input record and apply merge function when the buffer
* is full. Mainly to resolve data skew on primary keys.
Expand Down Expand Up @@ -105,7 +107,7 @@ public void open() throws Exception {
// partition fields.
@Override
public List<DataField> keyFields(TableSchema schema) {
return schema.primaryKeysFields();
return addKeyNamePrefix(schema.primaryKeysFields());
}

@Override
Expand Down

0 comments on commit bba79fa

Please sign in to comment.