Skip to content

Commit

Permalink
[Opt](multi-catalog)Improve performance by introducing cache of list …
Browse files Browse the repository at this point in the history
…directory files when getting split for each query.
  • Loading branch information
kaka11chen committed Dec 30, 2024
1 parent b3186cb commit 5fd4a75
Show file tree
Hide file tree
Showing 25 changed files with 2,618 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2203,6 +2203,10 @@ public class Config extends ConfigBase {
"Max cache number of external table row count"})
public static long max_external_table_row_count_cache_num = 100000;

@ConfField(description = {"每个查询的外表文件元数据缓存的最大文件数量。",
"Max cache file number of external table split file meta cache at query level."})
public static long max_external_table_split_file_meta_cache_num = 100000;

/**
* Max cache loader thread-pool size.
* Max thread pool size for loading external meta cache
Expand Down
54 changes: 50 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/common/CacheFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.CacheLoader;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Ticker;
import com.github.benmanes.caffeine.cache.Weigher;
import org.jetbrains.annotations.NotNull;

import java.time.Duration;
Expand All @@ -44,28 +46,57 @@
* The cache can be created with the above parameters using the buildCache and buildAsyncCache methods.
* </p>
*/
public class CacheFactory {
public class CacheFactory<K, V> {

private OptionalLong expireAfterWriteSec;
private OptionalLong refreshAfterWriteSec;
private long maxSize;
private OptionalLong maxSize;
private boolean enableStats;
// Ticker is used to provide a time source for the cache.
// Only used for test, to provide a fake time source.
// If not provided, the system time is used.
private Ticker ticker;

private OptionalLong maxWeight;

private Weigher<K, V> weigher;

/**
 * Creates a factory for caches bounded by entry count.
 * <p>
 * Delegates to the private canonical constructor with {@code maxSize} wrapped in an
 * {@link OptionalLong}, an empty maximum weight and no weigher.
 * </p>
 *
 * @param expireAfterWriteSec  expire-after-write duration in seconds; only applied when present
 * @param refreshAfterWriteSec refresh-after-write setting, stored as given
 *                             (NOTE(review): presumably seconds, like expireAfterWriteSec - confirm)
 * @param maxSize              maximum number of entries, passed to {@code Caffeine.maximumSize}
 * @param enableStats          stored as given
 *                             (NOTE(review): presumably toggles Caffeine stats recording - confirm)
 * @param ticker               time source for the cache; only applied when non-null
 */
public CacheFactory(
OptionalLong expireAfterWriteSec,
OptionalLong refreshAfterWriteSec,
long maxSize,
boolean enableStats,
Ticker ticker) {
// Size-bounded variant: no weight limit and no weigher.
this(expireAfterWriteSec, refreshAfterWriteSec, OptionalLong.of(maxSize), enableStats, ticker,
OptionalLong.empty(), null);
}

/**
 * Creates a factory for caches bounded by total entry weight instead of entry count.
 * <p>
 * Delegates to the private canonical constructor with an empty maximum size,
 * {@code maxWeight} wrapped in an {@link OptionalLong}, and the given weigher.
 * </p>
 *
 * @param expireAfterWriteSec  expire-after-write duration in seconds; only applied when present
 * @param refreshAfterWriteSec refresh-after-write setting, stored as given
 *                             (NOTE(review): presumably seconds, like expireAfterWriteSec - confirm)
 * @param enableStats          stored as given
 *                             (NOTE(review): presumably toggles Caffeine stats recording - confirm)
 * @param ticker               time source for the cache; only applied when non-null
 * @param maxWeight            maximum total weight, passed to {@code Caffeine.maximumWeight}
 * @param weigher              computes the weight of each entry; only applied when non-null
 *                             (NOTE(review): a null weigher together with a maximum weight is not
 *                             rejected here - confirm callers always pass one)
 */
public CacheFactory(
OptionalLong expireAfterWriteSec,
OptionalLong refreshAfterWriteSec,
boolean enableStats,
Ticker ticker,
long maxWeight,
Weigher<K, V> weigher) {
// Weight-bounded variant: no entry-count limit.
this(expireAfterWriteSec, refreshAfterWriteSec, OptionalLong.empty(), enableStats, ticker,
OptionalLong.of(maxWeight), weigher);
}

/**
 * Canonical constructor: stores every setting exactly as given, without validation.
 * The two public constructors funnel into this one, each supplying either a maximum
 * size or a maximum weight.
 *
 * @param expireAfterWriteSec  expire-after-write duration in seconds, if any
 * @param refreshAfterWriteSec refresh-after-write setting, if any
 * @param maxSize              maximum number of entries, if any
 * @param enableStats          stats flag, stored as given
 * @param ticker               time source for the cache, may be null
 * @param maxWeight            maximum total entry weight, if any
 * @param weigher              entry weigher, may be null
 */
private CacheFactory(
OptionalLong expireAfterWriteSec,
OptionalLong refreshAfterWriteSec,
OptionalLong maxSize,
boolean enableStats,
Ticker ticker,
OptionalLong maxWeight,
Weigher<K, V> weigher) {
this.expireAfterWriteSec = expireAfterWriteSec;
this.refreshAfterWriteSec = refreshAfterWriteSec;
this.maxSize = maxSize;
this.enableStats = enableStats;
this.ticker = ticker;
this.maxWeight = maxWeight;
this.weigher = weigher;
}

// Build a loading cache, without executor, it will use fork-join pool for refresh
Expand All @@ -85,6 +116,11 @@ public <K, V> LoadingCache<K, V> buildCache(CacheLoader<K, V> cacheLoader,
return builder.build(cacheLoader);
}

// Build a plain (non-loading) cache from the parameters held by this factory.
// Values are never loaded by the cache itself, since no CacheLoader is attached.
// NOTE(review): the method-level <K, V> are separate type parameters; if the enclosing
// class is also declared as CacheFactory<K, V>, they shadow the class-level ones, so the
// returned Cache type is not tied to the type of the configured Weigher - confirm intended.
public <K, V> Cache<K, V> buildCache() {
Caffeine<Object, Object> builder = buildWithParams();
return builder.build();
}

// Build an async loading cache
public <K, V> AsyncLoadingCache<K, V> buildAsyncCache(AsyncCacheLoader<K, V> cacheLoader,
ExecutorService executor) {
Expand All @@ -94,9 +130,11 @@ public <K, V> AsyncLoadingCache<K, V> buildAsyncCache(AsyncCacheLoader<K, V> cac
}

@NotNull
private Caffeine<Object, Object> buildWithParams() {
private <K, V> Caffeine<Object, Object> buildWithParams() {
Caffeine<Object, Object> builder = Caffeine.newBuilder();
builder.maximumSize(maxSize);
if (maxSize.isPresent()) {
builder.maximumSize(maxSize.getAsLong());
}

if (expireAfterWriteSec.isPresent()) {
builder.expireAfterWrite(Duration.ofSeconds(expireAfterWriteSec.getAsLong()));
Expand All @@ -112,6 +150,14 @@ private Caffeine<Object, Object> buildWithParams() {
if (ticker != null) {
builder.ticker(ticker);
}

if (maxWeight.isPresent()) {
builder.maximumWeight(maxWeight.getAsLong());
}

if (weigher != null) {
builder.weigher(weigher);
}
return builder;
}
}
247 changes: 247 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/EmptyCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This file is copied from
// https://github.com/trinodb/trino/blob/438/lib/trino-cache/src/main/java/io/trino/cache/EmptyCache.java
// and modified by Doris

package org.apache.doris.common;

import com.google.common.cache.AbstractLoadingCache;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheLoader.InvalidCacheLoadException;
import com.google.common.cache.CacheStats;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.UncheckedExecutionException;

import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nullable;

class EmptyCache<K, V>
extends AbstractLoadingCache<K, V> {
private final CacheLoader<? super K, V> loader;
private final StatsCounter statsCounter;

EmptyCache(CacheLoader<? super K, V> loader, boolean recordStats) {
this.loader = Objects.requireNonNull(loader, "loader is null");
this.statsCounter = recordStats ? new SimpleStatsCounter() : new NoopStatsCounter();
}

@Override
public V getIfPresent(Object key) {
statsCounter.recordMisses(1);
return null;
}

@Override
public V get(K key)
throws ExecutionException {
return get(key, () -> loader.load(key));
}

@Override
public ImmutableMap<K, V> getAll(Iterable<? extends K> keys)
throws ExecutionException {
try {
Set<K> keySet = ImmutableSet.copyOf(keys);
statsCounter.recordMisses(keySet.size());
@SuppressWarnings("unchecked") // safe since all keys extend K
ImmutableMap<K, V> result = (ImmutableMap<K, V>) loader.loadAll(keySet);
for (K key : keySet) {
if (!result.containsKey(key)) {
throw new InvalidCacheLoadException("loadAll failed to return a value for " + key);
}
}
statsCounter.recordLoadSuccess(1);
return result;
} catch (RuntimeException e) {
statsCounter.recordLoadException(1);
throw new UncheckedExecutionException(e);
} catch (Exception e) {
statsCounter.recordLoadException(1);
throw new ExecutionException(e);
}
}

@Override
public V get(K key, Callable<? extends V> valueLoader)
throws ExecutionException {
statsCounter.recordMisses(1);
try {
V value = valueLoader.call();
statsCounter.recordLoadSuccess(1);
return value;
} catch (RuntimeException e) {
statsCounter.recordLoadException(1);
throw new UncheckedExecutionException(e);
} catch (Exception e) {
statsCounter.recordLoadException(1);
throw new ExecutionException(e);
}
}

@Override
public void put(K key, V value) {
// Cache, even if configured to evict everything immediately, should allow writes.
}

@Override
public void refresh(K key) {}

@Override
public void invalidate(Object key) {}

@Override
public void invalidateAll(Iterable<?> keys) {}

@Override
public void invalidateAll() {

}

@Override
public long size() {
return 0;
}

@Override
public CacheStats stats() {
return statsCounter.snapshot();
}

@Override
public ConcurrentMap<K, V> asMap() {
return new ConcurrentMap<K, V>() {
@Override
public V putIfAbsent(K key, V value) {
// Cache, even if configured to evict everything immediately, should allow writes.
// putIfAbsent returns the previous value
return null;
}

@Override
public boolean remove(Object key, Object value) {
return false;
}

@Override
public boolean replace(K key, V oldValue, V newValue) {
return false;
}

@Override
public V replace(K key, V value) {
return null;
}

@Override
public int size() {
return 0;
}

@Override
public boolean isEmpty() {
return true;
}

@Override
public boolean containsKey(Object key) {
return false;
}

@Override
public boolean containsValue(Object value) {
return false;
}

@Override
@Nullable
public V get(Object key) {
return null;
}

@Override
@Nullable
public V put(K key, V value) {
// Cache, even if configured to evict everything immediately, should allow writes.
return null;
}

@Override
@Nullable
public V remove(Object key) {
return null;
}

@Override
public void putAll(Map<? extends K, ? extends V> m) {
// Cache, even if configured to evict everything immediately, should allow writes.
}

@Override
public void clear() {

}

@Override
public Set<K> keySet() {
return ImmutableSet.of();
}

@Override
public Collection<V> values() {
return ImmutableSet.of();
}

@Override
public Set<Entry<K, V>> entrySet() {
return ImmutableSet.of();
}
};
}

private static class NoopStatsCounter
implements StatsCounter {
private static final CacheStats EMPTY_STATS = new SimpleStatsCounter().snapshot();

@Override
public void recordHits(int count) {}

@Override
public void recordMisses(int count) {}

@Override
public void recordLoadSuccess(long loadTime) {}

@Override
public void recordLoadException(long loadTime) {}

@Override
public void recordEviction() {}

@Override
public CacheStats snapshot() {
return EMPTY_STATS;
}
}
}
Loading

0 comments on commit 5fd4a75

Please sign in to comment.