Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Refactor ChangelogMergeTreeRewriter to lookup free #2986

Merged
merged 2 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.MergeTreeReaders;
import org.apache.paimon.mergetree.SortedRun;
Expand All @@ -49,7 +47,8 @@ public abstract class ChangelogMergeTreeRewriter extends MergeTreeCompactRewrite

protected final int maxLevel;
protected final MergeEngine mergeEngine;
protected final LookupStrategy lookupStrategy;
private final boolean produceChangelog;
private final boolean forceDropDelete;

public ChangelogMergeTreeRewriter(
int maxLevel,
Expand All @@ -60,19 +59,19 @@ public ChangelogMergeTreeRewriter(
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
LookupStrategy lookupStrategy,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
boolean produceChangelog,
boolean forceDropDelete) {
super(
readerFactory,
writerFactory,
keyComparator,
userDefinedSeqComparator,
mfFactory,
mergeSorter,
deletionVectorsMaintainer);
mergeSorter);
this.maxLevel = maxLevel;
this.mergeEngine = mergeEngine;
this.lookupStrategy = lookupStrategy;
this.produceChangelog = produceChangelog;
this.forceDropDelete = forceDropDelete;
}

protected abstract boolean rewriteChangelog(
Expand Down Expand Up @@ -143,17 +142,19 @@ private CompactResult rewriteChangelogCompaction(
if (rewriteCompactFile) {
compactFileWriter = writerFactory.createRollingMergeTreeFileWriter(outputLevel);
}
if (lookupStrategy.produceChangelog) {
if (produceChangelog) {
changelogFileWriter = writerFactory.createRollingChangelogFileWriter(outputLevel);
}

while (iterator.hasNext()) {
ChangelogResult result = iterator.next();
KeyValue keyValue = result.result();
if (rewriteCompactFile && keyValue != null && (!dropDelete || keyValue.isAdd())) {
if (compactFileWriter != null
&& keyValue != null
&& (!dropDelete || keyValue.isAdd())) {
compactFileWriter.write(keyValue);
}
if (lookupStrategy.produceChangelog) {
if (produceChangelog) {
for (KeyValue kv : result.changelogs()) {
changelogFileWriter.write(kv);
}
Expand All @@ -173,24 +174,19 @@ private CompactResult rewriteChangelogCompaction(

List<DataFileMeta> before = extractFilesFromSections(sections);
List<DataFileMeta> after =
rewriteCompactFile
compactFileWriter != null
? compactFileWriter.result()
: before.stream()
.map(x -> x.upgrade(outputLevel))
.collect(Collectors.toList());

if (deletionVectorsMaintainer != null) {
Zouxxyy marked this conversation as resolved.
Show resolved Hide resolved
for (DataFileMeta dataFileMeta : before) {
deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName());
}
}
notifyCompactBefore(before);

return new CompactResult(
before,
after,
lookupStrategy.produceChangelog
List<DataFileMeta> changelogFiles =
changelogFileWriter != null
? changelogFileWriter.result()
: Collections.emptyList());
: Collections.emptyList();
return new CompactResult(before, after, changelogFiles);
}

@Override
Expand All @@ -201,8 +197,7 @@ public CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exceptio
outputLevel,
Collections.singletonList(
Collections.singletonList(SortedRun.fromSingle(file))),
// In deletion vector mode, we always drop deletion
lookupStrategy.deletionVector,
forceDropDelete,
strategy.rewrite);
} else {
return super.upgrade(outputLevel, file);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.MergeSorter;
import org.apache.paimon.mergetree.SortedRun;
import org.apache.paimon.utils.FieldsComparator;
Expand Down Expand Up @@ -66,8 +65,8 @@ public FullChangelogMergeTreeCompactRewriter(
userDefinedSeqComparator,
mfFactory,
mergeSorter,
LookupStrategy.CHANGELOG_ONLY,
null);
true,
false);
this.valueEqualiser = valueEqualiser;
this.changelogRowDeduplicate = changelogRowDeduplicate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_NO_REWRITE;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.CHANGELOG_WITH_REWRITE;
import static org.apache.paimon.mergetree.compact.ChangelogMergeTreeRewriter.UpgradeStrategy.NO_CHANGELOG;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* A {@link MergeTreeCompactRewriter} which produces changelog files by lookup for the compaction
Expand All @@ -52,6 +51,7 @@ public class LookupMergeTreeCompactRewriter<T> extends ChangelogMergeTreeRewrite

private final LookupLevels<T> lookupLevels;
private final MergeFunctionWrapperFactory<T> wrapperFactory;
@Nullable private final DeletionVectorsMaintainer dvMaintainer;

public LookupMergeTreeCompactRewriter(
int maxLevel,
Expand All @@ -64,8 +64,8 @@ public LookupMergeTreeCompactRewriter(
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
MergeFunctionWrapperFactory<T> wrapperFactory,
LookupStrategy lookupStrategy,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
boolean produceChangelog,
@Nullable DeletionVectorsMaintainer dvMaintainer) {
super(
maxLevel,
mergeEngine,
Expand All @@ -75,17 +75,20 @@ public LookupMergeTreeCompactRewriter(
userDefinedSeqComparator,
mfFactory,
mergeSorter,
lookupStrategy,
deletionVectorsMaintainer);
if (lookupStrategy.deletionVector) {
checkArgument(
deletionVectorsMaintainer != null,
"deletionVectorsMaintainer should not be null, there is a bug.");
}
produceChangelog,
dvMaintainer != null);
this.dvMaintainer = dvMaintainer;
this.lookupLevels = lookupLevels;
this.wrapperFactory = wrapperFactory;
}

@Override
protected void notifyCompactBefore(List<DataFileMeta> files) {
Zouxxyy marked this conversation as resolved.
Show resolved Hide resolved
if (dvMaintainer != null) {
files.forEach(file -> dvMaintainer.removeDeletionVectorOf(file.fileName()));
}
}

@Override
protected boolean rewriteChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>> sections) {
Expand All @@ -99,7 +102,8 @@ protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) {
}

// In deletionVector mode, since drop delete is required, rewrite is always required.
if (lookupStrategy.deletionVector) {
// TODO wait https://github.com/apache/incubator-paimon/pull/2962
if (dvMaintainer != null) {
return CHANGELOG_WITH_REWRITE;
}

Expand All @@ -121,8 +125,7 @@ protected UpgradeStrategy upgradeChangelog(int outputLevel, DataFileMeta file) {

@Override
protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int outputLevel) {
return wrapperFactory.create(
mfFactory, outputLevel, lookupLevels, deletionVectorsMaintainer);
return wrapperFactory.create(mfFactory, outputLevel, lookupLevels, dvMaintainer);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.paimon.KeyValue;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
Expand All @@ -47,23 +46,20 @@ public class MergeTreeCompactRewriter extends AbstractCompactRewriter {
@Nullable protected final FieldsComparator userDefinedSeqComparator;
protected final MergeFunctionFactory<KeyValue> mfFactory;
protected final MergeSorter mergeSorter;
@Nullable protected final DeletionVectorsMaintainer deletionVectorsMaintainer;

public MergeTreeCompactRewriter(
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
MergeFunctionFactory<KeyValue> mfFactory,
MergeSorter mergeSorter,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
MergeSorter mergeSorter) {
this.readerFactory = readerFactory;
this.writerFactory = writerFactory;
this.keyComparator = keyComparator;
this.userDefinedSeqComparator = userDefinedSeqComparator;
this.mfFactory = mfFactory;
this.mergeSorter = mergeSorter;
this.deletionVectorsMaintainer = deletionVectorsMaintainer;
}

@Override
Expand All @@ -88,11 +84,9 @@ protected CompactResult rewriteCompaction(
writer.write(new RecordReaderIterator<>(sectionsReader));
writer.close();
List<DataFileMeta> before = extractFilesFromSections(sections);
if (deletionVectorsMaintainer != null) {
for (DataFileMeta dataFileMeta : before) {
deletionVectorsMaintainer.removeDeletionVectorOf(dataFileMeta.fileName());
}
}
notifyCompactBefore(before);
return new CompactResult(before, writer.result());
}

protected void notifyCompactBefore(List<DataFileMeta> files) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ protected MergeTreeWriter createWriter(
List<DataFileMeta> restoreFiles,
@Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
@Nullable DeletionVectorsMaintainer dvMaintainer) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Creating merge tree writer for partition {} bucket {} from restored files {}",
Expand All @@ -188,12 +188,7 @@ protected MergeTreeWriter createWriter(
: universalCompaction;
CompactManager compactManager =
createCompactManager(
partition,
bucket,
compactStrategy,
compactExecutor,
levels,
deletionVectorsMaintainer);
partition, bucket, compactStrategy, compactExecutor, levels, dvMaintainer);

return new MergeTreeWriter(
bufferSpillable(),
Expand Down Expand Up @@ -222,7 +217,7 @@ private CompactManager createCompactManager(
CompactStrategy compactStrategy,
ExecutorService compactExecutor,
Levels levels,
@Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
@Nullable DeletionVectorsMaintainer dvMaintainer) {
if (options.writeOnly()) {
return new NoopCompactManager();
} else {
Expand All @@ -235,7 +230,7 @@ private CompactManager createCompactManager(
keyComparator,
userDefinedSeqComparator,
levels,
deletionVectorsMaintainer);
dvMaintainer);
return new MergeTreeCompactManager(
compactExecutor,
levels,
Expand Down Expand Up @@ -321,7 +316,7 @@ private MergeTreeCompactRewriter createRewriter(
mfFactory,
mergeSorter,
wrapperFactory,
lookupStrategy,
lookupStrategy.produceChangelog,
deletionVectorsMaintainer);
} else {
return new MergeTreeCompactRewriter(
Expand All @@ -330,8 +325,7 @@ private MergeTreeCompactRewriter createRewriter(
keyComparator,
userDefinedSeqComparator,
mfFactory,
mergeSorter,
deletionVectorsMaintainer);
mergeSorter);
}
}

Expand Down
Loading