Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Dec 18, 2024
1 parent 5bd7802 commit 4c8230c
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import org.apache.doris.nereids.rules.expression.rules.MultiColumnBound;
import org.apache.doris.nereids.rules.expression.rules.PartitionItemToRange;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndId;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndRange;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Range;
import com.google.common.collect.TreeRangeSet;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.hadoop.util.Lists;
Expand Down Expand Up @@ -84,22 +84,33 @@ private SortedPartitionRanges<?> loadCache(TableIdentifier key, OlapTable olapTa
Map<Long, PartitionItem> allPartitions = partitionInfo.getIdToItem(false);
List<Entry<Long, PartitionItem>> sortedList = Lists.newArrayList(allPartitions.entrySet());
List<PartitionItemAndRange<?>> sortedRanges = Lists.newArrayListWithCapacity(allPartitions.size());
List<PartitionItemAndId<?>> defaultPartitions = Lists.newArrayList();
for (Entry<Long, PartitionItem> entry : sortedList) {
TreeRangeSet<MultiColumnBound> rangeSet = PartitionItemToRange.toRangeSets(entry.getValue());
sortedRanges.add(new PartitionItemAndRange<>(entry.getKey(), entry.getValue(), rangeSet));
PartitionItem partitionItem = entry.getValue();
Long id = entry.getKey();
if (!partitionItem.isDefaultPartition()) {
List<Range<MultiColumnBound>> ranges = PartitionItemToRange.toRanges(partitionItem);
for (Range<MultiColumnBound> range : ranges) {
sortedRanges.add(new PartitionItemAndRange<>(id, partitionItem, range));
}
} else {
defaultPartitions.add(new PartitionItemAndId<>(id, partitionItem));
}
}

sortedRanges.sort((o1, o2) -> {
Range<MultiColumnBound> span1 = o1.ranges.span();
Range<MultiColumnBound> span2 = o2.ranges.span();
Range<MultiColumnBound> span1 = o1.range;
Range<MultiColumnBound> span2 = o2.range;
int result = span1.lowerEndpoint().compareTo(span2.lowerEndpoint());
if (result != 0) {
return result;
}
result = span1.upperEndpoint().compareTo(span2.upperEndpoint());
return result;
});
SortedPartitionRanges sortedPartitionRanges = new SortedPartitionRanges(sortedRanges);
SortedPartitionRanges<?> sortedPartitionRanges = new SortedPartitionRanges(
sortedRanges, defaultPartitions
);
PartitionCacheContext context = new PartitionCacheContext(
olapTable.getId(), olapTable.getVisibleVersion(), sortedPartitionRanges);
partitionCaches.put(key, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,25 @@
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
import org.apache.doris.nereids.types.DataType;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
import com.google.common.collect.TreeRangeSet;

import java.util.List;

/** PartitionItemToRange */
public class PartitionItemToRange {
/** toRangeSets */
public static TreeRangeSet<MultiColumnBound> toRangeSets(PartitionItem partitionItem) {
public static List<Range<MultiColumnBound>> toRanges(PartitionItem partitionItem) {
if (partitionItem instanceof RangePartitionItem) {
Range<PartitionKey> range = partitionItem.getItems();
TreeRangeSet<MultiColumnBound> oneRangePartitionRanges = TreeRangeSet.create();
PartitionKey lowerKey = range.lowerEndpoint();
ImmutableList.Builder<ColumnBound> lowerBounds
= ImmutableList.builderWithExpectedSize(lowerKey.getKeys().size());
for (LiteralExpr key : lowerKey.getKeys()) {
Literal literal = Literal.fromLegacyLiteral(key, key.getType());
Literal literal = toNereidsLiteral(key);
lowerBounds.add(ColumnBound.of(literal));
}

Expand All @@ -53,26 +53,35 @@ public static TreeRangeSet<MultiColumnBound> toRangeSets(PartitionItem partition
upperBounds.add(ColumnBound.of(literal));
}

oneRangePartitionRanges.add(Range.closedOpen(
return ImmutableList.of(Range.closedOpen(
new MultiColumnBound(lowerBounds.build()),
new MultiColumnBound(upperBounds.build())));
return oneRangePartitionRanges;
new MultiColumnBound(upperBounds.build())
));
} else if (partitionItem instanceof ListPartitionItem) {
TreeRangeSet<MultiColumnBound> oneListPartitionRanges = TreeRangeSet.create();
List<PartitionKey> partitionKeys = partitionItem.getItems();
ImmutableList.Builder<Range<MultiColumnBound>> ranges
= ImmutableList.builderWithExpectedSize(partitionKeys.size());
for (PartitionKey partitionKey : partitionKeys) {
ImmutableList.Builder<ColumnBound> bounds
= ImmutableList.builderWithExpectedSize(partitionKeys.size());
for (LiteralExpr key : partitionKey.getKeys()) {
Literal literal = Literal.fromLegacyLiteral(key, key.getType());
Literal literal = toNereidsLiteral(key);
bounds.add(ColumnBound.of(literal));
}
MultiColumnBound bound = new MultiColumnBound(bounds.build());
oneListPartitionRanges.add(Range.singleton(bound));
ranges.add(Range.singleton(bound));
}
return oneListPartitionRanges;
return ranges.build();
} else {
throw new UnsupportedOperationException(partitionItem.getClass().getName());
}
}

private static Literal toNereidsLiteral(LiteralExpr partitionKeyLiteral) {
if (!partitionKeyLiteral.isMinValue()) {
return Literal.fromLegacyLiteral(partitionKeyLiteral, partitionKeyLiteral.getType());
} else {
return new NullLiteral(DataType.fromCatalogType(partitionKeyLiteral.getType()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndId;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndRange;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.ComparisonPredicate;
Expand All @@ -42,12 +43,15 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

/**
* PartitionPruner
Expand Down Expand Up @@ -106,7 +110,7 @@ public Expression visitComparisonPredicate(ComparisonPredicate cp, Void context)
}

/** prune */
public <K> List<K> prune() {
public <K extends Comparable<K>> List<K> prune() {
Builder<K> scanPartitionIdents = ImmutableList.builder();
for (OnePartitionEvaluator partition : partitions) {
if (!canBePrunedOut(partitionPredicate, partition)) {
Expand All @@ -116,7 +120,7 @@ public <K> List<K> prune() {
return scanPartitionIdents.build();
}

public static <K> List<K> prune(List<Slot> partitionSlots, Expression partitionPredicate,
public static <K extends Comparable<K>> List<K> prune(List<Slot> partitionSlots, Expression partitionPredicate,
Map<K, PartitionItem> idToPartitions, CascadesContext cascadesContext,
PartitionTableType partitionTableType) {
return prune(partitionSlots, partitionPredicate, idToPartitions,
Expand All @@ -126,7 +130,7 @@ public static <K> List<K> prune(List<Slot> partitionSlots, Expression partitionP
/**
* prune partition with `idToPartitions` as parameter.
*/
public static <K> List<K> prune(List<Slot> partitionSlots, Expression partitionPredicate,
public static <K extends Comparable<K>> List<K> prune(List<Slot> partitionSlots, Expression partitionPredicate,
Map<K, PartitionItem> idToPartitions, CascadesContext cascadesContext,
PartitionTableType partitionTableType, Optional<SortedPartitionRanges<K>> sortedPartitionRanges) {
partitionPredicate = PartitionPruneExpressionExtractor.extract(
Expand Down Expand Up @@ -177,61 +181,73 @@ public static <K> OnePartitionEvaluator<K> toPartitionEvaluator(K id, PartitionI
}
}

private static <K> List<K> binarySearchFiltering(
private static <K extends Comparable<K>> List<K> binarySearchFiltering(
SortedPartitionRanges<K> sortedPartitionRanges, List<Slot> partitionSlots,
Expression partitionPredicate, CascadesContext cascadesContext, int expandThreshold,
RangeSet<MultiColumnBound> predicateRanges) {
List<PartitionItemAndRange<K>> sortedPartitions = sortedPartitionRanges.sortedPartitions;
List<K> selectedPartitions = Lists.newArrayList();

Set<K> selectedIdSets = Sets.newTreeSet();
int leftIndex = 0;
int midIndex = 0;
for (Range<MultiColumnBound> predicateRange : predicateRanges.asRanges()) {
int rightIndex = sortedPartitions.size();
if (leftIndex >= rightIndex) {
break;
}
MultiColumnBound predicateLowerBound = predicateRange.lowerEndpoint();

int midIndex;
MultiColumnBound predicateUpperBound = predicateRange.upperEndpoint();
MultiColumnBound predicateLowerBound = predicateRange.lowerEndpoint();

while (leftIndex + 1 < rightIndex) {
midIndex = (leftIndex + rightIndex) / 2;
PartitionItemAndRange<K> partition = sortedPartitions.get(midIndex);
Range<MultiColumnBound> partitionSpan = partition.ranges.span();
MultiColumnBound partitionLowerBound = partitionSpan.lowerEndpoint();
Range<MultiColumnBound> partitionSpan = partition.range;

int compare = predicateLowerBound.compareTo(partitionLowerBound);
if (compare == 0) {
break;
} else if (compare < 0) {
if (predicateUpperBound.compareTo(partitionSpan.lowerEndpoint()) < 0) {
rightIndex = midIndex;
} else {
} else if (predicateLowerBound.compareTo(partitionSpan.upperEndpoint()) > 0) {
leftIndex = midIndex;
} else {
break;
}
}

for (leftIndex = midIndex; leftIndex < sortedPartitions.size(); leftIndex++) {
for (; leftIndex < sortedPartitions.size(); leftIndex++) {
PartitionItemAndRange<K> partition = sortedPartitions.get(leftIndex);
if (leftIndex > midIndex) {
Range<MultiColumnBound> partitionSpan = partition.ranges.span();
MultiColumnBound partitionLowerBound = partitionSpan.lowerEndpoint();
if (partitionLowerBound.compareTo(predicateUpperBound) > 0) {
break;
}
}

K partitionId = partition.id;
// list partition will expand to multiple PartitionItemAndRange, we should skip evaluate it again
if (selectedIdSets.contains(partitionId)) {
continue;
}

Range<MultiColumnBound> partitionSpan = partition.range;
if (predicateUpperBound.compareTo(partitionSpan.lowerEndpoint()) < 0) {
break;
}

OnePartitionEvaluator<K> partitionEvaluator = toPartitionEvaluator(
partitionId, partition.partitionItem, partitionSlots, cascadesContext, expandThreshold);
if (!canBePrunedOut(partitionPredicate, partitionEvaluator)) {
selectedPartitions.add(partitionId);
selectedIdSets.add(partitionId);
}
}
}

return selectedPartitions;
for (PartitionItemAndId<K> defaultPartition : sortedPartitionRanges.defaultPartitions) {
K partitionId = defaultPartition.id;
OnePartitionEvaluator<K> partitionEvaluator = toPartitionEvaluator(
partitionId, defaultPartition.partitionItem, partitionSlots, cascadesContext, expandThreshold);
if (!canBePrunedOut(partitionPredicate, partitionEvaluator)) {
selectedIdSets.add(partitionId);
}
}

return Utils.fastToImmutableList(selectedIdSets);
}

private static <K> List<K> sequentialFiltering(Map<K, PartitionItem> idToPartitions, List<Slot> partitionSlots,
private static <K extends Comparable<K>> List<K> sequentialFiltering(Map<K, PartitionItem> idToPartitions, List<Slot> partitionSlots,
Expression partitionPredicate, CascadesContext cascadesContext, int expandThreshold) {
List<OnePartitionEvaluator<?>> evaluators = Lists.newArrayListWithCapacity(idToPartitions.size());
for (Entry<K, PartitionItem> kv : idToPartitions.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,52 @@
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.nereids.util.Utils;

import com.google.common.collect.RangeSet;
import com.google.common.collect.Range;

import java.util.List;
import java.util.Objects;

/** SortedPartitionRanges */
public class SortedPartitionRanges<K> {
public final List<PartitionItemAndRange<K>> sortedPartitions;
public final List<PartitionItemAndId<K>> defaultPartitions;

public SortedPartitionRanges(List<PartitionItemAndRange<K>> sortedPartitions) {
public SortedPartitionRanges(
List<PartitionItemAndRange<K>> sortedPartitions, List<PartitionItemAndId<K>> defaultPartitions) {
this.sortedPartitions = Utils.fastToImmutableList(
Objects.requireNonNull(sortedPartitions, "sortedPartitions bounds can not be null")
);
this.defaultPartitions = Utils.fastToImmutableList(
Objects.requireNonNull(defaultPartitions, "defaultPartitions bounds can not be null")
);
}

/** PartitionItemAndRange */
public static class PartitionItemAndRange<K> {
public final K id;
public final PartitionItem partitionItem;
public final RangeSet<MultiColumnBound> ranges;
public final Range<MultiColumnBound> range;

public PartitionItemAndRange(K id, PartitionItem partitionItem, Range<MultiColumnBound> range) {
this.id = id;
this.partitionItem = partitionItem;
this.range = range;
}

@Override
public String toString() {
return range.toString();
}
}

/** PartitionItemAndId */
public static class PartitionItemAndId<K> {
public final K id;
public final PartitionItem partitionItem;

public PartitionItemAndRange(K id, PartitionItem partitionItem, RangeSet<MultiColumnBound> ranges) {
public PartitionItemAndId(K id, PartitionItem partitionItem) {
this.id = id;
this.partitionItem = partitionItem;
this.ranges = ranges;
}
}
}

0 comments on commit 4c8230c

Please sign in to comment.