Skip to content

Commit

Permalink
binary search filter partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Nov 26, 2024
1 parent 14d928b commit 5db0287
Show file tree
Hide file tree
Showing 13 changed files with 777 additions and 29 deletions.
42 changes: 39 additions & 3 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -1425,10 +1425,34 @@ public class Config extends ConfigBase {
@ConfField(
mutable = true,
masterOnly = false,
callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig"
callbackClassString = "org.apache.doris.common.cache.NereidsSqlCacheManager$UpdateConfig",
description = {
"当前默认设置为 300,用来控制控制NereidsSqlCacheManager中sql cache过期时间,超过一段时间不访问cache会被回收",
"The current default setting is 300, which is used to control the expiration time of SQL cache"
+ "in NereidsSqlCacheManager. If the cache is not accessed for a period of time, "
+ "it will be reclaimed"
}
)
public static int expire_sql_cache_in_fe_second = 300;


/**
* Expire sql sql in frontend time
*/
@ConfField(
mutable = true,
masterOnly = false,
callbackClassString = "org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager$UpdateConfig",
description = {
"当前默认设置为 300,用来控制控制NereidsSortedPartitionsCacheManager中分区元数据缓存过期时间,"
+ "超过一段时间不访问cache会被回收",
"The current default setting is 300, which is used to control the expiration time of "
+ "the partition metadata cache in NereidsSortedPartitionsCheManager. "
+ "If the cache is not accessed for a period of time, it will be reclaimed"
}
)
public static int expire_cache_partition_meta_table_in_fe_second = 300;

/**
* Set the maximum number of rows that can be cached
*/
Expand Down Expand Up @@ -2247,8 +2271,7 @@ public class Config extends ConfigBase {
*/
@ConfField(
mutable = true,
varType = VariableAnnotation.EXPERIMENTAL,
callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig",
callbackClassString = "org.apache.doris.common.cache.NereidsSqlCacheManager$UpdateConfig",
description = {
"当前默认设置为 100,用来控制控制NereidsSqlCacheManager管理的sql cache数量。",
"Now default set to 100, this config is used to control the number of "
Expand All @@ -2257,6 +2280,19 @@ public class Config extends ConfigBase {
)
public static int sql_cache_manage_num = 100;

@ConfField(
mutable = true,
callbackClassString = "org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager$UpdateConfig",
description = {
"当前默认设置为 100,用来控制控制NereidsSortedPartitionsCacheManager中有序分区元数据的缓存个数,"
+ "用于加速分区裁剪",
"The current default setting is 100, which is used to control the number of ordered "
+ "partition metadata caches in NereidsSortedPartitionsCacheManager, "
+ "and to accelerate partition pruning"
}
)
public static int cache_partition_meta_table_manage_num = 100;

/**
* Maximum number of events to poll in each RPC.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public interface ConfHandler {
void handle(Field field, String confVal) throws Exception;
}

static class DefaultConfHandler implements ConfHandler {
public static class DefaultConfHandler implements ConfHandler {
@Override
public void handle(Field field, String confVal) throws Exception {
setConfigField(field, confVal);
Expand Down
10 changes: 9 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LogUtils;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.NereidsSqlCacheManager;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager;
import org.apache.doris.common.cache.NereidsSqlCacheManager;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.lock.MonitoredReentrantLock;
Expand Down Expand Up @@ -567,6 +568,8 @@ public class Env {

private final NereidsSqlCacheManager sqlCacheManager;

private final NereidsSortedPartitionsCacheManager sortedPartitionsCacheManager;

private final SplitSourceManager splitSourceManager;

private final GlobalExternalTransactionInfoMgr globalExternalTransactionInfoMgr;
Expand Down Expand Up @@ -819,6 +822,7 @@ public Env(boolean isCheckpointCatalog) {
this.insertOverwriteManager = new InsertOverwriteManager();
this.dnsCache = new DNSCache();
this.sqlCacheManager = new NereidsSqlCacheManager();
this.sortedPartitionsCacheManager = new NereidsSortedPartitionsCacheManager();
this.splitSourceManager = new SplitSourceManager();
this.globalExternalTransactionInfoMgr = new GlobalExternalTransactionInfoMgr();
this.tokenManager = new TokenManager();
Expand Down Expand Up @@ -6651,6 +6655,10 @@ public NereidsSqlCacheManager getSqlCacheManager() {
return sqlCacheManager;
}

public NereidsSortedPartitionsCacheManager getSortedPartitionsCacheManager() {
return sortedPartitionsCacheManager;
}

public SplitSourceManager getSplitSourceManager() {
return splitSourceManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1357,12 +1357,12 @@ public Collection<Partition> getPartitions() {
}

// get only temp partitions
public Collection<Partition> getAllTempPartitions() {
public List<Partition> getAllTempPartitions() {
return tempPartitions.getAllPartitions();
}

// get all partitions including temp partitions
public Collection<Partition> getAllPartitions() {
public List<Partition> getAllPartitions() {
List<Partition> partitions = Lists.newArrayList(idToPartition.values());
partitions.addAll(tempPartitions.getAllPartitions());
return partitions;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.cache;

import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.rules.expression.rules.MultiColumnBound;
import org.apache.doris.nereids.rules.expression.rules.PartitionItemToRange;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndRange;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Range;
import com.google.common.collect.TreeRangeSet;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.hadoop.util.Lists;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;

/** NereidsSortedPartitionsCacheManager */
public class NereidsSortedPartitionsCacheManager {
private volatile Cache<TableIdentifier, PartitionCacheContext> partitionCaches;

public NereidsSortedPartitionsCacheManager() {
partitionCaches = buildCaches(
Config.cache_partition_meta_table_manage_num,
Config.expire_cache_partition_meta_table_in_fe_second
);
}

public Optional<SortedPartitionRanges<?>> get(OlapTable table) {
DatabaseIf<?> database = table.getDatabase();
if (database == null) {
return Optional.empty();
}
CatalogIf<?> catalog = database.getCatalog();
if (catalog == null) {
return Optional.empty();
}
TableIdentifier key = new TableIdentifier(
catalog.getName(), database.getFullName(), table.getName());
PartitionCacheContext partitionCacheContext = partitionCaches.getIfPresent(key);
if (partitionCacheContext == null) {
return Optional.of(loadCache(key, table));
}
if (table.getId() != partitionCacheContext.tableId
|| table.getVisibleVersion() != partitionCacheContext.tableVersion) {
partitionCaches.invalidate(key);
return Optional.empty();
}
return Optional.of(partitionCacheContext.sortedPartitionRanges);
}

private SortedPartitionRanges<?> loadCache(TableIdentifier key, OlapTable olapTable) {
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
Map<Long, PartitionItem> allPartitions = partitionInfo.getIdToItem(false);
List<Entry<Long, PartitionItem>> sortedList = Lists.newArrayList(allPartitions.entrySet());
List<PartitionItemAndRange<?>> sortedRanges = Lists.newArrayListWithCapacity(allPartitions.size());
for (Entry<Long, PartitionItem> entry : sortedList) {
TreeRangeSet<MultiColumnBound> rangeSet = PartitionItemToRange.toRangeSets(entry.getValue());
sortedRanges.add(new PartitionItemAndRange<>(entry.getKey(), entry.getValue(), rangeSet));
}

sortedRanges.sort((o1, o2) -> {
Range<MultiColumnBound> span1 = o1.ranges.span();
Range<MultiColumnBound> span2 = o2.ranges.span();
int result = span1.lowerEndpoint().compareTo(span2.lowerEndpoint());
if (result != 0) {
return result;
}
result = span1.upperEndpoint().compareTo(span2.upperEndpoint());
return result;
});
SortedPartitionRanges sortedPartitionRanges = new SortedPartitionRanges(sortedRanges);
PartitionCacheContext context = new PartitionCacheContext(
olapTable.getId(), olapTable.getVisibleVersion(), sortedPartitionRanges);
partitionCaches.put(key, context);
return sortedPartitionRanges;
}

private static Cache<TableIdentifier, PartitionCacheContext> buildCaches(
int sortedPartitionTableManageNum, int expireSortedPartitionTableInFeSecond) {
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder()
// auto evict cache when jvm memory too low
.softValues();
if (sortedPartitionTableManageNum > 0) {
cacheBuilder = cacheBuilder.maximumSize(sortedPartitionTableManageNum);
}
if (expireSortedPartitionTableInFeSecond > 0) {
cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireSortedPartitionTableInFeSecond));
}

return cacheBuilder.build();
}

public static synchronized void updateConfig() {
Env currentEnv = Env.getCurrentEnv();
if (currentEnv == null) {
return;
}
NereidsSortedPartitionsCacheManager cacheManager = currentEnv.getSortedPartitionsCacheManager();
if (cacheManager == null) {
return;
}

Cache<TableIdentifier, PartitionCacheContext> caches = buildCaches(
Config.cache_partition_meta_table_manage_num,
Config.expire_cache_partition_meta_table_in_fe_second
);
caches.putAll(cacheManager.partitionCaches.asMap());
cacheManager.partitionCaches = caches;
}

@Data
@AllArgsConstructor
private static class TableIdentifier {
public final String catalog;
public final String db;
public final String table;
}

private static class PartitionCacheContext {
private final long tableId;
private final long tableVersion;
private final SortedPartitionRanges sortedPartitionRanges;

public PartitionCacheContext(
long tableId, long tableVersion, SortedPartitionRanges sortedPartitionRanges) {
this.tableId = tableId;
this.tableVersion = tableVersion;
this.sortedPartitionRanges = sortedPartitionRanges;
}

@Override
public String toString() {
return "PartitionCacheContext(tableId="
+ tableId + ", tableVersion=" + tableVersion
+ ", partitionNum=" + sortedPartitionRanges.sortedPartitions.size() + ")";
}
}

// NOTE: used in Config.cache_partition_meta_table_manage_num.callbackClassString and
// Config.expire_cache_partition_meta_table_in_fe_second.callbackClassString,
// don't remove it!
public static class UpdateConfig extends DefaultConfHandler {
@Override
public void handle(Field field, String confVal) throws Exception {
super.handle(field, confVal);
NereidsSortedPartitionsCacheManager.updateConfig();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common;
package org.apache.doris.common.cache;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.DatabaseIf;
Expand All @@ -24,7 +24,10 @@
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.View;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.common.Pair;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.metric.MetricRepo;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.rules.expression.rules;

import java.util.List;

/** MultiColumnBound */
public class MultiColumnBound implements Comparable<MultiColumnBound> {
private final List<ColumnBound> columnBounds;

public MultiColumnBound(List<ColumnBound> columnBounds) {
this.columnBounds = columnBounds;
}

@Override
public int compareTo(MultiColumnBound o) {
for (int i = 0; i < columnBounds.size(); i++) {
ColumnBound columnBound = columnBounds.get(i);
ColumnBound otherColumnBound = o.columnBounds.get(i);
int result = columnBound.compareTo(otherColumnBound);
if (result != 0) {
return result;
}
}
return 0;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < columnBounds.size(); i++) {
if (i > 0) {
sb.append(",");
}
sb.append(columnBounds.get(i));
}
return sb.toString();
}
}
Loading

0 comments on commit 5db0287

Please sign in to comment.