Skip to content

Commit

Permalink
query cache in fe
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Aug 27, 2024
1 parent 99b3437 commit bcd32ca
Show file tree
Hide file tree
Showing 24 changed files with 1,913 additions and 15 deletions.
25 changes: 22 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.doris.common.io.Text;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.planner.normalize.Normalizer;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.rewrite.mvrewrite.MVExprEquivalent;
import org.apache.doris.statistics.ExprStats;
Expand Down Expand Up @@ -996,8 +997,12 @@ public void setFn(Function fn) {
this.fn = fn;
}

// Append a flattened version of this expr, including all children, to 'container'.
protected void treeToThriftHelper(TExpr container) {
treeToThriftHelper(container, ((expr, exprNode) -> expr.toThrift(exprNode)));
}

// Append a flattened version of this expr, including all children, to 'container'.
protected void treeToThriftHelper(TExpr container, ExprVisitor visitor) {
TExprNode msg = new TExprNode();
msg.type = type.toThrift();
msg.num_children = children.size();
Expand All @@ -1009,13 +1014,17 @@ protected void treeToThriftHelper(TExpr container) {
}
msg.output_scale = getOutputScale();
msg.setIsNullable(nullableFromNereids.isPresent() ? nullableFromNereids.get() : isNullable());
toThrift(msg);
visitor.visit(this, msg);
container.addToNodes(msg);
for (Expr child : children) {
child.treeToThriftHelper(container);
child.treeToThriftHelper(container, visitor);
}
}

public interface ExprVisitor {
void visit(Expr expr, TExprNode exprNode);
}

public static Type getAssignmentCompatibleType(List<Expr> children) {
Type assignmentCompatibleType = Type.INVALID;
for (int i = 0; i < children.size()
Expand Down Expand Up @@ -2193,6 +2202,16 @@ public String getStringValueForArray(FormatOptions options) {
return null;
}

public final TExpr normalize(Normalizer normalizer) {
TExpr result = new TExpr();
treeToThriftHelper(result, (expr, texprNode) -> expr.normalize(texprNode, normalizer));
return result;
}

protected void normalize(TExprNode msg, Normalizer normalizer) {
this.toThrift(msg);
}

public static Expr getFirstBoundChild(Expr expr, List<TupleId> tids) {
for (Expr child : expr.getChildren()) {
if (child.isBoundByTupleIds(tids)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.planner.normalize.Normalizer;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TExprNode;
Expand Down Expand Up @@ -2406,6 +2407,17 @@ public void readFields(DataInput in) throws IOException {
isMergeAggFn = in.readBoolean();
}

@Override
protected void normalize(TExprNode msg, Normalizer normalizer) {
String functionName = fnName.getFunction().toUpperCase();
if (FunctionSet.nonDeterministicFunctions.contains(functionName)
|| "NOW".equals(functionName)
|| (FunctionSet.nonDeterministicTimeFunctions.contains(functionName) && children.isEmpty())) {
throw new IllegalStateException("Can not normalize non deterministic functions");
}
super.normalize(msg, normalizer);
}

public static FunctionCallExpr read(DataInput in) throws IOException {
FunctionCallExpr func = new FunctionCallExpr();
func.readFields(in);
Expand Down
12 changes: 12 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/analysis/SlotRef.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.ToSqlContext;
import org.apache.doris.planner.normalize.Normalizer;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TExprNode;
import org.apache.doris.thrift.TExprNodeType;
Expand Down Expand Up @@ -353,6 +354,17 @@ protected void toThrift(TExprNode msg) {
msg.setLabel(label);
}

@Override
protected void normalize(TExprNode msg, Normalizer normalizer) {
msg.node_type = TExprNodeType.SLOT_REF;
// we should eliminate the different tuple id to reuse query cache
msg.slot_ref = new TSlotRef(
normalizer.normalizeSlotId(desc.getId().asInt()),
0
);
msg.slot_ref.setColUniqueId(desc.getUniqueId());
}

@Override
public void markAgg() {
desc.setIsAgg(true);
Expand Down
26 changes: 26 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/FunctionSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,32 @@ public boolean isNullResultWithOneNullParamFunctions(String funcName) {
.put(Type.DECIMAL128, Type.DECIMAL128)
.build();

public static final Set<String> nonDeterministicFunctions =
ImmutableSet.<String>builder()
.add("RAND")
.add("RANDOM")
.add("RANDOM_BYTES")
.add("CONNECTION_ID")
.add("DATABASE")
.add("USER")
.add("UUID")
.add("CURRENT_USER")
.add("UUID_NUMERIC")
.build();

public static final Set<String> nonDeterministicTimeFunctions =
ImmutableSet.<String>builder()
.add("NOW")
.add("CURDATE")
.add("CURRENT_DATE")
.add("UTC_TIMESTAMP")
.add("CURTIME")
.add("CURRENT_TIMESTAMP")
.add("CURRENT_TIME")
.add("UNIX_TIMESTAMP")
.add()
.build();

private static final Map<Type, String> STDDEV_UPDATE_SYMBOL =
ImmutableMap.<Type, String>builder()
.put(Type.TINYINT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.CRC32;
Expand Down Expand Up @@ -304,6 +306,64 @@ public int compareTo(PartitionKey other) {
return Integer.compare(thisKeyLen, otherKeyLen);
}

public PartitionKey successor() throws AnalysisException {
Preconditions.checkState(
keys.size() == 1,
"Only support compute successor for one partition column"
);
LiteralExpr literal = keys.get(0);
PrimitiveType type = types.get(0);

PartitionKey successor = new PartitionKey();

switch (type) {
case TINYINT:
case SMALLINT:
case INT:
case BIGINT:
long maxValueOfType = (1L << ((type.getSlotSize() << 3 /* multiply 8 bit */) - 1)) - 1L;
long successorInt = ((IntLiteral) literal).getValue();
successorInt += successorInt < maxValueOfType ? 1 : 0;
successor.pushColumn(new IntLiteral(successorInt, Type.fromPrimitiveType(type)), type);
return successor;
case LARGEINT:
BigInteger maxValue = BigInteger.ONE.shiftLeft(127).subtract(BigInteger.ONE);
BigInteger successorLargeInt = (BigInteger) literal.getRealValue();
successorLargeInt = successorLargeInt.add(
successorLargeInt.compareTo(maxValue) < 0 ? BigInteger.ONE : BigInteger.ZERO
);
successor.pushColumn(new LargeIntLiteral(successorLargeInt), type);
return successor;
case DATE:
case DATEV2:
case DATETIME:
case DATETIMEV2:
DateLiteral dateLiteral = (DateLiteral) literal;
LocalDateTime successorDateTime = LocalDateTime.of(
(int) dateLiteral.getYear(),
(int) dateLiteral.getMonth(),
(int) dateLiteral.getDay(),
(int) dateLiteral.getHour(),
(int) dateLiteral.getMinute(),
(int) dateLiteral.getSecond(),
(int) dateLiteral.getMicrosecond() * 1000
);
if (type == PrimitiveType.DATE || type == PrimitiveType.DATEV2) {
successorDateTime = successorDateTime.plusDays(1);
} else if (type == PrimitiveType.DATETIME) {
successorDateTime = successorDateTime.plusSeconds(1);
} else {
int scale = Math.min(6, Math.max(0, ((ScalarType) literal.getType()).getScalarScale()));
long nanoSeconds = BigInteger.TEN.pow(9 - scale).longValue();
successorDateTime = successorDateTime.plusNanos(nanoSeconds);
}
successor.pushColumn(new DateLiteral(successorDateTime, literal.getType()), type);
return successor;
default:
throw new AnalysisException("Unsupported type: " + type);
}
}

// return: ("100", "200", "300")
public String toSql() {
StringBuilder sb = new StringBuilder("(");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.io.DataInput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -66,6 +67,21 @@ public RangePartitionInfo(boolean isAutoCreatePartitions, ArrayList<Expr> exprs,
}
}

public Map<Long, PartitionItem> getPartitionItems(Collection<Long> partitionIds) {
Map<Long, PartitionItem> columnRanges = Maps.newLinkedHashMapWithExpectedSize(partitionIds.size());
for (Long partitionId : partitionIds) {
PartitionItem partitionItem = idToItem.get(partitionId);
if (partitionItem == null) {
partitionItem = idToTempItem.get(partitionId);
}
if (partitionItem == null) {
throw new IllegalStateException("Can not found partition item: " + partitionId);
}
columnRanges.put(partitionId, partitionItem);
}
return columnRanges;
}

@Override
public PartitionItem createAndCheckPartitionItem(SinglePartitionDesc desc, boolean isTemp) throws DdlException {
Range<PartitionKey> newRange = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.profile.SummaryProfile;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.mysql.FieldInfo;
import org.apache.doris.nereids.CascadesContext.Lock;
import org.apache.doris.nereids.exceptions.AnalysisException;
Expand Down Expand Up @@ -65,12 +66,15 @@
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.RuntimeFilter;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.normalize.QueryCacheNormalizer;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TQueryCacheParam;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.codec.binary.Hex;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -343,10 +347,11 @@ private void splitFragments(PhysicalPlan resultPlan) {
if (statementContext.getConnectContext().getExecutor() != null) {
statementContext.getConnectContext().getExecutor().getSummaryProfile().setNereidsTranslateTime();
}
if (cascadesContext.getConnectContext().getSessionVariable().isEnableNereidsTrace()) {
SessionVariable sessionVariable = cascadesContext.getConnectContext().getSessionVariable();
if (sessionVariable.isEnableNereidsTrace()) {
CounterEvent.clearCounter();
}
if (cascadesContext.getConnectContext().getSessionVariable().isPlayNereidsDump()) {
if (sessionVariable.isPlayNereidsDump()) {
return;
}
PlanFragment root = physicalPlanTranslator.translatePlan(physicalPlan);
Expand All @@ -355,8 +360,28 @@ private void splitFragments(PhysicalPlan resultPlan) {
physicalRelations.addAll(planTranslatorContext.getPhysicalRelations());
descTable = planTranslatorContext.getDescTable();
fragments = new ArrayList<>(planTranslatorContext.getPlanFragments());

boolean enableQueryCache = sessionVariable.getEnableQueryCache();
String queryId = DebugUtil.printId(cascadesContext.getConnectContext().queryId());
for (int seq = 0; seq < fragments.size(); seq++) {
fragments.get(seq).setFragmentSequenceNum(seq);
PlanFragment fragment = fragments.get(seq);
fragment.setFragmentSequenceNum(seq);
if (enableQueryCache) {
try {
QueryCacheNormalizer normalizer = new QueryCacheNormalizer(fragment, descTable);
Optional<TQueryCacheParam> queryCacheParam =
normalizer.normalize(cascadesContext.getConnectContext());
if (queryCacheParam.isPresent()) {
fragment.queryCacheParam = queryCacheParam.get();
LOG.info("Use query cache for fragment {}, node id: {}, digest: {}, queryId: {}",
seq,
fragment.queryCacheParam.node_id,
Hex.encodeHexString(fragment.queryCacheParam.digest), queryId);
}
} catch (Throwable t) {
// do nothing
}
}
}
// set output exprs
logicalPlanAdapter.setResultExprs(root.getOutputExprs());
Expand Down
65 changes: 65 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/nereids/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,71 @@ private static <T> List<List<T>> doAllCombinations(List<List<T>> lists) {
).collect(ImmutableList.toImmutableList());
}

/** getAllCombinations */
public static <T> List<List<T>> getAllCombinations(List<T> list, int itemNum) {
List<List<T>> result = Lists.newArrayList();
generateCombinations(list, itemNum, 0, Lists.newArrayList(), result);
return result;
}

private static <T> void generateCombinations(
List<T> list, int n, int start, List<T> current, List<List<T>> result) {
if (current.size() == n) {
result.add(new ArrayList<>(current));
return;
}

for (int i = start; i < list.size(); i++) {
current.add(list.get(i));
generateCombinations(list, n, i + 1, current, result);
current.remove(current.size() - 1);
}
}

public static <T> List<List<T>> allPermutations(List<T> list) {
List<List<T>> result = new ArrayList<>();
generatePermutations(new ArrayList<>(list), new ArrayList<>(), result);
return result;
}

private static <T> void generatePermutations(List<T> list, List<T> current, List<List<T>> result) {
if (!current.isEmpty()) {
result.add(new ArrayList<>(current));
}

for (int i = 0; i < list.size(); i++) {
T element = list.remove(i);
current.add(element);
generatePermutations(list, current, result);
current.remove(current.size() - 1);
list.add(i, element);
}
}

/** permutations */
public static <T> List<List<T>> permutations(List<T> list) {
list = new ArrayList<>(list);
List<List<T>> result = new ArrayList<>();
if (list.isEmpty()) {
result.add(new ArrayList<>());
return result;
}

T firstElement = list.get(0);
List<T> rest = list.subList(1, list.size());
List<List<T>> recursivePermutations = permutations(rest);

for (List<T> smallerPermutated : recursivePermutations) {
for (int index = 0; index <= smallerPermutated.size(); index++) {
List<T> temp = new ArrayList<>(smallerPermutated);
temp.add(index, firstElement);
result.add(temp);
}
}

return result;
}

public static <T> List<T> copyRequiredList(List<T> list) {
return ImmutableList.copyOf(Objects.requireNonNull(list, "non-null list is required"));
}
Expand Down
Loading

0 comments on commit bcd32ca

Please sign in to comment.