
[core] Introduce SplitGroup for SplitGenerator to optimize more rawFiles (#3059)
Zouxxyy authored Mar 26, 2024
1 parent 2401f7e commit 9839efa
Showing 8 changed files with 158 additions and 47 deletions.
@@ -26,6 +26,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.append.AppendOnlyCompactManager.fileComparator;
 
@@ -44,21 +45,23 @@ public AppendOnlySplitGenerator(
     }
 
     @Override
-    public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> input) {
+    public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
         List<DataFileMeta> files = new ArrayList<>(input);
         files.sort(fileComparator(bucketMode == BucketMode.UNAWARE));
         Function<DataFileMeta, Long> weightFunc = file -> Math.max(file.fileSize(), openFileCost);
-        return BinPacking.packForOrdered(files, weightFunc, targetSplitSize);
+        return BinPacking.packForOrdered(files, weightFunc, targetSplitSize).stream()
+                .map(SplitGroup::rawConvertibleGroup)
+                .collect(Collectors.toList());
     }
 
     @Override
-    public List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> files) {
+    public List<SplitGroup> splitForStreaming(List<DataFileMeta> files) {
         // When the bucket mode is unaware, we split the files as in batch mode, because an
         // unaware-bucket table only contains one bucket (bucket 0).
         if (bucketMode == BucketMode.UNAWARE) {
             return splitForBatch(files);
         } else {
-            return Collections.singletonList(files);
+            return Collections.singletonList(SplitGroup.rawConvertibleGroup(files));
         }
     }
 }
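For context on the packing step above: a minimal, self-contained sketch of the ordered bin-packing idea behind BinPacking.packForOrdered. The class name OrderedPackingSketch, the greedy strategy, and the sample numbers are illustrative assumptions, not Paimon's actual implementation; the point is that each file's weight is floored at openFileCost, so a split never absorbs an unbounded number of tiny files.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for BinPacking.packForOrdered: greedily packs an
// already-ordered list into consecutive groups, starting a new group whenever
// adding the next item would push the group's total weight past the target.
public class OrderedPackingSketch {

    public static <T> List<List<T>> packForOrdered(
            List<T> items, Function<T, Long> weightFunc, long targetWeight) {
        List<List<T>> packed = new ArrayList<>();
        List<T> bin = new ArrayList<>();
        long binWeight = 0;
        for (T item : items) {
            long weight = weightFunc.apply(item);
            if (binWeight + weight > targetWeight && !bin.isEmpty()) {
                packed.add(bin);
                bin = new ArrayList<>();
                binWeight = 0;
            }
            bin.add(item);
            binWeight += weight;
        }
        if (!bin.isEmpty()) {
            packed.add(bin);
        }
        return packed;
    }

    public static void main(String[] args) {
        // File sizes in bytes; openFileCost floors the weight of small files.
        List<Long> fileSizes = List.of(10L, 200L, 900L, 50L, 700L);
        long openFileCost = 100L;
        long targetSplitSize = 1000L;
        System.out.println(
                packForOrdered(fileSizes, size -> Math.max(size, openFileCost), targetSplitSize));
        // Prints [[10, 200], [900, 50], [700]]: packed weights 300, 1000, 700.
    }
}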
@@ -61,11 +61,18 @@ public MergeTreeSplitGenerator(
     }
 
     @Override
-    public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files) {
-        if (deletionVectorsEnabled || mergeEngine == FIRST_ROW) {
+    public List<SplitGroup> splitForBatch(List<DataFileMeta> files) {
+        boolean rawConvertible =
+                files.stream().allMatch(file -> file.level() != 0 && withoutDeleteRow(file));
+        boolean oneLevel =
+                files.stream().map(DataFileMeta::level).collect(Collectors.toSet()).size() == 1;
+
+        if (rawConvertible && (deletionVectorsEnabled || mergeEngine == FIRST_ROW || oneLevel)) {
             Function<DataFileMeta, Long> weightFunc =
                     file -> Math.max(file.fileSize(), openFileCost);
-            return BinPacking.packForOrdered(files, weightFunc, targetSplitSize);
+            return BinPacking.packForOrdered(files, weightFunc, targetSplitSize).stream()
+                    .map(SplitGroup::rawConvertibleGroup)
+                    .collect(Collectors.toList());
         }
 
         /*
@@ -93,13 +100,19 @@ public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files) {
                 new IntervalPartition(files, keyComparator)
                         .partition().stream().map(this::flatRun).collect(Collectors.toList());
 
-        return packSplits(sections);
+        return packSplits(sections).stream()
+                .map(
+                        f ->
+                                f.size() == 1 && withoutDeleteRow(f.get(0))
+                                        ? SplitGroup.rawConvertibleGroup(f)
+                                        : SplitGroup.nonRawConvertibleGroup(f))
+                .collect(Collectors.toList());
     }
 
     @Override
-    public List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> files) {
+    public List<SplitGroup> splitForStreaming(List<DataFileMeta> files) {
         // We don't split streaming scan files
-        return Collections.singletonList(files);
+        return Collections.singletonList(SplitGroup.rawConvertibleGroup(files));
     }
 
     private List<List<DataFileMeta>> packSplits(List<List<DataFileMeta>> sections) {
@@ -129,4 +142,8 @@ private List<DataFileMeta> flatFiles(List<List<DataFileMeta>> section) {
         section.forEach(files::addAll);
         return files;
     }
+
+    private boolean withoutDeleteRow(DataFileMeta dataFileMeta) {
+        return dataFileMeta.deleteRowCount().map(count -> count == 0L).orElse(false);
+    }
 }
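The decision above can be read in isolation: a batch of merge-tree files can skip the merge reader only when no file sits on level 0 and every file is known to hold zero delete rows; on top of that, the files must all be on one level, unless deletion vectors or the first-row merge engine already guarantee that each file can be read on its own. A self-contained sketch of that predicate (FileMeta is a hypothetical stand-in for DataFileMeta):

import java.util.List;
import java.util.Optional;

public class RawConvertibleSketch {

    // Minimal stand-in for DataFileMeta: just a level and an optional delete-row count.
    record FileMeta(int level, Optional<Long> deleteRowCount) {}

    // Unknown delete counts are treated conservatively as "may contain deletes".
    static boolean withoutDeleteRow(FileMeta file) {
        return file.deleteRowCount().map(count -> count == 0L).orElse(false);
    }

    static boolean rawConvertible(
            List<FileMeta> files, boolean deletionVectorsEnabled, boolean firstRowMergeEngine) {
        boolean noLevel0AndNoDeletes =
                files.stream().allMatch(f -> f.level() != 0 && withoutDeleteRow(f));
        boolean oneLevel = files.stream().map(FileMeta::level).distinct().count() == 1;
        return noLevel0AndNoDeletes
                && (deletionVectorsEnabled || firstRowMergeEngine || oneLevel);
    }

    public static void main(String[] args) {
        List<FileMeta> twoLevels =
                List.of(new FileMeta(1, Optional.of(0L)), new FileMeta(2, Optional.of(0L)));
        System.out.println(rawConvertible(twoLevels, false, false)); // false: spans two levels
        System.out.println(rawConvertible(twoLevels, true, false)); // true: deletion vectors
    }
}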
@@ -25,7 +25,27 @@
 /** Generate splits from {@link DataFileMeta}s. */
 public interface SplitGenerator {
 
-    List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files);
+    List<SplitGroup> splitForBatch(List<DataFileMeta> files);
 
-    List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> files);
+    List<SplitGroup> splitForStreaming(List<DataFileMeta> files);
+
+    /** A group of files for one split, plus whether they can be converted to raw files. */
+    class SplitGroup {
+
+        public final List<DataFileMeta> files;
+        public final boolean rawConvertible;
+
+        private SplitGroup(List<DataFileMeta> files, boolean rawConvertible) {
+            this.files = files;
+            this.rawConvertible = rawConvertible;
+        }
+
+        public static SplitGroup rawConvertibleGroup(List<DataFileMeta> files) {
+            return new SplitGroup(files, true);
+        }
+
+        public static SplitGroup nonRawConvertibleGroup(List<DataFileMeta> files) {
+            return new SplitGroup(files, false);
+        }
+    }
 }
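A minimal sketch of how a caller is expected to consume the new flag (the DataFile record and planSplit method are hypothetical stand-ins, not Paimon API): raw files are attached to a split only when its group is raw-convertible, so readers can stream those files directly instead of going through the merge path.

import java.util.List;

public class SplitGroupUsageSketch {

    record DataFile(String name) {}

    record SplitGroup(List<DataFile> files, boolean rawConvertible) {}

    static void planSplit(SplitGroup group) {
        System.out.println("data files: " + group.files());
        if (group.rawConvertible()) {
            // Only raw-convertible groups expose raw files to the reader.
            System.out.println("raw files:  " + group.files());
        }
    }

    public static void main(String[] args) {
        planSplit(new SplitGroup(List.of(new DataFile("f1")), true));
        planSplit(new SplitGroup(List.of(new DataFile("f2"), new DataFile("f3")), false));
    }
}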
@@ -25,6 +25,7 @@
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.PlanImpl;
 import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -65,15 +66,15 @@ public Result scan(SnapshotReader reader) {
         for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry : grouped.entrySet()) {
             BinaryRow partition = entry.getKey().getLeft();
             int bucket = entry.getKey().getRight();
-            for (List<DataFileMeta> files :
+            for (SplitGenerator.SplitGroup splitGroup :
                     reader.splitGenerator().splitForBatch(entry.getValue())) {
                 // TODO pass deletion files
                 result.add(
                         DataSplit.builder()
                                 .withSnapshot(endingSnapshotId)
                                 .withPartition(partition)
                                 .withBucket(bucket)
-                                .withDataFiles(files)
+                                .withDataFiles(splitGroup.files)
                                 .build());
             }
         }
@@ -64,7 +64,6 @@
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
 import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
@@ -278,18 +277,24 @@ private List<DataSplit> generateSplits(
                         .withPartition(partition)
                         .withBucket(bucket)
                         .isStreaming(isStreaming);
-        List<List<DataFileMeta>> splitGroups =
+        List<SplitGenerator.SplitGroup> splitGroups =
                 isStreaming
                         ? splitGenerator.splitForStreaming(bucketFiles)
                         : splitGenerator.splitForBatch(bucketFiles);
-        for (List<DataFileMeta> dataFiles : splitGroups) {
-            builder.withDataFiles(dataFiles)
-                    .rawFiles(convertToRawFiles(partition, bucket, dataFiles));
-            if (deletionVectors) {
-                IndexFileMeta deletionIndexFile =
-                        indexFileHandler
+
+        IndexFileMeta deletionIndexFile =
+                deletionVectors
+                        ? indexFileHandler
                                 .scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
-                                .orElse(null);
+                                .orElse(null)
+                        : null;
+        for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
+            List<DataFileMeta> dataFiles = splitGroup.files;
+            builder.withDataFiles(dataFiles);
+            if (splitGroup.rawConvertible) {
+                builder.rawFiles(convertToRawFiles(partition, bucket, dataFiles));
+            }
+            if (deletionVectors) {
                 builder.withDataDeletionFiles(
                         getDeletionFiles(dataFiles, deletionIndexFile));
             }
@@ -370,8 +375,7 @@ private Plan toChangesPlan(
                         .withBucket(bucket)
                         .withBeforeFiles(before)
                         .withDataFiles(data)
-                        .isStreaming(isStreaming)
-                        .rawFiles(convertToRawFiles(part, bucket, data));
+                        .isStreaming(isStreaming);
         if (deletionVectors) {
             IndexFileMeta beforeDeletionIndex =
                     indexFileHandler
@@ -437,21 +441,6 @@ private List<DeletionFile> getDeletionFiles(
     private List<RawFile> convertToRawFiles(
             BinaryRow partition, int bucket, List<DataFileMeta> dataFiles) {
         String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
-
-        // append only or deletionVectors files can be returned
-        if (tableSchema.primaryKeys().isEmpty() || deletionVectors || mergeEngine == FIRST_ROW) {
-            return makeRawTableFiles(bucketPath, dataFiles);
-        }
-
-        int maxLevel = options.numLevels() - 1;
-        if (dataFiles.stream().map(DataFileMeta::level).allMatch(l -> l == maxLevel)) {
-            return makeRawTableFiles(bucketPath, dataFiles);
-        }
-
-        return Collections.emptyList();
-    }
-
-    private List<RawFile> makeRawTableFiles(String bucketPath, List<DataFileMeta> dataFiles) {
         return dataFiles.stream()
                 .map(f -> makeRawTableFile(bucketPath, f))
                 .collect(Collectors.toList());
@@ -72,10 +72,15 @@ public static DataFileMeta newFile() {
 
     public static DataFileMeta newFile(
             String name, int level, int minKey, int maxKey, long maxSequence) {
+        return newFile(name, level, minKey, maxKey, maxSequence, 0L);
+    }
+
+    public static DataFileMeta newFile(
+            String name, int level, int minKey, int maxKey, long maxSequence, long deleteRowCount) {
         return new DataFileMeta(
                 name,
                 maxKey - minKey + 1,
-                1,
+                maxKey - minKey + 1,
                 row(minKey),
                 row(maxKey),
                 null,
@@ -84,7 +89,7 @@ public static DataFileMeta newFile(
                 maxSequence,
                 0,
                 level,
-                0L);
+                deleteRowCount);
     }
 
     public static BinaryRow row(int i) {
@@ -21,6 +21,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.utils.Pair;
 
 import org.junit.jupiter.api.Test;
 
@@ -31,8 +32,10 @@
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static org.apache.paimon.io.DataFileTestUtils.fromMinMax;
+import static org.apache.paimon.io.DataFileTestUtils.newFile;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link AppendOnlySplitGenerator} and {@link MergeTreeSplitGenerator}. */
@@ -124,13 +127,86 @@ public void testMergeTree() {
                 Collections.singletonList("6"));
     }
 
-    private List<List<String>> toNames(List<List<DataFileMeta>> splits) {
-        return splits.stream()
+    @Test
+    public void testSplitRawConvertible() {
+        Comparator<InternalRow> comparator = Comparator.comparingInt(o -> o.getInt(0));
+        MergeTreeSplitGenerator mergeTreeSplitGenerator =
+                new MergeTreeSplitGenerator(comparator, 100, 2, false, DEDUPLICATE);
+
+        // When level0 exists, should not be rawConvertible
+        List<DataFileMeta> files1 =
+                Arrays.asList(newFile("1", 0, 0, 10, 10L), newFile("2", 0, 10, 20, 20L));
+        assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files1)))
+                .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), false));
+
+        // When deleteRowCount > 0, should not be rawConvertible
+        List<DataFileMeta> files2 =
+                Arrays.asList(newFile("1", 1, 0, 10, 10L, 1L), newFile("2", 1, 10, 20, 20L));
+        assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files2)))
+                .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), false));
+
+        // No level0 and deleteRowCount == 0:
+        // All in one level, should be rawConvertible
+        List<DataFileMeta> files3 =
+                Arrays.asList(newFile("1", 1, 0, 10, 10L), newFile("2", 1, 10, 20, 20L));
+        assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files3)))
+                .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), true));
+
+        // Not all in one level, should not be rawConvertible
+        List<DataFileMeta> files4 =
+                Arrays.asList(newFile("1", 1, 0, 10, 10L), newFile("2", 2, 10, 20, 20L));
+        assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files4)))
+                .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), false));
+
+        // Not all in one level but with deletion vectors enabled, should be rawConvertible
+        MergeTreeSplitGenerator splitGeneratorWithDVEnabled =
+                new MergeTreeSplitGenerator(comparator, 100, 2, true, DEDUPLICATE);
+        assertThat(toNamesAndRawConvertible(splitGeneratorWithDVEnabled.splitForBatch(files4)))
+                .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), true));
+
+        // Not all in one level but with first row merge engine, should be rawConvertible
+        MergeTreeSplitGenerator splitGeneratorWithFirstRow =
+                new MergeTreeSplitGenerator(comparator, 100, 2, false, FIRST_ROW);
+        assertThat(toNamesAndRawConvertible(splitGeneratorWithFirstRow.splitForBatch(files4)))
+                .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), true));
+
+        // Split with one file should be rawConvertible
+        List<DataFileMeta> files5 =
+                Arrays.asList(
+                        newFile("1", 1, 0, 10, 10L),
+                        newFile("2", 2, 0, 12, 12L),
+                        newFile("3", 3, 15, 60, 60L),
+                        newFile("4", 4, 18, 40, 40L),
+                        newFile("5", 5, 82, 85, 85L),
+                        newFile("6", 6, 100, 200, 200L));
+        assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files5)))
+                .containsExactlyInAnyOrder(
+                        Pair.of(Arrays.asList("1", "2", "3", "4", "5"), false),
+                        Pair.of(Collections.singletonList("6"), true));
+    }
+
+    private List<List<String>> toNames(List<SplitGenerator.SplitGroup> splitGroups) {
+        return splitGroups.stream()
                 .map(
-                        files ->
-                                files.stream()
+                        splitGroup ->
+                                splitGroup.files.stream()
                                         .map(DataFileMeta::fileName)
                                         .collect(Collectors.toList()))
                 .collect(Collectors.toList());
     }
+
+    private List<Pair<List<String>, Boolean>> toNamesAndRawConvertible(
+            List<SplitGenerator.SplitGroup> splitGroups) {
+        return splitGroups.stream()
+                .map(
+                        splitGroup -> {
+                            List<String> sortedFileNames =
+                                    splitGroup.files.stream()
+                                            .sorted(Comparator.comparing(DataFileMeta::fileName))
+                                            .map(DataFileMeta::fileName)
+                                            .collect(Collectors.toList());
+                            return Pair.of(sortedFileNames, splitGroup.rawConvertible);
+                        })
+                .collect(Collectors.toList());
+    }
 }
@@ -100,7 +100,7 @@ public void testGetPrimaryKeyRawFiles() throws Exception {
             assertThat(dataSplit.dataFiles()).hasSize(1);
             DataFileMeta meta = dataSplit.dataFiles().get(0);
             String partition = dataSplit.partition().getString(0).toString();
-            assertThat(dataSplit.convertToRawFiles()).isNotPresent();
+            assertThat(dataSplit.convertToRawFiles()).isPresent();
         }
 
         // write another file on level 0
