diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java index 1485b72b2c16..866e2d33cd96 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeReaders.java @@ -24,9 +24,7 @@ import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.mergetree.compact.ConcatRecordReader; import org.apache.paimon.mergetree.compact.ConcatRecordReader.ReaderSupplier; -import org.apache.paimon.mergetree.compact.MergeFunction; import org.apache.paimon.mergetree.compact.MergeFunctionWrapper; -import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.utils.FieldsComparator; @@ -42,16 +40,15 @@ public class MergeTreeReaders { private MergeTreeReaders() {} - public static RecordReader readerForMergeTree( + public static RecordReader readerForMergeTree( List> sections, - boolean dropDelete, KeyValueFileReaderFactory readerFactory, Comparator userKeyComparator, @Nullable FieldsComparator userDefinedSeqComparator, - MergeFunction mergeFunction, + MergeFunctionWrapper mergeFunctionWrapper, MergeSorter mergeSorter) throws IOException { - List> readers = new ArrayList<>(); + List> readers = new ArrayList<>(); for (List section : sections) { readers.add( () -> @@ -60,14 +57,10 @@ public static RecordReader readerForMergeTree( readerFactory, userKeyComparator, userDefinedSeqComparator, - new ReducerMergeFunctionWrapper(mergeFunction), + mergeFunctionWrapper, mergeSorter)); } - RecordReader reader = ConcatRecordReader.create(readers); - if (dropDelete) { - reader = new DropDeleteReader(reader); - } - return reader; + return ConcatRecordReader.create(readers); } public static RecordReader readerForSection( @@ -86,7 +79,7 @@ public static RecordReader readerForSection( readers, userKeyComparator, userDefinedSeqComparator, mergeFunctionWrapper); } - public static RecordReader readerForRun( + private static RecordReader readerForRun( SortedRun run, KeyValueFileReaderFactory readerFactory) throws IOException { List> readers = new ArrayList<>(); for (DataFileMeta file : run.files()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java index 0024d79ce973..ee83df08a1fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java @@ -27,14 +27,12 @@ import org.apache.paimon.io.KeyValueFileWriterFactory; import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.mergetree.MergeSorter; -import org.apache.paimon.mergetree.MergeTreeReaders; import org.apache.paimon.mergetree.SortedRun; -import org.apache.paimon.reader.RecordReaderIterator; +import org.apache.paimon.utils.CloseableIterator; import org.apache.paimon.utils.FieldsComparator; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -120,25 +118,15 @@ private CompactResult rewriteChangelogCompaction( boolean dropDelete, boolean rewriteCompactFile) throws Exception { - List> sectionReaders = new ArrayList<>(); - for (List section : sections) { - sectionReaders.add( - () -> - MergeTreeReaders.readerForSection( - section, - readerFactory, - keyComparator, - userDefinedSeqComparator, - createMergeWrapper(outputLevel), - mergeSorter)); - } - RecordReaderIterator iterator = null; + CloseableIterator iterator = null; RollingFileWriter compactFileWriter = null; RollingFileWriter changelogFileWriter = null; try { - iterator = new RecordReaderIterator<>(ConcatRecordReader.create(sectionReaders)); + iterator = + readerForMergeTree(sections, createMergeWrapper(outputLevel)) + .toCloseableIterator(); if (rewriteCompactFile) { compactFileWriter = writerFactory.createRollingMergeTreeFileWriter(outputLevel); } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java index b4659db48c0f..07b2fe4fcc7f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java @@ -25,6 +25,7 @@ import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.mergetree.DropDeleteReader; import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.MergeTreeReaders; import org.apache.paimon.mergetree.SortedRun; @@ -34,6 +35,7 @@ import javax.annotation.Nullable; +import java.io.IOException; import java.util.Comparator; import java.util.List; @@ -72,21 +74,29 @@ protected CompactResult rewriteCompaction( int outputLevel, boolean dropDelete, List> sections) throws Exception { RollingFileWriter writer = writerFactory.createRollingMergeTreeFileWriter(outputLevel); - RecordReader sectionsReader = - MergeTreeReaders.readerForMergeTree( - sections, - dropDelete, - readerFactory, - keyComparator, - userDefinedSeqComparator, - mfFactory.create(), - mergeSorter); - writer.write(new RecordReaderIterator<>(sectionsReader)); + RecordReader reader = + readerForMergeTree(sections, new ReducerMergeFunctionWrapper(mfFactory.create())); + if (dropDelete) { + reader = new DropDeleteReader(reader); + } + writer.write(new RecordReaderIterator<>(reader)); writer.close(); List before = extractFilesFromSections(sections); notifyCompactBefore(before); return new CompactResult(before, writer.result()); } + protected RecordReader readerForMergeTree( + List> sections, MergeFunctionWrapper mergeFunctionWrapper) + throws IOException { + return MergeTreeReaders.readerForMergeTree( + sections, + readerFactory, + keyComparator, + userDefinedSeqComparator, + mergeFunctionWrapper, + mergeSorter); + } + protected void notifyCompactBefore(List files) {} } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 0e28ce970b3d..f6ddf74ea699 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -43,6 +43,7 @@ import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; import org.apache.paimon.mergetree.compact.IntervalPartition; import org.apache.paimon.mergetree.compact.MergeTreeCompactManager; +import org.apache.paimon.mergetree.compact.ReducerMergeFunctionWrapper; import org.apache.paimon.mergetree.compact.UniversalCompaction; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -556,12 +557,15 @@ private List readAll(List files, boolean dropDelete) RecordReader reader = MergeTreeReaders.readerForMergeTree( new IntervalPartition(files, comparator).partition(), - dropDelete, readerFactory, comparator, null, - DeduplicateMergeFunction.factory().create(), + new ReducerMergeFunctionWrapper( + DeduplicateMergeFunction.factory().create()), new MergeSorter(options, null, null, null)); + if (dropDelete) { + reader = new DropDeleteReader(reader); + } List records = new ArrayList<>(); try (RecordReaderIterator iterator = new RecordReaderIterator<>(reader)) { while (iterator.hasNext()) { @@ -600,16 +604,19 @@ public CompactResult rewrite( throws Exception { RollingFileWriter writer = writerFactory.createRollingMergeTreeFileWriter(outputLevel); - RecordReader sectionsReader = + RecordReader reader = MergeTreeReaders.readerForMergeTree( sections, - dropDelete, compactReaderFactory, comparator, null, - DeduplicateMergeFunction.factory().create(), + new ReducerMergeFunctionWrapper( + DeduplicateMergeFunction.factory().create()), new MergeSorter(options, null, null, null)); - writer.write(new RecordReaderIterator<>(sectionsReader)); + if (dropDelete) { + reader = new DropDeleteReader(reader); + } + writer.write(new RecordReaderIterator<>(reader)); writer.close(); return new CompactResult(extractFilesFromSections(sections), writer.result()); }