[core] Introduce SplitGroup for SplitGenerator #3059

Merged · 5 commits · Mar 26, 2024

==== AppendOnlySplitGenerator.java ====
@@ -26,6 +26,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.apache.paimon.append.AppendOnlyCompactManager.fileComparator;
 
@@ -44,21 +45,23 @@ public AppendOnlySplitGenerator(
     }
 
     @Override
-    public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> input) {
+    public List<SplitGroup> splitForBatch(List<DataFileMeta> input) {
         List<DataFileMeta> files = new ArrayList<>(input);
         files.sort(fileComparator(bucketMode == BucketMode.UNAWARE));
         Function<DataFileMeta, Long> weightFunc = file -> Math.max(file.fileSize(), openFileCost);
-        return BinPacking.packForOrdered(files, weightFunc, targetSplitSize);
+        return BinPacking.packForOrdered(files, weightFunc, targetSplitSize).stream()
+                .map(SplitGroup::rawConvertibleGroup)
+                .collect(Collectors.toList());
     }
 
     @Override
-    public List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> files) {
+    public List<SplitGroup> splitForStreaming(List<DataFileMeta> files) {
         // When the bucket mode is unaware, we split the files as batch, because unaware-bucket table
         // only contains one bucket (bucket 0).
         if (bucketMode == BucketMode.UNAWARE) {
             return splitForBatch(files);
         } else {
-            return Collections.singletonList(files);
+            return Collections.singletonList(SplitGroup.rawConvertibleGroup(files));
         }
     }
 }
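
Note on the unchanged call above: `BinPacking.packForOrdered` packs the already-sorted files into consecutive groups whose total weight stays near `targetSplitSize`, and the `weightFunc` floors every file at `openFileCost` so a split cannot accumulate an unbounded number of tiny files. Below is a minimal, self-contained sketch of that packing idea; the class and the exact bin-closing rule are illustrative assumptions, not Paimon's `BinPacking` implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class OrderedPackingSketch {

    // Greedy ordered packing: keep appending consecutive items to the current
    // bin until adding the next one would push it past the target weight.
    // (Sketch only; Paimon's BinPacking may differ in edge cases.)
    static <T> List<List<T>> packForOrdered(
            List<T> items, Function<T, Long> weightFunc, long targetWeight) {
        List<List<T>> packed = new ArrayList<>();
        List<T> bin = new ArrayList<>();
        long binWeight = 0;
        for (T item : items) {
            long weight = weightFunc.apply(item);
            if (!bin.isEmpty() && binWeight + weight > targetWeight) {
                packed.add(bin);
                bin = new ArrayList<>();
                binWeight = 0;
            }
            bin.add(item);
            binWeight += weight;
        }
        if (!bin.isEmpty()) {
            packed.add(bin);
        }
        return packed;
    }

    public static void main(String[] args) {
        long openFileCost = 4L; // every file costs at least this much to open
        long targetSplitSize = 10L;
        List<Long> fileSizes = List.of(1L, 2L, 8L, 9L, 3L);
        // weight = max(fileSize, openFileCost), mirroring the weightFunc above
        List<List<Long>> splits =
                packForOrdered(fileSizes, size -> Math.max(size, openFileCost), targetSplitSize);
        System.out.println(splits); // [[1, 2], [8], [9], [3]]
    }
}
```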

==== MergeTreeSplitGenerator.java ====
@@ -61,11 +61,18 @@ public MergeTreeSplitGenerator(
     }
 
     @Override
-    public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files) {
-        if (deletionVectorsEnabled || mergeEngine == FIRST_ROW) {
+    public List<SplitGroup> splitForBatch(List<DataFileMeta> files) {
+        boolean rawConvertible =
+                files.stream().allMatch(file -> file.level() != 0 && withoutDeleteRow(file));
+        boolean oneLevel =
+                files.stream().map(DataFileMeta::level).collect(Collectors.toSet()).size() == 1;
+
+        if (rawConvertible && (deletionVectorsEnabled || mergeEngine == FIRST_ROW || oneLevel)) {
             Function<DataFileMeta, Long> weightFunc =
                     file -> Math.max(file.fileSize(), openFileCost);
-            return BinPacking.packForOrdered(files, weightFunc, targetSplitSize);
+            return BinPacking.packForOrdered(files, weightFunc, targetSplitSize).stream()
+                    .map(SplitGroup::rawConvertibleGroup)
+                    .collect(Collectors.toList());
         }
 
         /*
@@ -93,13 +100,19 @@ public List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files) {
                 new IntervalPartition(files, keyComparator)
                         .partition().stream().map(this::flatRun).collect(Collectors.toList());
 
-        return packSplits(sections);
+        return packSplits(sections).stream()
+                .map(
+                        f ->
+                                f.size() == 1 && withoutDeleteRow(f.get(0))
+                                        ? SplitGroup.rawConvertibleGroup(f)
+                                        : SplitGroup.nonRawConvertibleGroup(f))
+                .collect(Collectors.toList());
     }
 
     @Override
-    public List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> files) {
+    public List<SplitGroup> splitForStreaming(List<DataFileMeta> files) {
         // We don't split streaming scan files
-        return Collections.singletonList(files);
+        return Collections.singletonList(SplitGroup.rawConvertibleGroup(files));
     }
 
     private List<List<DataFileMeta>> packSplits(List<List<DataFileMeta>> sections) {
@@ -129,4 +142,8 @@ private List<DataFileMeta> flatFiles(List<List<DataFileMeta>> section) {
         section.forEach(files::addAll);
        return files;
     }
+
+    private boolean withoutDeleteRow(DataFileMeta dataFileMeta) {
+        return dataFileMeta.deleteRowCount().map(count -> count == 0L).orElse(false);
+    }
 }
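
The decision logic above is the heart of this PR: a batch split may skip the merge read only when every file is out of level 0 and is known to hold no delete rows, and additionally the files sit in a single level (or deletion vectors / first-row merging make cross-level merging unnecessary). A simplified, self-contained sketch of the predicate follows; `FileMeta` is a hypothetical stand-in for `DataFileMeta`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class RawConvertibleSketch {

    /** Stand-in for DataFileMeta: just the fields the rule reads. */
    static class FileMeta {
        final int level;
        final Optional<Long> deleteRowCount;

        FileMeta(int level, Long deleteRowCount) {
            this.level = level;
            this.deleteRowCount = Optional.ofNullable(deleteRowCount);
        }
    }

    // Mirrors withoutDeleteRow above: an absent count is treated as unknown,
    // which conservatively disqualifies the file.
    static boolean withoutDeleteRow(FileMeta file) {
        return file.deleteRowCount.map(count -> count == 0L).orElse(false);
    }

    // Mirrors the rawConvertible predicate: every file must be out of the
    // unsorted level 0 and contain no delete rows.
    static boolean rawConvertible(List<FileMeta> files) {
        return files.stream().allMatch(file -> file.level != 0 && withoutDeleteRow(file));
    }

    public static void main(String[] args) {
        List<FileMeta> compacted = Arrays.asList(new FileMeta(5, 0L), new FileMeta(5, 0L));
        List<FileMeta> hasLevel0 = Arrays.asList(new FileMeta(0, 0L), new FileMeta(5, 0L));
        List<FileMeta> hasDeletes = Arrays.asList(new FileMeta(5, 3L), new FileMeta(5, 0L));
        System.out.println(rawConvertible(compacted)); // true
        System.out.println(rawConvertible(hasLevel0)); // false
        System.out.println(rawConvertible(hasDeletes)); // false
    }
}
```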

==== SplitGenerator.java ====
@@ -25,7 +25,27 @@
 /** Generate splits from {@link DataFileMeta}s. */
 public interface SplitGenerator {
 
-    List<List<DataFileMeta>> splitForBatch(List<DataFileMeta> files);
+    List<SplitGroup> splitForBatch(List<DataFileMeta> files);
 
-    List<List<DataFileMeta>> splitForStreaming(List<DataFileMeta> files);
+    List<SplitGroup> splitForStreaming(List<DataFileMeta> files);
+
+    /** Split group. */
+    class SplitGroup {
+
+        public final List<DataFileMeta> files;
+        public final boolean rawConvertible;
+
+        private SplitGroup(List<DataFileMeta> files, boolean rawConvertible) {
+            this.files = files;
+            this.rawConvertible = rawConvertible;
+        }
+
+        public static SplitGroup rawConvertibleGroup(List<DataFileMeta> files) {
+            return new SplitGroup(files, true);
+        }
+
+        public static SplitGroup nonRawConvertibleGroup(List<DataFileMeta> files) {
+            return new SplitGroup(files, false);
+        }
+    }
 }
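
To make the new contract concrete, here is a hedged usage sketch: a planner asks the generator for groups and branches on `rawConvertible`. `SplitGroup` is re-declared locally so the example compiles on its own; only its fields and factory methods mirror the interface above, everything else is hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class SplitGroupUsageSketch {

    /** Stand-in for DataFileMeta; only a name is needed for the demo. */
    static class DataFileMeta {
        final String fileName;

        DataFileMeta(String fileName) {
            this.fileName = fileName;
        }
    }

    /** Local mirror of SplitGenerator.SplitGroup from the diff above. */
    static class SplitGroup {
        final List<DataFileMeta> files;
        final boolean rawConvertible;

        private SplitGroup(List<DataFileMeta> files, boolean rawConvertible) {
            this.files = files;
            this.rawConvertible = rawConvertible;
        }

        static SplitGroup rawConvertibleGroup(List<DataFileMeta> files) {
            return new SplitGroup(files, true);
        }

        static SplitGroup nonRawConvertibleGroup(List<DataFileMeta> files) {
            return new SplitGroup(files, false);
        }
    }

    public static void main(String[] args) {
        List<SplitGroup> groups =
                Arrays.asList(
                        SplitGroup.rawConvertibleGroup(
                                Arrays.asList(new DataFileMeta("f1"), new DataFileMeta("f2"))),
                        SplitGroup.nonRawConvertibleGroup(
                                Arrays.asList(new DataFileMeta("f3"))));
        for (SplitGroup group : groups) {
            // A planner attaches raw files to a split only when the generator
            // has vouched that no merge read is required for the group.
            String mode = group.rawConvertible ? "raw read" : "merge read";
            System.out.println(mode + " of " + group.files.size() + " file(s)");
        }
    }
}
```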

==== (scan planner over SnapshotReader) ====
@@ -25,6 +25,7 @@
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.PlanImpl;
 import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.table.source.SplitGenerator;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -65,15 +66,15 @@ public Result scan(SnapshotReader reader) {
         for (Map.Entry<Pair<BinaryRow, Integer>, List<DataFileMeta>> entry : grouped.entrySet()) {
             BinaryRow partition = entry.getKey().getLeft();
             int bucket = entry.getKey().getRight();
-            for (List<DataFileMeta> files :
+            for (SplitGenerator.SplitGroup splitGroup :
                     reader.splitGenerator().splitForBatch(entry.getValue())) {
                 // TODO pass deletion files
                 result.add(
                         DataSplit.builder()
                                 .withSnapshot(endingSnapshotId)
                                 .withPartition(partition)
                                 .withBucket(bucket)
-                                .withDataFiles(files)
+                                .withDataFiles(splitGroup.files)
                                 .build());
             }
         }

==== (snapshot reader implementation) ====
@@ -64,7 +64,6 @@
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.operation.FileStoreScan.Plan.groupByPartFiles;
 import static org.apache.paimon.predicate.PredicateBuilder.transformFieldMapping;
 
@@ -278,18 +277,24 @@ private List<DataSplit> generateSplits(
                         .withPartition(partition)
                         .withBucket(bucket)
                         .isStreaming(isStreaming);
-        List<List<DataFileMeta>> splitGroups =
+        List<SplitGenerator.SplitGroup> splitGroups =
                 isStreaming
                         ? splitGenerator.splitForStreaming(bucketFiles)
                         : splitGenerator.splitForBatch(bucketFiles);
-        for (List<DataFileMeta> dataFiles : splitGroups) {
-            builder.withDataFiles(dataFiles)
-                    .rawFiles(convertToRawFiles(partition, bucket, dataFiles));
-            if (deletionVectors) {
-                IndexFileMeta deletionIndexFile =
-                        indexFileHandler
-                                .scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
-                                .orElse(null);
+
+        IndexFileMeta deletionIndexFile =
+                deletionVectors
+                        ? indexFileHandler
+                                .scan(snapshotId, DELETION_VECTORS_INDEX, partition, bucket)
+                                .orElse(null)
+                        : null;
+        for (SplitGenerator.SplitGroup splitGroup : splitGroups) {
+            List<DataFileMeta> dataFiles = splitGroup.files;
+            builder.withDataFiles(dataFiles);
+            if (splitGroup.rawConvertible) {
+                builder.rawFiles(convertToRawFiles(partition, bucket, dataFiles));
+            }
+            if (deletionVectors) {
                 builder.withDataDeletionFiles(
                         getDeletionFiles(dataFiles, deletionIndexFile));
             }
 
@@ -370,8 +375,7 @@ private Plan toChangesPlan(
                         .withBucket(bucket)
                         .withBeforeFiles(before)
                         .withDataFiles(data)
-                        .isStreaming(isStreaming)
-                        .rawFiles(convertToRawFiles(part, bucket, data));
+                        .isStreaming(isStreaming);
         if (deletionVectors) {
             IndexFileMeta beforeDeletionIndex =
                     indexFileHandler
 
@@ -437,21 +441,6 @@ private List<DeletionFile> getDeletionFiles(
     private List<RawFile> convertToRawFiles(
             BinaryRow partition, int bucket, List<DataFileMeta> dataFiles) {
         String bucketPath = pathFactory.bucketPath(partition, bucket).toString();
-
-        // append only or deletionVectors files can be returned
-        if (tableSchema.primaryKeys().isEmpty() || deletionVectors || mergeEngine == FIRST_ROW) {
-            return makeRawTableFiles(bucketPath, dataFiles);
-        }
-
-        int maxLevel = options.numLevels() - 1;
-        if (dataFiles.stream().map(DataFileMeta::level).allMatch(l -> l == maxLevel)) {
-            return makeRawTableFiles(bucketPath, dataFiles);
-        }
-
-        return Collections.emptyList();
-    }
-
-    private List<RawFile> makeRawTableFiles(String bucketPath, List<DataFileMeta> dataFiles) {
         return dataFiles.stream()
                 .map(f -> makeRawTableFile(bucketPath, f))
                 .collect(Collectors.toList());
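
Two things happen in the hunks above: attaching raw files is now gated on the generator's `rawConvertible` verdict instead of the deleted per-file checks in `convertToRawFiles`, and the deletion-vector index lookup is hoisted so `indexFileHandler.scan(...)` runs once per bucket rather than once per split group. A minimal sketch of that hoisting pattern, with hypothetical names throughout:

```java
import java.util.List;
import java.util.Optional;

public class HoistedLookupSketch {

    // Hypothetical stand-in for an expensive per-bucket index scan whose
    // result does not change across the split groups of that bucket.
    static Optional<String> lookupIndex(int bucket) {
        System.out.println("scan for bucket " + bucket); // printed once after hoisting
        return Optional.of("index-" + bucket);
    }

    public static void main(String[] args) {
        int bucket = 0;
        boolean deletionVectors = true;
        List<String> splitGroups = List.of("g1", "g2", "g3");

        // Hoisted: one scan per bucket, reused for every split group.
        String index = deletionVectors ? lookupIndex(bucket).orElse(null) : null;
        for (String group : splitGroups) {
            if (deletionVectors) {
                System.out.println(group + " uses " + index);
            }
        }
    }
}
```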

==== DataFileTestUtils.java ====
@@ -72,10 +72,15 @@ public static DataFileMeta newFile() {
 
     public static DataFileMeta newFile(
             String name, int level, int minKey, int maxKey, long maxSequence) {
+        return newFile(name, level, minKey, maxKey, maxSequence, 0L);
+    }
+
+    public static DataFileMeta newFile(
+            String name, int level, int minKey, int maxKey, long maxSequence, long deleteRowCount) {
         return new DataFileMeta(
                 name,
                 maxKey - minKey + 1,
                 1,
                 maxKey - minKey + 1,
                 row(minKey),
                 row(maxKey),
                 null,
@@ -84,7 +89,7 @@
                 maxSequence,
                 0,
                 level,
-                0L);
+                deleteRowCount);
     }
 
     public static BinaryRow row(int i) {

==== (tests for AppendOnlySplitGenerator and MergeTreeSplitGenerator) ====
@@ -21,6 +21,7 @@
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.utils.Pair;
 
 import org.junit.jupiter.api.Test;
 
@@ -31,8 +32,10 @@
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
+import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static org.apache.paimon.io.DataFileTestUtils.fromMinMax;
+import static org.apache.paimon.io.DataFileTestUtils.newFile;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for {@link AppendOnlySplitGenerator} and {@link MergeTreeSplitGenerator}. */
@@ -124,13 +127,86 @@ public void testMergeTree() {
                 Collections.singletonList("6"));
     }
 
+    @Test
+    public void testSplitRawConvertible() {
+        Comparator<InternalRow> comparator = Comparator.comparingInt(o -> o.getInt(0));
+        MergeTreeSplitGenerator mergeTreeSplitGenerator =
+                new MergeTreeSplitGenerator(comparator, 100, 2, false, DEDUPLICATE);
+
+        // When level0 exists, should not be rawConvertible
+        List<DataFileMeta> files1 =
+                Arrays.asList(newFile("1", 0, 0, 10, 10L), newFile("2", 0, 10, 20, 20L));
+        assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files1)))
+                .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), false));
+
+        // When deleteRowCount > 0, should not be rawConvertible
+        List<DataFileMeta> files2 =
+                Arrays.asList(newFile("1", 1, 0, 10, 10L, 1L), newFile("2", 1, 10, 20, 20L));
+        assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files2)))
+                .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), false));
+
+        // No level0 and deleteRowCount == 0:
+        // All in one level, should be rawConvertible
+        List<DataFileMeta> files3 =
+                Arrays.asList(newFile("1", 1, 0, 10, 10L), newFile("2", 1, 10, 20, 20L));
+        assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files3)))
+                .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), true));
+
+        // Not all in one level, should not be rawConvertible
+        List<DataFileMeta> files4 =
+                Arrays.asList(newFile("1", 1, 0, 10, 10L), newFile("2", 2, 10, 20, 20L));
+        assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files4)))
+                .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), false));
+
+        // Not all in one level but with deletion vectors enabled, should be rawConvertible
+        MergeTreeSplitGenerator splitGeneratorWithDVEnabled =
+                new MergeTreeSplitGenerator(comparator, 100, 2, true, DEDUPLICATE);
+        assertThat(toNamesAndRawConvertible(splitGeneratorWithDVEnabled.splitForBatch(files4)))
+                .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), true));
+
+        // Not all in one level but with first row merge engine, should be rawConvertible
+        MergeTreeSplitGenerator splitGeneratorWithFirstRow =
+                new MergeTreeSplitGenerator(comparator, 100, 2, false, FIRST_ROW);
+        assertThat(toNamesAndRawConvertible(splitGeneratorWithFirstRow.splitForBatch(files4)))
+                .containsExactlyInAnyOrder(Pair.of(Arrays.asList("1", "2"), true));
+
+        // Split with one file should be rawConvertible
+        List<DataFileMeta> files5 =
+                Arrays.asList(
+                        newFile("1", 1, 0, 10, 10L),
+                        newFile("2", 2, 0, 12, 12L),
+                        newFile("3", 3, 15, 60, 60L),
+                        newFile("4", 4, 18, 40, 40L),
+                        newFile("5", 5, 82, 85, 85L),
+                        newFile("6", 6, 100, 200, 200L));
+        assertThat(toNamesAndRawConvertible(mergeTreeSplitGenerator.splitForBatch(files5)))
+                .containsExactlyInAnyOrder(
+                        Pair.of(Arrays.asList("1", "2", "3", "4", "5"), false),
+                        Pair.of(Collections.singletonList("6"), true));
+    }
+
-    private List<List<String>> toNames(List<List<DataFileMeta>> splits) {
-        return splits.stream()
+    private List<List<String>> toNames(List<SplitGenerator.SplitGroup> splitGroups) {
+        return splitGroups.stream()
                 .map(
-                        files ->
-                                files.stream()
+                        splitGroup ->
+                                splitGroup.files.stream()
                                         .map(DataFileMeta::fileName)
                                         .collect(Collectors.toList()))
                 .collect(Collectors.toList());
     }
+
+    private List<Pair<List<String>, Boolean>> toNamesAndRawConvertible(
+            List<SplitGenerator.SplitGroup> splitGroups) {
+        return splitGroups.stream()
+                .map(
+                        splitGroup -> {
+                            List<String> sortedFileNames =
+                                    splitGroup.files.stream()
+                                            .sorted(Comparator.comparing(DataFileMeta::fileName))
+                                            .map(DataFileMeta::fileName)
+                                            .collect(Collectors.toList());
+                            return Pair.of(sortedFileNames, splitGroup.rawConvertible);
+                        })
+                .collect(Collectors.toList());
+    }
 }

==== (raw files table test) ====
@@ -100,7 +100,7 @@ public void testGetPrimaryKeyRawFiles() throws Exception {
         assertThat(dataSplit.dataFiles()).hasSize(1);
         DataFileMeta meta = dataSplit.dataFiles().get(0);
         String partition = dataSplit.partition().getString(0).toString();
-        assertThat(dataSplit.convertToRawFiles()).isNotPresent();
+        assertThat(dataSplit.convertToRawFiles()).isPresent();
     }
 
     // write another file on level 0
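
The flipped assertion reflects the new rule: a primary-key split whose files all sit in one non-zero level is now raw convertible, so `convertToRawFiles()` returns a present `Optional`. Here is a hedged sketch of how a consumer might branch on it; `Split` and `RawFile` are stand-ins, and only the shape of `convertToRawFiles()` mirrors the diff.

```java
import java.util.List;
import java.util.Optional;

public class RawFileFastPathSketch {

    interface RawFile {}

    /** Stand-in for DataSplit; only convertToRawFiles() mirrors the API above. */
    interface Split {
        Optional<List<RawFile>> convertToRawFiles();
    }

    // Present => the split generator marked the group raw convertible and a
    // reader can stream the files directly; empty => fall back to merge read.
    static String chooseReadPath(Split split) {
        return split.convertToRawFiles()
                .map(rawFiles -> "raw read of " + rawFiles.size() + " file(s)")
                .orElse("merge read");
    }

    public static void main(String[] args) {
        Split rawSplit =
                () -> {
                    List<RawFile> files = List.of(new RawFile() {}, new RawFile() {});
                    return Optional.of(files);
                };
        Split mergeSplit = Optional::empty;
        System.out.println(chooseReadPath(rawSplit)); // raw read of 2 file(s)
        System.out.println(chooseReadPath(mergeSplit)); // merge read
    }
}
```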