Skip to content

Commit

Permalink
[Feature] Support multiple fields in partial-update sequence-group
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangyuf committed Jun 4, 2024
1 parent 605bad0 commit 4622f12
Show file tree
Hide file tree
Showing 7 changed files with 647 additions and 262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ public class CoreOptions implements Serializable {

public static final String FIELDS_PREFIX = "fields";

public static final String FIELDS_SEPARATOR = ",";

public static final String AGG_FUNCTION = "aggregate-function";
public static final String DEFAULT_AGG_FUNCTION = "default-aggregate-function";

Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -51,6 +52,7 @@
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.DEFAULT_AGG_FUNCTION;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
Expand Down Expand Up @@ -349,13 +351,15 @@ private static void validateFieldsPrefix(TableSchema schema, CoreOptions options
.forEach(
k -> {
if (k.startsWith(FIELDS_PREFIX)) {
String fieldName = k.split("\\.")[1];
checkArgument(
DEFAULT_AGG_FUNCTION.equals(fieldName)
|| fieldNames.contains(fieldName),
String.format(
"Field %s can not be found in table schema.",
fieldName));
String[] fields = k.split("\\.")[1].split(FIELDS_SEPARATOR);
for (String field : fields) {
checkArgument(
DEFAULT_AGG_FUNCTION.equals(field)
|| fieldNames.contains(field),
String.format(
"Field %s can not be found in table schema.",
field));
}
}
});
}
Expand All @@ -367,29 +371,42 @@ private static void validateSequenceGroup(TableSchema schema, CoreOptions option
String v = entry.getValue();
List<String> fieldNames = schema.fieldNames();
if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
String sequenceFieldName =
String[] sequenceFieldNames =
k.substring(
FIELDS_PREFIX.length() + 1,
k.length() - SEQUENCE_GROUP.length() - 1);
if (!fieldNames.contains(sequenceFieldName)) {
throw new IllegalArgumentException(
String.format(
"The sequence field group: %s can not be found in table schema.",
sequenceFieldName));
}
FIELDS_PREFIX.length() + 1,
k.length() - SEQUENCE_GROUP.length() - 1)
.split(FIELDS_SEPARATOR);

for (String field : v.split(",")) {
for (String field : v.split(FIELDS_SEPARATOR)) {
if (!fieldNames.contains(field)) {
throw new IllegalArgumentException(
String.format("Field %s can not be found in table schema.", field));
}
Set<String> group = fields2Group.computeIfAbsent(field, p -> new HashSet<>());
if (group.add(sequenceFieldName) && group.size() > 1) {

List<String> sequenceFieldsList = new ArrayList<>();
for (String sequenceFieldName : sequenceFieldNames) {
if (!fieldNames.contains(sequenceFieldName)) {
throw new IllegalArgumentException(
String.format(
"The sequence field group: %s can not be found in table schema.",
sequenceFieldName));
}
sequenceFieldsList.add(sequenceFieldName);
}

if (fields2Group.containsKey(field)) {
List<List<String>> sequenceGroups = new ArrayList<>();
sequenceGroups.add(new ArrayList<>(fields2Group.get(field)));
sequenceGroups.add(sequenceFieldsList);

throw new IllegalArgumentException(
String.format(
"Field %s is defined repeatedly by multiple groups: %s.",
field, group));
field, sequenceGroups));
}

Set<String> group = fields2Group.computeIfAbsent(field, p -> new HashSet<>());
group.addAll(sequenceFieldsList);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,18 @@ public static UserDefinedSeqComparator create(RowType rowType, List<String> sequ

List<String> fieldNames = rowType.getFieldNames();
int[] fields = sequenceFields.stream().mapToInt(fieldNames::indexOf).toArray();

return create(rowType, fields);
}

@Nullable
public static UserDefinedSeqComparator create(RowType rowType, int[] sequenceFields) {
if (sequenceFields.length == 0) {
return null;
}

RecordComparator comparator =
CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), fields);
return new UserDefinedSeqComparator(fields, comparator);
CodeGenUtils.newRecordComparator(rowType.getFieldTypes(), sequenceFields);
return new UserDefinedSeqComparator(sequenceFields, comparator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
protected MergeFunction<KeyValue> createMergeFunction() {
Options options = new Options();
return PartialUpdateMergeFunction.factory(
options, RowType.of(DataTypes.BIGINT()), ImmutableList.of("key"))
options, RowType.of(DataTypes.BIGINT()), ImmutableList.of("f0"))
.create();
}
}
Expand Down
Loading

0 comments on commit 4622f12

Please sign in to comment.