From 5db028768777f68472092bdb389952d1e0bec389 Mon Sep 17 00:00:00 2001 From: 924060929 Date: Mon, 25 Nov 2024 21:41:52 +0800 Subject: [PATCH] binary search filter partitions --- .../java/org/apache/doris/common/Config.java | 42 ++- .../org/apache/doris/common/ConfigBase.java | 2 +- .../java/org/apache/doris/catalog/Env.java | 10 +- .../org/apache/doris/catalog/OlapTable.java | 4 +- .../NereidsSortedPartitionsCacheManager.java | 180 +++++++++++++ .../{ => cache}/NereidsSqlCacheManager.java | 5 +- .../expression/rules/MultiColumnBound.java | 54 ++++ .../rules/PartitionItemToRange.java | 78 ++++++ .../rules/PartitionPredicateToRange.java | 249 ++++++++++++++++++ .../expression/rules/PartitionPruner.java | 115 ++++++-- .../rules/SortedPartitionRanges.java | 46 ++++ .../rules/rewrite/PruneOlapScanPartition.java | 19 +- .../org/apache/doris/qe/StmtExecutor.java | 2 +- 13 files changed, 777 insertions(+), 29 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSortedPartitionsCacheManager.java rename fe/fe-core/src/main/java/org/apache/doris/common/{ => cache}/NereidsSqlCacheManager.java (99%) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/MultiColumnBound.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionItemToRange.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPredicateToRange.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/SortedPartitionRanges.java diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index be0390db584ca92..588f45aefd43d32 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -1425,10 +1425,34 @@ public class Config extends ConfigBase { @ConfField( mutable = true, masterOnly = false, - callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig" + callbackClassString = "org.apache.doris.common.cache.NereidsSqlCacheManager$UpdateConfig", + description = { + "当前默认设置为 300,用来控制控制NereidsSqlCacheManager中sql cache过期时间,超过一段时间不访问cache会被回收", + "The current default setting is 300, which is used to control the expiration time of SQL cache" + + "in NereidsSqlCacheManager. If the cache is not accessed for a period of time, " + + "it will be reclaimed" + } ) public static int expire_sql_cache_in_fe_second = 300; + + /** + * Expire sql sql in frontend time + */ + @ConfField( + mutable = true, + masterOnly = false, + callbackClassString = "org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager$UpdateConfig", + description = { + "当前默认设置为 300,用来控制控制NereidsSortedPartitionsCacheManager中分区元数据缓存过期时间," + + "超过一段时间不访问cache会被回收", + "The current default setting is 300, which is used to control the expiration time of " + + "the partition metadata cache in NereidsSortedPartitionsCheManager. " + + "If the cache is not accessed for a period of time, it will be reclaimed" + } + ) + public static int expire_cache_partition_meta_table_in_fe_second = 300; + /** * Set the maximum number of rows that can be cached */ @@ -2247,8 +2271,7 @@ public class Config extends ConfigBase { */ @ConfField( mutable = true, - varType = VariableAnnotation.EXPERIMENTAL, - callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig", + callbackClassString = "org.apache.doris.common.cache.NereidsSqlCacheManager$UpdateConfig", description = { "当前默认设置为 100,用来控制控制NereidsSqlCacheManager管理的sql cache数量。", "Now default set to 100, this config is used to control the number of " @@ -2257,6 +2280,19 @@ public class Config extends ConfigBase { ) public static int sql_cache_manage_num = 100; + @ConfField( + mutable = true, + callbackClassString = "org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager$UpdateConfig", + description = { + "当前默认设置为 100,用来控制控制NereidsSortedPartitionsCacheManager中有序分区元数据的缓存个数," + + "用于加速分区裁剪", + "The current default setting is 100, which is used to control the number of ordered " + + "partition metadata caches in NereidsSortedPartitionsCacheManager, " + + "and to accelerate partition pruning" + } + ) + public static int cache_partition_meta_table_manage_num = 100; + /** * Maximum number of events to poll in each RPC. */ diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java b/fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java index 18ae1dc1c0171f1..7181921792572e5 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/ConfigBase.java @@ -72,7 +72,7 @@ public interface ConfHandler { void handle(Field field, String confVal) throws Exception; } - static class DefaultConfHandler implements ConfHandler { + public static class DefaultConfHandler implements ConfHandler { @Override public void handle(Field field, String confVal) throws Exception { setConfigField(field, confVal); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 7597a7d256b4603..174e34d4bc5cfea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -107,10 +107,11 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.LogUtils; import org.apache.doris.common.MetaNotFoundException; -import org.apache.doris.common.NereidsSqlCacheManager; import org.apache.doris.common.Pair; import org.apache.doris.common.ThreadPoolManager; import org.apache.doris.common.UserException; +import org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager; +import org.apache.doris.common.cache.NereidsSqlCacheManager; import org.apache.doris.common.io.CountingDataOutputStream; import org.apache.doris.common.io.Text; import org.apache.doris.common.lock.MonitoredReentrantLock; @@ -567,6 +568,8 @@ public class Env { private final NereidsSqlCacheManager sqlCacheManager; + private final NereidsSortedPartitionsCacheManager sortedPartitionsCacheManager; + private final SplitSourceManager splitSourceManager; private final GlobalExternalTransactionInfoMgr globalExternalTransactionInfoMgr; @@ -819,6 +822,7 @@ public Env(boolean isCheckpointCatalog) { this.insertOverwriteManager = new InsertOverwriteManager(); this.dnsCache = new DNSCache(); this.sqlCacheManager = new NereidsSqlCacheManager(); + this.sortedPartitionsCacheManager = new NereidsSortedPartitionsCacheManager(); this.splitSourceManager = new SplitSourceManager(); this.globalExternalTransactionInfoMgr = new GlobalExternalTransactionInfoMgr(); this.tokenManager = new TokenManager(); @@ -6651,6 +6655,10 @@ public NereidsSqlCacheManager getSqlCacheManager() { return sqlCacheManager; } + public NereidsSortedPartitionsCacheManager getSortedPartitionsCacheManager() { + return sortedPartitionsCacheManager; + } + public SplitSourceManager getSplitSourceManager() { return splitSourceManager; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index ec3bd2acbc57d6b..ae5eb7220bfea80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -1357,12 +1357,12 @@ public Collection getPartitions() { } // get only temp partitions - public Collection getAllTempPartitions() { + public List getAllTempPartitions() { return tempPartitions.getAllPartitions(); } // get all partitions including temp partitions - public Collection getAllPartitions() { + public List getAllPartitions() { List partitions = Lists.newArrayList(idToPartition.values()); partitions.addAll(tempPartitions.getAllPartitions()); return partitions; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSortedPartitionsCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSortedPartitionsCacheManager.java new file mode 100644 index 000000000000000..0b4ab71aaae90a4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSortedPartitionsCacheManager.java @@ -0,0 +1,180 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.cache; + +import org.apache.doris.catalog.DatabaseIf; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.PartitionInfo; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.common.Config; +import org.apache.doris.common.ConfigBase.DefaultConfHandler; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.nereids.rules.expression.rules.MultiColumnBound; +import org.apache.doris.nereids.rules.expression.rules.PartitionItemToRange; +import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges; +import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndRange; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.google.common.collect.Range; +import com.google.common.collect.TreeRangeSet; +import lombok.AllArgsConstructor; +import lombok.Data; +import org.apache.hadoop.util.Lists; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; + +/** NereidsSortedPartitionsCacheManager */ +public class NereidsSortedPartitionsCacheManager { + private volatile Cache partitionCaches; + + public NereidsSortedPartitionsCacheManager() { + partitionCaches = buildCaches( + Config.cache_partition_meta_table_manage_num, + Config.expire_cache_partition_meta_table_in_fe_second + ); + } + + public Optional> get(OlapTable table) { + DatabaseIf database = table.getDatabase(); + if (database == null) { + return Optional.empty(); + } + CatalogIf catalog = database.getCatalog(); + if (catalog == null) { + return Optional.empty(); + } + TableIdentifier key = new TableIdentifier( + catalog.getName(), database.getFullName(), table.getName()); + PartitionCacheContext partitionCacheContext = partitionCaches.getIfPresent(key); + if (partitionCacheContext == null) { + return Optional.of(loadCache(key, table)); + } + if (table.getId() != partitionCacheContext.tableId + || table.getVisibleVersion() != partitionCacheContext.tableVersion) { + partitionCaches.invalidate(key); + return Optional.empty(); + } + return Optional.of(partitionCacheContext.sortedPartitionRanges); + } + + private SortedPartitionRanges loadCache(TableIdentifier key, OlapTable olapTable) { + PartitionInfo partitionInfo = olapTable.getPartitionInfo(); + Map allPartitions = partitionInfo.getIdToItem(false); + List> sortedList = Lists.newArrayList(allPartitions.entrySet()); + List> sortedRanges = Lists.newArrayListWithCapacity(allPartitions.size()); + for (Entry entry : sortedList) { + TreeRangeSet rangeSet = PartitionItemToRange.toRangeSets(entry.getValue()); + sortedRanges.add(new PartitionItemAndRange<>(entry.getKey(), entry.getValue(), rangeSet)); + } + + sortedRanges.sort((o1, o2) -> { + Range span1 = o1.ranges.span(); + Range span2 = o2.ranges.span(); + int result = span1.lowerEndpoint().compareTo(span2.lowerEndpoint()); + if (result != 0) { + return result; + } + result = span1.upperEndpoint().compareTo(span2.upperEndpoint()); + return result; + }); + SortedPartitionRanges sortedPartitionRanges = new SortedPartitionRanges(sortedRanges); + PartitionCacheContext context = new PartitionCacheContext( + olapTable.getId(), olapTable.getVisibleVersion(), sortedPartitionRanges); + partitionCaches.put(key, context); + return sortedPartitionRanges; + } + + private static Cache buildCaches( + int sortedPartitionTableManageNum, int expireSortedPartitionTableInFeSecond) { + Caffeine cacheBuilder = Caffeine.newBuilder() + // auto evict cache when jvm memory too low + .softValues(); + if (sortedPartitionTableManageNum > 0) { + cacheBuilder = cacheBuilder.maximumSize(sortedPartitionTableManageNum); + } + if (expireSortedPartitionTableInFeSecond > 0) { + cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireSortedPartitionTableInFeSecond)); + } + + return cacheBuilder.build(); + } + + public static synchronized void updateConfig() { + Env currentEnv = Env.getCurrentEnv(); + if (currentEnv == null) { + return; + } + NereidsSortedPartitionsCacheManager cacheManager = currentEnv.getSortedPartitionsCacheManager(); + if (cacheManager == null) { + return; + } + + Cache caches = buildCaches( + Config.cache_partition_meta_table_manage_num, + Config.expire_cache_partition_meta_table_in_fe_second + ); + caches.putAll(cacheManager.partitionCaches.asMap()); + cacheManager.partitionCaches = caches; + } + + @Data + @AllArgsConstructor + private static class TableIdentifier { + public final String catalog; + public final String db; + public final String table; + } + + private static class PartitionCacheContext { + private final long tableId; + private final long tableVersion; + private final SortedPartitionRanges sortedPartitionRanges; + + public PartitionCacheContext( + long tableId, long tableVersion, SortedPartitionRanges sortedPartitionRanges) { + this.tableId = tableId; + this.tableVersion = tableVersion; + this.sortedPartitionRanges = sortedPartitionRanges; + } + + @Override + public String toString() { + return "PartitionCacheContext(tableId=" + + tableId + ", tableVersion=" + tableVersion + + ", partitionNum=" + sortedPartitionRanges.sortedPartitions.size() + ")"; + } + } + + // NOTE: used in Config.cache_partition_meta_table_manage_num.callbackClassString and + // Config.expire_cache_partition_meta_table_in_fe_second.callbackClassString, + // don't remove it! + public static class UpdateConfig extends DefaultConfHandler { + @Override + public void handle(Field field, String confVal) throws Exception { + super.handle(field, confVal); + NereidsSortedPartitionsCacheManager.updateConfig(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java similarity index 99% rename from fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java rename to fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java index 1317fdaefc766eb..79f51da8ab92695 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.common; +package org.apache.doris.common.cache; import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.DatabaseIf; @@ -24,7 +24,10 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.TableIf; import org.apache.doris.catalog.View; +import org.apache.doris.common.Config; import org.apache.doris.common.ConfigBase.DefaultConfHandler; +import org.apache.doris.common.Pair; +import org.apache.doris.common.Status; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.datasource.CatalogIf; import org.apache.doris.metric.MetricRepo; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/MultiColumnBound.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/MultiColumnBound.java new file mode 100644 index 000000000000000..c3ee4fc24daa08f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/MultiColumnBound.java @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.expression.rules; + +import java.util.List; + +/** MultiColumnBound */ +public class MultiColumnBound implements Comparable { + private final List columnBounds; + + public MultiColumnBound(List columnBounds) { + this.columnBounds = columnBounds; + } + + @Override + public int compareTo(MultiColumnBound o) { + for (int i = 0; i < columnBounds.size(); i++) { + ColumnBound columnBound = columnBounds.get(i); + ColumnBound otherColumnBound = o.columnBounds.get(i); + int result = columnBound.compareTo(otherColumnBound); + if (result != 0) { + return result; + } + } + return 0; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < columnBounds.size(); i++) { + if (i > 0) { + sb.append(","); + } + sb.append(columnBounds.get(i)); + } + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionItemToRange.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionItemToRange.java new file mode 100644 index 000000000000000..bde2f3cb1a9c891 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionItemToRange.java @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.expression.rules; + +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.ListPartitionItem; +import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.catalog.PartitionKey; +import org.apache.doris.catalog.RangePartitionItem; +import org.apache.doris.nereids.trees.expressions.literal.Literal; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Range; +import com.google.common.collect.TreeRangeSet; + +import java.util.List; + +/** PartitionItemToRange */ +public class PartitionItemToRange { + /** toRangeSets */ + public static TreeRangeSet toRangeSets(PartitionItem partitionItem) { + if (partitionItem instanceof RangePartitionItem) { + Range range = partitionItem.getItems(); + TreeRangeSet oneRangePartitionRanges = TreeRangeSet.create(); + PartitionKey lowerKey = range.lowerEndpoint(); + ImmutableList.Builder lowerBounds + = ImmutableList.builderWithExpectedSize(lowerKey.getKeys().size()); + for (LiteralExpr key : lowerKey.getKeys()) { + Literal literal = Literal.fromLegacyLiteral(key, key.getType()); + lowerBounds.add(ColumnBound.of(literal)); + } + + PartitionKey upperKey = range.upperEndpoint(); + ImmutableList.Builder upperBounds + = ImmutableList.builderWithExpectedSize(lowerKey.getKeys().size()); + for (LiteralExpr key : upperKey.getKeys()) { + Literal literal = Literal.fromLegacyLiteral(key, key.getType()); + upperBounds.add(ColumnBound.of(literal)); + } + + oneRangePartitionRanges.add(Range.closedOpen( + new MultiColumnBound(lowerBounds.build()), + new MultiColumnBound(upperBounds.build()))); + return oneRangePartitionRanges; + } else if (partitionItem instanceof ListPartitionItem) { + TreeRangeSet oneListPartitionRanges = TreeRangeSet.create(); + List partitionKeys = partitionItem.getItems(); + for (PartitionKey partitionKey : partitionKeys) { + ImmutableList.Builder bounds + = ImmutableList.builderWithExpectedSize(partitionKeys.size()); + for (LiteralExpr key : partitionKey.getKeys()) { + Literal literal = Literal.fromLegacyLiteral(key, key.getType()); + bounds.add(ColumnBound.of(literal)); + } + MultiColumnBound bound = new MultiColumnBound(bounds.build()); + oneListPartitionRanges.add(Range.singleton(bound)); + } + return oneListPartitionRanges; + } else { + throw new UnsupportedOperationException(partitionItem.getClass().getName()); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPredicateToRange.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPredicateToRange.java new file mode 100644 index 000000000000000..5b2a11e6759f429 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPredicateToRange.java @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.expression.rules; + +import org.apache.doris.nereids.trees.expressions.And; +import org.apache.doris.nereids.trees.expressions.EqualTo; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.GreaterThan; +import org.apache.doris.nereids.trees.expressions.GreaterThanEqual; +import org.apache.doris.nereids.trees.expressions.InPredicate; +import org.apache.doris.nereids.trees.expressions.IsNull; +import org.apache.doris.nereids.trees.expressions.LessThan; +import org.apache.doris.nereids.trees.expressions.LessThanEqual; +import org.apache.doris.nereids.trees.expressions.Not; +import org.apache.doris.nereids.trees.expressions.NullSafeEqual; +import org.apache.doris.nereids.trees.expressions.Or; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.expressions.literal.MaxLiteral; +import org.apache.doris.nereids.trees.expressions.literal.NullLiteral; +import org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionVisitor; + +import com.google.common.collect.BoundType; +import com.google.common.collect.ImmutableRangeSet; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; +import com.google.common.collect.TreeRangeSet; +import org.apache.hadoop.util.Lists; + +import java.util.List; +import java.util.Set; + +/** PartitionPredicateToRange */ +public class PartitionPredicateToRange extends DefaultExpressionVisitor, Void> { + private List columns; + private Set slotIds; + + /** PartitionPredicateToRange */ + public PartitionPredicateToRange(List columns) { + this.columns = columns; + + ImmutableSet.Builder slotIds = ImmutableSet.builderWithExpectedSize(columns.size()); + for (Slot column : columns) { + slotIds.add(column.getExprId().asInt()); + } + this.slotIds = slotIds.build(); + } + + @Override + public RangeSet visitAnd(And and, Void context) { + RangeSet leftRanges = and.left().accept(this, context); + if (leftRanges == null) { + return null; + } + RangeSet rightRanges = and.right().accept(this, context); + if (rightRanges == null) { + return null; + } + + RangeSet intersects = TreeRangeSet.create(); + intersects.addAll(leftRanges); + for (Range rightRange : rightRanges.asRanges()) { + intersects = intersects.subRangeSet(rightRange); + if (intersects.isEmpty()) { + break; + } + } + return intersects; + } + + @Override + public RangeSet visitOr(Or or, Void context) { + RangeSet leftRanges = or.left().accept(this, context); + if (leftRanges == null) { + return null; + } + RangeSet rightRanges = or.right().accept(this, context); + if (rightRanges == null) { + return null; + } + + RangeSet intersects = TreeRangeSet.create(); + intersects.addAll(leftRanges); + intersects.addAll(rightRanges); + return intersects; + } + + @Override + public RangeSet visitNot(Not not, Void context) { + Expression child = not.child(); + if (child instanceof IsNull && ((IsNull) child).child() instanceof SlotReference) { + SlotReference slot = (SlotReference) ((IsNull) child).child(); + int slotId = slot.getExprId().asInt(); + if (slotIds.contains(slotId)) { + Range singleton = ColumnBound.singleton(new NullLiteral(child.getDataType())); + return toRangeSet(slot, singleton, BoundType.OPEN, BoundType.CLOSED); + } + } + return null; + } + + @Override + public RangeSet visitIsNull(IsNull isNull, Void context) { + Expression child = isNull.child(); + if (child instanceof SlotReference && slotIds.contains(((SlotReference) child).getExprId().asInt())) { + Range singleton = ColumnBound.singleton(new NullLiteral(child.getDataType())); + return toRangeSet((SlotReference) child, singleton, BoundType.CLOSED, BoundType.CLOSED); + } + return null; + } + + @Override + public RangeSet visitEqualTo(EqualTo equalTo, Void context) { + Expression left = equalTo.left(); + Expression right = equalTo.right(); + if (left instanceof SlotReference && right instanceof Literal) { + if (slotIds.contains(((SlotReference) left).getExprId().asInt())) { + Range singleton = ColumnBound.singleton((Literal) right); + return toRangeSet((SlotReference) left, singleton, BoundType.CLOSED, BoundType.CLOSED); + } + } + return null; + } + + @Override + public RangeSet visitNullSafeEqual(NullSafeEqual nullSafeEqual, Void context) { + Expression left = nullSafeEqual.left(); + Expression right = nullSafeEqual.right(); + if (left instanceof SlotReference && right instanceof Literal) { + if (slotIds.contains(((SlotReference) left).getExprId().asInt())) { + Range singleton = ColumnBound.singleton(new NullLiteral(left.getDataType())); + return toRangeSet((SlotReference) left, singleton, BoundType.CLOSED, BoundType.CLOSED); + } + } + return null; + } + + @Override + public RangeSet visitInPredicate(InPredicate inPredicate, Void context) { + Expression compareExpr = inPredicate.getCompareExpr(); + if (compareExpr instanceof SlotReference) { + SlotReference slot = (SlotReference) compareExpr; + if (slotIds.contains((slot).getExprId().asInt())) { + RangeSet union = TreeRangeSet.create(); + for (Expression option : inPredicate.getOptions()) { + if (!(option instanceof Literal)) { + return null; + } + Range singleton = ColumnBound.singleton((Literal) option); + union.addAll( + toRangeSet(slot, singleton, BoundType.CLOSED, BoundType.CLOSED) + ); + } + return union; + } + } + return null; + } + + @Override + public RangeSet visitLessThan(LessThan lessThan, Void context) { + Expression left = lessThan.left(); + Expression right = lessThan.right(); + if (left instanceof SlotReference && right instanceof Literal) { + if (slotIds.contains(((SlotReference) left).getExprId().asInt())) { + Range singleton = ColumnBound.singleton((Literal) right); + return toRangeSet((SlotReference) left, singleton, BoundType.CLOSED, BoundType.OPEN); + } + } + return null; + } + + @Override + public RangeSet visitLessThanEqual(LessThanEqual lessThanEqual, Void context) { + Expression left = lessThanEqual.left(); + Expression right = lessThanEqual.right(); + if (left instanceof SlotReference && right instanceof Literal) { + if (slotIds.contains(((SlotReference) left).getExprId().asInt())) { + Range singleton = ColumnBound.singleton((Literal) right); + return toRangeSet((SlotReference) left, singleton, BoundType.CLOSED, BoundType.CLOSED); + } + } + return null; + } + + @Override + public RangeSet visitGreaterThan(GreaterThan greaterThan, Void context) { + Expression left = greaterThan.left(); + Expression right = greaterThan.right(); + if (left instanceof SlotReference && right instanceof Literal) { + if (slotIds.contains(((SlotReference) left).getExprId().asInt())) { + Range singleton = ColumnBound.singleton((Literal) right); + return toRangeSet((SlotReference) left, singleton, BoundType.OPEN, BoundType.CLOSED); + } + } + return null; + } + + @Override + public RangeSet visitGreaterThanEqual(GreaterThanEqual greaterThanEqual, Void context) { + Expression left = greaterThanEqual.left(); + Expression right = greaterThanEqual.right(); + if (left instanceof SlotReference && right instanceof Literal) { + if (slotIds.contains(((SlotReference) left).getExprId().asInt())) { + Range singleton = ColumnBound.singleton((Literal) right); + return toRangeSet((SlotReference) left, singleton, BoundType.CLOSED, BoundType.CLOSED); + } + } + return null; + } + + private RangeSet toRangeSet( + SlotReference slotReference, Range columnRange, + BoundType lowerBoundType, BoundType upperBoundType) { + List lowerBounds = Lists.newArrayListWithCapacity(columns.size()); + List upperBounds = Lists.newArrayListWithCapacity(columns.size()); + for (Slot column : columns) { + if (column.getExprId().asInt() == slotReference.getExprId().asInt()) { + lowerBounds.add(columnRange.lowerEndpoint()); + upperBounds.add(columnRange.upperEndpoint()); + } else { + lowerBounds.add(ColumnBound.of(new NullLiteral(slotReference.getDataType()))); + upperBounds.add(ColumnBound.of(new MaxLiteral(slotReference.getDataType()))); + } + } + MultiColumnBound lowerBound = new MultiColumnBound(lowerBounds); + MultiColumnBound upperBound = new MultiColumnBound(lowerBounds); + + Range range = Range.range(lowerBound, lowerBoundType, upperBound, upperBoundType); + return ImmutableRangeSet.of(range); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java index fac1a7f82d2cfb8..26484f9f0659914 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/PartitionPruner.java @@ -22,6 +22,7 @@ import org.apache.doris.catalog.RangePartitionItem; import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext; +import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndRange; import org.apache.doris.nereids.trees.expressions.Cast; import org.apache.doris.nereids.trees.expressions.ComparisonPredicate; import org.apache.doris.nereids.trees.expressions.Expression; @@ -39,6 +40,8 @@ import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import com.google.common.collect.Range; +import com.google.common.collect.RangeSet; import java.util.List; import java.util.Map; @@ -49,7 +52,7 @@ * PartitionPruner */ public class PartitionPruner extends DefaultExpressionRewriter { - private final List partitions; + private final List> partitions; private final Expression partitionPredicate; /** Different type of table may have different partition prune behavior. */ @@ -58,7 +61,7 @@ public enum PartitionTableType { HIVE } - private PartitionPruner(List partitions, Expression partitionPredicate) { + private PartitionPruner(List> partitions, Expression partitionPredicate) { this.partitions = Objects.requireNonNull(partitions, "partitions cannot be null"); this.partitionPredicate = Objects.requireNonNull(partitionPredicate.accept(this, null), "partitionPredicate cannot be null"); @@ -105,19 +108,25 @@ public Expression visitComparisonPredicate(ComparisonPredicate cp, Void context) public List prune() { Builder scanPartitionIdents = ImmutableList.builder(); for (OnePartitionEvaluator partition : partitions) { - if (!canBePrunedOut(partition)) { + if (!canBePrunedOut(partitionPredicate, partition)) { scanPartitionIdents.add((K) partition.getPartitionIdent()); } } return scanPartitionIdents.build(); } + public static List prune(List partitionSlots, Expression partitionPredicate, + Map idToPartitions, CascadesContext cascadesContext, + PartitionTableType partitionTableType) { + return prune(partitionSlots, partitionPredicate, idToPartitions, cascadesContext, partitionTableType, null); + } + /** * prune partition with `idToPartitions` as parameter. */ public static List prune(List partitionSlots, Expression partitionPredicate, Map idToPartitions, CascadesContext cascadesContext, - PartitionTableType partitionTableType) { + PartitionTableType partitionTableType, SortedPartitionRanges sortedPartitionRanges) { partitionPredicate = PartitionPruneExpressionExtractor.extract( partitionPredicate, ImmutableSet.copyOf(partitionSlots), cascadesContext); partitionPredicate = PredicateRewriteForPartitionPrune.rewrite(partitionPredicate, cascadesContext); @@ -134,39 +143,111 @@ public static List prune(List partitionSlots, Expression partitionP return ImmutableList.of(); } - List evaluators = Lists.newArrayListWithCapacity(idToPartitions.size()); - for (Entry kv : idToPartitions.entrySet()) { - evaluators.add(toPartitionEvaluator( - kv.getKey(), kv.getValue(), partitionSlots, cascadesContext, expandThreshold)); + if (sortedPartitionRanges != null) { + RangeSet predicateRanges = partitionPredicate.accept( + new PartitionPredicateToRange(partitionSlots), null); + if (predicateRanges != null) { + return binarySearchFiltering( + sortedPartitionRanges, partitionSlots, partitionPredicate, cascadesContext, + expandThreshold, predicateRanges + ); + } } - PartitionPruner partitionPruner = new PartitionPruner(evaluators, partitionPredicate); - //TODO: we keep default partition because it's too hard to prune it, we return false in canPrune(). - return partitionPruner.prune(); + + return sequentialFiltering( + idToPartitions, partitionSlots, partitionPredicate, cascadesContext, expandThreshold + ); } /** * convert partition item to partition evaluator */ - public static final OnePartitionEvaluator toPartitionEvaluator(K id, PartitionItem partitionItem, + public static OnePartitionEvaluator toPartitionEvaluator(K id, PartitionItem partitionItem, List partitionSlots, CascadesContext cascadesContext, int expandThreshold) { if (partitionItem instanceof ListPartitionItem) { - return new OneListPartitionEvaluator( + return new OneListPartitionEvaluator<>( id, partitionSlots, (ListPartitionItem) partitionItem, cascadesContext); } else if (partitionItem instanceof RangePartitionItem) { - return new OneRangePartitionEvaluator( + return new OneRangePartitionEvaluator<>( id, partitionSlots, (RangePartitionItem) partitionItem, cascadesContext, expandThreshold); } else { - return new UnknownPartitionEvaluator(id, partitionItem); + return new UnknownPartitionEvaluator<>(id, partitionItem); + } + } + + private static List binarySearchFiltering( + SortedPartitionRanges sortedPartitionRanges, List partitionSlots, + Expression partitionPredicate, CascadesContext cascadesContext, int expandThreshold, + RangeSet predicateRanges) { + List> sortedPartitions = sortedPartitionRanges.sortedPartitions; + List selectedPartitions = Lists.newArrayList(); + + int leftIndex = 0; + int midIndex = 0; + for (Range predicateRange : predicateRanges.asRanges()) { + int rightIndex = sortedPartitions.size(); + if (leftIndex >= rightIndex) { + break; + } + MultiColumnBound predicateLowerBound = predicateRange.lowerEndpoint(); + MultiColumnBound predicateUpperBound = predicateRange.upperEndpoint(); + while (leftIndex + 1 < rightIndex) { + midIndex = (leftIndex + rightIndex) / 2; + PartitionItemAndRange partition = sortedPartitions.get(midIndex); + Range partitionSpan = partition.ranges.span(); + MultiColumnBound partitionLowerBound = partitionSpan.lowerEndpoint(); + + int compare = predicateLowerBound.compareTo(partitionLowerBound); + if (compare == 0) { + break; + } else if (compare < 0) { + rightIndex = midIndex; + } else { + leftIndex = midIndex; + } + } + + for (leftIndex = midIndex; leftIndex < sortedPartitions.size(); leftIndex++) { + PartitionItemAndRange partition = sortedPartitions.get(leftIndex); + if (leftIndex > midIndex) { + Range partitionSpan = partition.ranges.span(); + MultiColumnBound partitionLowerBound = partitionSpan.lowerEndpoint(); + if (partitionLowerBound.compareTo(predicateUpperBound) > 0) { + break; + } + } + + K partitionId = partition.id; + OnePartitionEvaluator partitionEvaluator = toPartitionEvaluator( + partitionId, partition.partitionItem, partitionSlots, cascadesContext, expandThreshold); + if (!canBePrunedOut(partitionPredicate, partitionEvaluator)) { + selectedPartitions.add(partitionId); + } + } } + + return selectedPartitions; + } + + private static List sequentialFiltering(Map idToPartitions, List partitionSlots, + Expression partitionPredicate, CascadesContext cascadesContext, int expandThreshold) { + List> evaluators = Lists.newArrayListWithCapacity(idToPartitions.size()); + for (Entry kv : idToPartitions.entrySet()) { + evaluators.add(toPartitionEvaluator( + kv.getKey(), kv.getValue(), partitionSlots, cascadesContext, expandThreshold)); + } + PartitionPruner partitionPruner = new PartitionPruner(evaluators, partitionPredicate); + //TODO: we keep default partition because it's too hard to prune it, we return false in canPrune(). + return partitionPruner.prune(); } /** * return true if partition is not qualified. that is, can be pruned out. */ - private boolean canBePrunedOut(OnePartitionEvaluator evaluator) { + private static boolean canBePrunedOut(Expression partitionPredicate, OnePartitionEvaluator evaluator) { List> onePartitionInputs = evaluator.getOnePartitionInputs(); for (Map currentInputs : onePartitionInputs) { - // evaluate wether there's possible for this partition to accept this predicate + // evaluate whether there's possible for this partition to accept this predicate Expression result = evaluator.evaluateWithDefaultPartition(partitionPredicate, currentInputs); if (!result.equals(BooleanLiteral.FALSE) && !(result instanceof NullLiteral)) { return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/SortedPartitionRanges.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/SortedPartitionRanges.java new file mode 100644 index 000000000000000..209bad45b54404b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/SortedPartitionRanges.java @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.expression.rules; + +import org.apache.doris.catalog.PartitionItem; + +import com.google.common.collect.RangeSet; + +import java.util.List; + +/** SortedPartitionRanges */ +public class SortedPartitionRanges { + public final List> sortedPartitions; + + public SortedPartitionRanges(List> sortedPartitions) { + this.sortedPartitions = sortedPartitions; + } + + /** PartitionItemAndRange */ + public static class PartitionItemAndRange { + public final K id; + public final PartitionItem partitionItem; + public final RangeSet ranges; + + public PartitionItemAndRange(K id, PartitionItem partitionItem, RangeSet ranges) { + this.id = id; + this.partitionItem = partitionItem; + this.ranges = ranges; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneOlapScanPartition.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneOlapScanPartition.java index 0d5054086117d91..3cbc39cb2f02232 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneOlapScanPartition.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PruneOlapScanPartition.java @@ -18,13 +18,16 @@ package org.apache.doris.nereids.rules.rewrite; import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.PartitionInfo; import org.apache.doris.catalog.PartitionItem; +import org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.rules.expression.rules.PartitionPruner; import org.apache.doris.nereids.rules.expression.rules.PartitionPruner.PartitionTableType; +import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; @@ -33,10 +36,12 @@ import org.apache.doris.qe.ConnectContext; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -76,10 +81,18 @@ public Rule build() { partitionSlots.add(partitionSlot); } } + NereidsSortedPartitionsCacheManager sortedPartitionsCacheManager = Env.getCurrentEnv() + .getSortedPartitionsCacheManager(); List manuallySpecifiedPartitions = scan.getManuallySpecifiedPartitions(); - Map idToPartitions; + Map idToPartitions = ImmutableMap.of(); + SortedPartitionRanges sortedPartitionRanges = null; if (manuallySpecifiedPartitions.isEmpty()) { - idToPartitions = partitionInfo.getIdToItem(false); + Optional> sortedPartitionRangesOpt = sortedPartitionsCacheManager.get(table); + if (sortedPartitionRangesOpt.isPresent()) { + sortedPartitionRanges = (SortedPartitionRanges) sortedPartitionRangesOpt.get(); + } else { + idToPartitions = partitionInfo.getIdToItem(false); + } } else { Map allPartitions = partitionInfo.getAllPartitions(); idToPartitions = allPartitions.keySet().stream() @@ -88,7 +101,7 @@ public Rule build() { } List prunedPartitions = PartitionPruner.prune( partitionSlots, filter.getPredicate(), idToPartitions, ctx.cascadesContext, - PartitionTableType.OLAP); + PartitionTableType.OLAP, sortedPartitionRanges); if (prunedPartitions.isEmpty()) { return new LogicalEmptyRelation( ConnectContext.get().getStatementContext().getNextRelationId(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 05c7853ca469dff..292220e2dc3e400 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -108,10 +108,10 @@ import org.apache.doris.common.FormatOptions; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.NereidsException; -import org.apache.doris.common.NereidsSqlCacheManager; import org.apache.doris.common.Status; import org.apache.doris.common.UserException; import org.apache.doris.common.Version; +import org.apache.doris.common.cache.NereidsSqlCacheManager; import org.apache.doris.common.profile.Profile; import org.apache.doris.common.profile.ProfileManager.ProfileType; import org.apache.doris.common.profile.SummaryProfile;