[core] Fix that cannot read UPDATE_BEFORE of ignore-delete table (apa…
yuzelin authored Jun 4, 2024
1 parent dd19b26 commit 605bad0
Showing 12 changed files with 125 additions and 50 deletions.
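
In short, the fix moves ignore-delete handling out of the data-file reader and into the merge functions, so a streaming read can still observe UPDATE_BEFORE/UPDATE_AFTER for an updated key while DELETE records are dropped. A condensed version of the scenario that the updated streaming ITCase at the end of this diff asserts (sql(...), streamSqlBlockIter(...) and BlockingIterator are that test's own helpers, so treat this as a sketch, not standalone code):

    sql(
            "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) "
                    + "WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true')");
    BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM ignore_delete");

    sql("INSERT INTO ignore_delete VALUES (1, 'A')");
    sql("DELETE FROM ignore_delete WHERE pk = 1"); // ignored: no -D reaches the stream
    sql("INSERT INTO ignore_delete VALUES (1, 'B')"); // update: -U[1, 'A'] then +U[1, 'B']

    // Before this commit the reader filtered out every retract record of an
    // ignore-delete table, so the UPDATE_BEFORE row above was never readable.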
@@ -34,18 +34,12 @@ public class KeyValueDataFileRecordReader implements RecordReader<KeyValue> {
     private final RecordReader<InternalRow> reader;
     private final KeyValueSerializer serializer;
     private final int level;
-    private final boolean ignoreDelete;
 
     public KeyValueDataFileRecordReader(
-            RecordReader<InternalRow> reader,
-            RowType keyType,
-            RowType valueType,
-            int level,
-            boolean ignoreDelete) {
+            RecordReader<InternalRow> reader, RowType keyType, RowType valueType, int level) {
         this.reader = reader;
         this.serializer = new KeyValueSerializer(keyType, valueType);
         this.level = level;
-        this.ignoreDelete = ignoreDelete;
     }
 
     @Nullable
@@ -56,15 +50,11 @@ public RecordIterator<KeyValue> readBatch() throws IOException {
             return null;
         }
 
-        RecordIterator<KeyValue> transformed =
-                iterator.transform(
-                        internalRow ->
-                                internalRow == null
-                                        ? null
-                                        : serializer.fromRow(internalRow).setLevel(level));
-        // In 0.7- versions, the delete records might be written into data file even when
-        // ignore-delete configured, so the reader should also filter the delete records
-        return ignoreDelete ? transformed.filter(KeyValue::isAdd) : transformed;
+        return iterator.transform(
+                internalRow ->
+                        internalRow == null
+                                ? null
+                                : serializer.fromRow(internalRow).setLevel(level));
     }
 
     @Override
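
The block removed above is where the old behavior came from: with ignore-delete set, the reader kept only records whose RowKind is an add, so -U (UPDATE_BEFORE) rows were dropped together with -D. After this commit the reader always returns the raw records, and the decision moves into the merge functions changed below; the shared pattern there is roughly:

    // Sketch of the check that replaces the reader-level filter(KeyValue::isAdd);
    // each merge function changed below carries its own variant of it.
    if (ignoreDelete && kv.valueKind().isRetract()) {
        return; // drop the delete record during merging, not while reading the file
    }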
@@ -68,7 +68,6 @@ public class KeyValueFileReaderFactory implements FileReaderFactory<KeyValue> {
     private final Map<FormatKey, BulkFormatMapping> bulkFormatMappings;
     private final BinaryRow partition;
     private final DeletionVector.Factory dvFactory;
-    private final boolean ignoreDelete;
 
     private KeyValueFileReaderFactory(
             FileIO fileIO,
@@ -92,7 +91,6 @@ private KeyValueFileReaderFactory(
         this.partition = partition;
         this.bulkFormatMappings = new HashMap<>();
         this.dvFactory = dvFactory;
-        this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete();
     }
 
     @Override
@@ -151,8 +149,7 @@ private RecordReader<KeyValue> createRecordReader(
                     new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get());
         }
 
-        return new KeyValueDataFileRecordReader(
-                fileRecordReader, keyType, valueType, level, ignoreDelete);
+        return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level);
     }
 
     public static Builder builder(
@@ -18,7 +18,9 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
+import org.apache.paimon.options.Options;
 
 import javax.annotation.Nullable;
 
@@ -28,15 +30,26 @@
  */
 public class DeduplicateMergeFunction implements MergeFunction<KeyValue> {
 
+    private final boolean ignoreDelete;
+
     private KeyValue latestKv;
 
+    private DeduplicateMergeFunction(boolean ignoreDelete) {
+        this.ignoreDelete = ignoreDelete;
+    }
+
     @Override
     public void reset() {
         latestKv = null;
     }
 
     @Override
     public void add(KeyValue kv) {
+        // In 0.7- versions, the delete records might be written into data file even when
+        // ignore-delete configured, so ignoreDelete still needs to be checked
+        if (ignoreDelete && kv.valueKind().isRetract()) {
+            return;
+        }
         latestKv = kv;
     }
 
@@ -46,16 +59,26 @@ public KeyValue getResult() {
     }
 
     public static MergeFunctionFactory<KeyValue> factory() {
-        return new Factory();
+        return new Factory(false);
     }
 
+    public static MergeFunctionFactory<KeyValue> factory(Options options) {
+        return new Factory(options.get(CoreOptions.IGNORE_DELETE));
+    }
+
     private static class Factory implements MergeFunctionFactory<KeyValue> {
 
         private static final long serialVersionUID = 1L;
 
+        private final boolean ignoreDelete;
+
+        private Factory(boolean ignoreDelete) {
+            this.ignoreDelete = ignoreDelete;
+        }
+
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            return new DeduplicateMergeFunction();
+            return new DeduplicateMergeFunction(ignoreDelete);
        }
     }
 }
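
With the reader change, this merge function is where ignore-delete now takes effect for the deduplicate engine. A minimal usage sketch of the new factory(Options) entry point, using only types that appear in this hunk (Options#set is assumed to be the setter counterpart of the Options#get call above):

    // Table option 'ignore-delete' = 'true' reaches the merge function via Options.
    Options options = new Options();
    options.set(CoreOptions.IGNORE_DELETE, true); // assumption: mirrors options.get(...)

    MergeFunction<KeyValue> mergeFunction =
            DeduplicateMergeFunction.factory(options).create(null); // no projection

    // add() now drops -D/-U records itself and keeps the latest +I/+U per key;
    // the no-argument factory() keeps the old never-ignore behavior (Factory(false)).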
@@ -18,10 +18,11 @@
 
 package org.apache.paimon.mergetree.compact;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.Preconditions;
 
 import javax.annotation.Nullable;
 
@@ -35,10 +36,12 @@ public class FirstRowMergeFunction implements MergeFunction<KeyValue> {
     private final InternalRowSerializer valueSerializer;
     private KeyValue first;
     public boolean containsHighLevel;
+    private final boolean ignoreDelete;
 
-    protected FirstRowMergeFunction(RowType keyType, RowType valueType) {
+    protected FirstRowMergeFunction(RowType keyType, RowType valueType, boolean ignoreDelete) {
         this.keySerializer = new InternalRowSerializer(keyType);
         this.valueSerializer = new InternalRowSerializer(valueType);
+        this.ignoreDelete = ignoreDelete;
     }
 
     @Override
@@ -49,10 +52,18 @@ public void reset() {
 
     @Override
     public void add(KeyValue kv) {
-        Preconditions.checkArgument(
-                kv.valueKind().isAdd(),
-                "By default, First row merge engine can not accept DELETE/UPDATE_BEFORE records.\n"
-                        + "You can config 'ignore-delete' to ignore the DELETE/UPDATE_BEFORE records.");
+        if (kv.valueKind().isRetract()) {
+            // In 0.7- versions, the delete records might be written into data file even when
+            // ignore-delete configured, so ignoreDelete still needs to be checked
+            if (ignoreDelete) {
+                return;
+            } else {
+                throw new IllegalArgumentException(
+                        "By default, First row merge engine can not accept DELETE/UPDATE_BEFORE records.\n"
+                                + "You can config 'first-row.ignore-delete' to ignore the DELETE/UPDATE_BEFORE records.");
+            }
+        }
+
         if (first == null) {
             this.first = kv.copy(keySerializer, valueSerializer);
         }
@@ -66,24 +77,28 @@ public KeyValue getResult() {
         return first;
     }
 
-    public static MergeFunctionFactory<KeyValue> factory(RowType keyType, RowType valueType) {
-        return new FirstRowMergeFunction.Factory(keyType, valueType);
+    public static MergeFunctionFactory<KeyValue> factory(
+            Options options, RowType keyType, RowType valueType) {
+        return new FirstRowMergeFunction.Factory(
+                keyType, valueType, options.get(CoreOptions.IGNORE_DELETE));
     }
 
     private static class Factory implements MergeFunctionFactory<KeyValue> {
 
         private static final long serialVersionUID = 1L;
         private final RowType keyType;
         private final RowType valueType;
+        private final boolean ignoreDelete;
 
-        public Factory(RowType keyType, RowType valueType) {
+        public Factory(RowType keyType, RowType valueType, boolean ignoreDelete) {
             this.keyType = keyType;
             this.valueType = valueType;
+            this.ignoreDelete = ignoreDelete;
         }
 
         @Override
         public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
-            return new FirstRowMergeFunction(keyType, valueType);
+            return new FirstRowMergeFunction(keyType, valueType, ignoreDelete);
         }
     }
 }
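
The first-row engine follows the same pattern but keeps a hard failure when ignore-delete is off, now as an IllegalArgumentException instead of a Preconditions check. A small sketch built from the signatures above and the row types used by the tests later in this diff:

    RowType keyType = new RowType(Lists.list(new DataField(0, "f0", new IntType())));
    RowType valueType = new RowType(Lists.list(new DataField(1, "f1", new BigIntType())));

    MergeFunction<KeyValue> firstRow =
            FirstRowMergeFunction.factory(new Options(), keyType, valueType).create(null);

    // With default options a retract record makes add() throw; with
    // 'ignore-delete' enabled it is skipped, exactly as in the deduplicate case.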
@@ -67,6 +67,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
     public static final String SEQUENCE_GROUP = "sequence-group";
 
     private final InternalRow.FieldGetter[] getters;
+    private final boolean ignoreDelete;
     private final Map<Integer, SequenceGenerator> fieldSequences;
     private final boolean fieldSequenceEnabled;
     private final Map<Integer, FieldAggregator> fieldAggregators;
@@ -78,10 +79,12 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
 
     protected PartialUpdateMergeFunction(
             InternalRow.FieldGetter[] getters,
+            boolean ignoreDelete,
             Map<Integer, SequenceGenerator> fieldSequences,
             Map<Integer, FieldAggregator> fieldAggregators,
             boolean fieldSequenceEnabled) {
         this.getters = getters;
+        this.ignoreDelete = ignoreDelete;
         this.fieldSequences = fieldSequences;
         this.fieldAggregators = fieldAggregators;
         this.fieldSequenceEnabled = fieldSequenceEnabled;
@@ -100,6 +103,12 @@ public void add(KeyValue kv) {
         currentKey = kv.key();
 
         if (kv.valueKind().isRetract()) {
+            // In 0.7- versions, the delete records might be written into data file even when
+            // ignore-delete configured, so ignoreDelete still needs to be checked
+            if (ignoreDelete) {
+                return;
+            }
+
             if (fieldSequenceEnabled) {
                 retractWithSequenceGroup(kv);
                 return;
@@ -216,12 +225,14 @@ private static class Factory implements MergeFunctionFactory<KeyValue> {
 
         private static final long serialVersionUID = 1L;
 
+        private final boolean ignoreDelete;
         private final List<DataType> tableTypes;
         private final Map<Integer, SequenceGenerator> fieldSequences;
 
         private final Map<Integer, FieldAggregator> fieldAggregators;
 
         private Factory(Options options, RowType rowType, List<String> primaryKeys) {
+            this.ignoreDelete = options.get(CoreOptions.IGNORE_DELETE);
             this.tableTypes = rowType.getFieldTypes();
 
             List<String> fieldNames = rowType.getFieldNames();
@@ -307,12 +318,14 @@ public MergeFunction<KeyValue> create(@Nullable int[][] projection) {
 
                 return new PartialUpdateMergeFunction(
                         createFieldGetters(Projection.of(projection).project(tableTypes)),
+                        ignoreDelete,
                         projectedSequences,
                         projectedAggregators,
                         !fieldSequences.isEmpty());
             } else {
                 return new PartialUpdateMergeFunction(
                         createFieldGetters(tableTypes),
+                        ignoreDelete,
                         fieldSequences,
                         fieldAggregators,
                         !fieldSequences.isEmpty());
@@ -58,7 +58,7 @@ public static MergeFunctionFactory<KeyValue> createMergeFunctionFactory(
 
         switch (mergeEngine) {
             case DEDUPLICATE:
-                return DeduplicateMergeFunction.factory();
+                return DeduplicateMergeFunction.factory(conf);
             case PARTIAL_UPDATE:
                 return PartialUpdateMergeFunction.factory(conf, rowType, tableSchema.primaryKeys());
             case AGGREGATE:
@@ -69,7 +69,7 @@ public static MergeFunctionFactory<KeyValue> createMergeFunctionFactory(
                         tableSchema.primaryKeys());
             case FIRST_ROW:
                 return FirstRowMergeFunction.factory(
-                        new RowType(extractor.keyFields(tableSchema)), rowType);
+                        conf, new RowType(extractor.keyFields(tableSchema)), rowType);
             default:
                 throw new UnsupportedOperationException("Unsupported merge engine: " + mergeEngine);
         }
@@ -257,6 +257,7 @@ protected List<ReusingTestData> getExpected(List<ReusingTestData> input) {
     @Override
     protected MergeFunction<KeyValue> createMergeFunction() {
         return FirstRowMergeFunction.factory(
+                        new Options(),
                         new RowType(Lists.list(new DataField(0, "f0", new IntType()))),
                         new RowType(Lists.list(new DataField(1, "f1", new BigIntType()))))
                 .create();
@@ -392,7 +392,8 @@ public void testFirstRow() {
                         new RowType(
                                 Lists.list(new DataField(0, "f0", new IntType()))),
                         new RowType(
-                                Lists.list(new DataField(1, "f1", new IntType())))),
+                                Lists.list(new DataField(1, "f1", new IntType()))),
+                        false),
                 highLevel::contains);
 
         // Without level-0
@@ -130,7 +130,7 @@ protected MergeFunction<KeyValue> createMergeFunction() {
            RowType keyType = new RowType(Lists.list(new DataField(0, "f0", new IntType())));
            RowType valueType = new RowType(Lists.list(new DataField(1, "f1", new BigIntType())));
            return new LookupMergeFunction(
-                    new FirstRowMergeFunction(keyType, valueType), keyType, valueType);
+                    new FirstRowMergeFunction(keyType, valueType, false), keyType, valueType);
        }
    }
 }
@@ -422,6 +422,22 @@ public void testIgnoreDelete() {
         assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B"));
     }
 
+    @Test
+    public void testIgnoreDeleteCompatible() {
+        sql(
+                "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) "
+                        + "WITH ('merge-engine' = 'deduplicate', 'write-only' = 'true')");
+
+        sql("INSERT INTO ignore_delete VALUES (1, 'A')");
+        // write delete records
+        sql("DELETE FROM ignore_delete WHERE pk = 1");
+        assertThat(sql("SELECT * FROM ignore_delete")).isEmpty();
+
+        // set ignore-delete and read
+        sql("ALTER TABLE ignore_delete set ('ignore-delete' = 'true')");
+        assertThat(sql("SELECT * FROM ignore_delete")).containsExactlyInAnyOrder(Row.of(1, "A"));
+    }
+
     @Test
     public void testIgnoreDeleteWithRowKindField() {
         sql(
@@ -553,16 +553,23 @@ public void testDynamicPartitionPruningNotWork() throws Exception {
     public void testIgnoreDelete() throws Exception {
         sql(
                 "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING) "
-                        + "WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = '1')");
-        BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM ignore_delete");
+                        + "WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true')");
+
+        BlockingIterator<Row, Row> iterator =
+                streamSqlBlockIter(
+                        "SELECT * FROM ignore_delete /*+ OPTIONS('continuous.discovery-interval' = '1s') */");
 
         sql("INSERT INTO ignore_delete VALUES (1, 'A'), (2, 'B')");
         sql("DELETE FROM ignore_delete WHERE pk = 1");
         sql("INSERT INTO ignore_delete VALUES (1, 'B')");
 
-        assertThat(iterator.collect(2))
-                .containsExactlyInAnyOrder(
-                        Row.ofKind(RowKind.INSERT, 1, "B"), Row.ofKind(RowKind.INSERT, 2, "B"));
+        // no -D[1, 'A'] but exist -U[1, 'A']
+        assertThat(iterator.collect(4))
+                .containsExactly(
+                        Row.ofKind(RowKind.INSERT, 1, "A"),
+                        Row.ofKind(RowKind.INSERT, 2, "B"),
+                        Row.ofKind(RowKind.UPDATE_BEFORE, 1, "A"),
+                        Row.ofKind(RowKind.UPDATE_AFTER, 1, "B"));
         iterator.close();
     }
 
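
Read together with the batch test above, this streaming assertion pins down the intended changelog for an ignore-delete table: deletes stay invisible while updates remain fully visible. Restated as the expected record trace for the three statements in the test (no behavior beyond the assertion itself):

    // INSERT (1,'A'), (2,'B')   ->  +I[1, 'A'], +I[2, 'B']
    // DELETE WHERE pk = 1       ->  (nothing: the delete is ignored, no -D[1, 'A'])
    // INSERT (1,'B')            ->  -U[1, 'A'], +U[1, 'B']  (UPDATE_BEFORE is readable again)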