From 42f4271e9dfb5c5d912e0d9335167271cdd831b5 Mon Sep 17 00:00:00 2001 From: 924060929 <924060929@qq.com> Date: Tue, 2 Jul 2024 19:25:22 +0800 Subject: [PATCH] [enhancement](nereids) speedup sql cache with variable (#37090) (#37119) cherry pick from #37090 --- .../doris/common/NereidsSqlCacheManager.java | 49 +++++++---- .../apache/doris/nereids/NereidsPlanner.java | 79 ++--------------- .../apache/doris/nereids/SqlCacheContext.java | 88 ++++++++++--------- .../nereids/trees/plans/ComputeResultSet.java | 55 ++++++++++++ .../plans/physical/PhysicalEmptyRelation.java | 39 +++++++- .../physical/PhysicalOneRowRelation.java | 49 ++++++++++- .../plans/physical/PhysicalResultSink.java | 18 +++- .../plans/physical/PhysicalSqlCache.java | 11 ++- .../org/apache/doris/qe/StmtExecutor.java | 8 +- .../cache/parse_sql_from_sql_cache.groovy | 30 ++++--- 10 files changed, 276 insertions(+), 150 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/ComputeResultSet.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java index cf6280650f06f6..cbc3c173af6a0b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/NereidsSqlCacheManager.java @@ -48,6 +48,7 @@ import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache; +import org.apache.doris.nereids.util.Utils; import org.apache.doris.proto.InternalService; import org.apache.doris.proto.Types.PUniqueId; import org.apache.doris.qe.ConnectContext; @@ -58,6 +59,7 @@ import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import org.apache.commons.collections.CollectionUtils; import java.lang.reflect.Field; @@ -123,16 +125,14 @@ public void tryAddFeSqlCache(ConnectContext connectContext, String sql) { SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get(); UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity(); String key = currentUserIdentity.toString() + ":" + sql.trim(); - if ((sqlCaches.getIfPresent(key) == null) && sqlCacheContext.getOrComputeCacheKeyMd5() != null + if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null && sqlCacheContext.getResultSetInFe().isPresent()) { sqlCaches.put(key, sqlCacheContext); } } - /** tryAddCache */ - public void tryAddCache( - ConnectContext connectContext, String sql, - CacheAnalyzer analyzer, boolean currentMissParseSqlFromSqlCache) { + /** tryAddBeCache */ + public void tryAddBeCache(ConnectContext connectContext, String sql, CacheAnalyzer analyzer) { Optional sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext(); if (!sqlCacheContextOpt.isPresent()) { return; @@ -143,8 +143,7 @@ public void tryAddCache( SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get(); UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity(); String key = currentUserIdentity.toString() + ":" + sql.trim(); - if ((currentMissParseSqlFromSqlCache || sqlCaches.getIfPresent(key) == null) - && sqlCacheContext.getOrComputeCacheKeyMd5() != null) { + if (sqlCaches.getIfPresent(key) == null && sqlCacheContext.getOrComputeCacheKeyMd5() != null) { SqlCache cache = (SqlCache) analyzer.getCache(); sqlCacheContext.setSumOfPartitionNum(cache.getSumOfPartitionNum()); sqlCacheContext.setLatestPartitionId(cache.getLatestId()); @@ -182,9 +181,6 @@ public Optional tryParseSql(ConnectContext connectContext, Stri if (viewsChanged(env, sqlCacheContext)) { return invalidateCache(key); } - if (usedVariablesChanged(sqlCacheContext)) { - return invalidateCache(key); - } LogicalEmptyRelation whateverPlan = new LogicalEmptyRelation(new RelationId(0), ImmutableList.of()); if (nondeterministicFunctionChanged(whateverPlan, connectContext, sqlCacheContext)) { @@ -201,7 +197,10 @@ public Optional tryParseSql(ConnectContext connectContext, Stri try { Optional resultSetInFe = sqlCacheContext.getResultSetInFe(); - if (resultSetInFe.isPresent()) { + + List currentVariables = resolveUserVariables(sqlCacheContext); + boolean usedVariablesChanged = usedVariablesChanged(currentVariables, sqlCacheContext); + if (resultSetInFe.isPresent() && !usedVariablesChanged) { MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L); String cachedPlan = sqlCacheContext.getPhysicalPlan(); @@ -214,7 +213,9 @@ public Optional tryParseSql(ConnectContext connectContext, Stri } Status status = new Status(); - PUniqueId cacheKeyMd5 = sqlCacheContext.getOrComputeCacheKeyMd5(); + PUniqueId cacheKeyMd5 = usedVariablesChanged + ? sqlCacheContext.doComputeCacheKeyMd5(Utils.fastToImmutableSet(currentVariables)) + : sqlCacheContext.getOrComputeCacheKeyMd5(); InternalService.PFetchCacheResult cacheData = SqlCache.getCacheData(sqlCacheContext.getCacheProxy(), cacheKeyMd5, sqlCacheContext.getLatestPartitionId(), @@ -235,7 +236,7 @@ public Optional tryParseSql(ConnectContext connectContext, Stri ); return Optional.of(logicalSqlCache); } - return invalidateCache(key); + return Optional.empty(); } catch (Throwable t) { return invalidateCache(key); } @@ -342,12 +343,24 @@ private boolean privilegeChanged(ConnectContext connectContext, Env env, SqlCach return false; } - private boolean usedVariablesChanged(SqlCacheContext sqlCacheContext) { - for (Variable variable : sqlCacheContext.getUsedVariables()) { + private List resolveUserVariables(SqlCacheContext sqlCacheContext) { + List cachedUsedVariables = sqlCacheContext.getUsedVariables(); + List currentVariables = Lists.newArrayListWithCapacity(cachedUsedVariables.size()); + for (Variable cachedVariable : cachedUsedVariables) { Variable currentVariable = ExpressionAnalyzer.resolveUnboundVariable( - new UnboundVariable(variable.getName(), variable.getType())); - if (!Objects.equals(currentVariable, variable) - || variable.getRealExpression().anyMatch(Nondeterministic.class::isInstance)) { + new UnboundVariable(cachedVariable.getName(), cachedVariable.getType())); + currentVariables.add(currentVariable); + } + return currentVariables; + } + + private boolean usedVariablesChanged(List currentVariables, SqlCacheContext sqlCacheContext) { + List cachedUsedVariables = sqlCacheContext.getUsedVariables(); + for (int i = 0; i < cachedUsedVariables.size(); i++) { + Variable currentVariable = currentVariables.get(i); + Variable cachedVariable = cachedUsedVariables.get(i); + if (!Objects.equals(currentVariable, cachedVariable) + || cachedVariable.getRealExpression().anyMatch(Nondeterministic.class::isInstance)) { return true; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 1ae1864ad3bf68..667784ad1192ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -19,10 +19,7 @@ import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.ExplainOptions; -import org.apache.doris.analysis.LiteralExpr; import org.apache.doris.analysis.StatementBase; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Env; import org.apache.doris.common.NereidsException; import org.apache.doris.common.Pair; import org.apache.doris.common.profile.SummaryProfile; @@ -45,37 +42,28 @@ import org.apache.doris.nereids.processor.pre.PlanPreprocessors; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.rules.exploration.mv.MaterializationContext; -import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; -import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.ComputeResultSet; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache; -import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation; -import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; -import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.Planner; import org.apache.doris.planner.RuntimeFilter; import org.apache.doris.planner.ScanNode; -import org.apache.doris.qe.CommonResultSet; import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.ResultSet; -import org.apache.doris.qe.ResultSetMetaData; -import org.apache.doris.qe.cache.CacheAnalyzer; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.function.Function; @@ -535,65 +523,16 @@ public Optional handleQueryInFe(StatementBase parsedStmt) { if (!(parsedStmt instanceof LogicalPlanAdapter)) { return Optional.empty(); } - if (physicalPlan instanceof PhysicalSqlCache - && ((PhysicalSqlCache) physicalPlan).getResultSet().isPresent()) { - return Optional.of(((PhysicalSqlCache) physicalPlan).getResultSet().get()); - } - if (!(physicalPlan instanceof PhysicalResultSink)) { - return Optional.empty(); - } - - Optional sqlCacheContext = statementContext.getSqlCacheContext(); - boolean enableSqlCache - = CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable()); - Plan child = physicalPlan.child(0); - if (child instanceof PhysicalOneRowRelation) { - PhysicalOneRowRelation physicalOneRowRelation = (PhysicalOneRowRelation) physicalPlan.child(0); - List columns = Lists.newArrayList(); - List data = Lists.newArrayList(); - for (int i = 0; i < physicalOneRowRelation.getProjects().size(); i++) { - NamedExpression item = physicalOneRowRelation.getProjects().get(i); - NamedExpression output = physicalPlan.getOutput().get(i); - Expression expr = item.child(0); - if (expr instanceof Literal) { - LiteralExpr legacyExpr = ((Literal) expr).toLegacyLiteral(); - columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType())); - data.add(legacyExpr.getStringValueInFe()); - } else { - return Optional.empty(); - } - } - - ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns); - ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data)); - if (sqlCacheContext.isPresent() && enableSqlCache) { - sqlCacheContext.get().setResultSetInFe(resultSet); - Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache( - statementContext.getConnectContext(), - statementContext.getOriginStatement().originStmt - ); + if (physicalPlan instanceof ComputeResultSet) { + Optional sqlCacheContext = statementContext.getSqlCacheContext(); + Optional resultSet = ((ComputeResultSet) physicalPlan) + .computeResultInFe(cascadesContext, sqlCacheContext); + if (resultSet.isPresent()) { + return resultSet; } - return Optional.of(resultSet); - } else if (child instanceof PhysicalEmptyRelation) { - List columns = Lists.newArrayList(); - for (int i = 0; i < physicalPlan.getOutput().size(); i++) { - NamedExpression output = physicalPlan.getOutput().get(i); - columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType())); - } - - ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns); - ResultSet resultSet = new CommonResultSet(metadata, ImmutableList.of()); - if (sqlCacheContext.isPresent() && enableSqlCache) { - sqlCacheContext.get().setResultSetInFe(resultSet); - Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache( - statementContext.getConnectContext(), - statementContext.getOriginStatement().originStmt - ); - } - return Optional.of(resultSet); - } else { - return Optional.empty(); } + + return Optional.empty(); } @VisibleForTesting diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java index f3fa61cecaad8b..a0c95a9113e1b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java @@ -329,53 +329,57 @@ public PUniqueId getOrComputeCacheKeyMd5() { if (cacheKeyMd5 != null) { return cacheKeyMd5; } - - StringBuilder cacheKey = new StringBuilder(originSql); - for (Entry entry : usedViews.entrySet()) { - cacheKey.append("|") - .append(entry.getKey()) - .append("=") - .append(entry.getValue()); - } - for (Variable usedVariable : usedVariables) { - cacheKey.append("|") - .append(usedVariable.getType().name()) - .append(":") - .append(usedVariable.getName()) - .append("=") - .append(usedVariable.getRealExpression().toSql()); - } - for (Pair pair : foldNondeterministicPairs) { - cacheKey.append("|") - .append(pair.key().toSql()) - .append("=") - .append(pair.value().toSql()); - } - for (Entry> entry : rowPolicies.entrySet()) { - List policy = entry.getValue(); - if (policy.isEmpty()) { - continue; - } - cacheKey.append("|") - .append(entry.getKey()) - .append("=") - .append(policy); - } - for (Entry> entry : dataMaskPolicies.entrySet()) { - if (!entry.getValue().isPresent()) { - continue; - } - cacheKey.append("|") - .append(entry.getKey()) - .append("=") - .append(entry.getValue().map(Object::toString).orElse("")); - } - cacheKeyMd5 = CacheProxy.getMd5(cacheKey.toString()); + cacheKeyMd5 = doComputeCacheKeyMd5(usedVariables); } } return cacheKeyMd5; } + /** doComputeCacheKeyMd5 */ + public synchronized PUniqueId doComputeCacheKeyMd5(Set usedVariables) { + StringBuilder cacheKey = new StringBuilder(originSql); + for (Entry entry : usedViews.entrySet()) { + cacheKey.append("|") + .append(entry.getKey()) + .append("=") + .append(entry.getValue()); + } + for (Variable usedVariable : usedVariables) { + cacheKey.append("|") + .append(usedVariable.getType().name()) + .append(":") + .append(usedVariable.getName()) + .append("=") + .append(usedVariable.getRealExpression().toSql()); + } + for (Pair pair : foldNondeterministicPairs) { + cacheKey.append("|") + .append(pair.key().toSql()) + .append("=") + .append(pair.value().toSql()); + } + for (Entry> entry : rowPolicies.entrySet()) { + List policy = entry.getValue(); + if (policy.isEmpty()) { + continue; + } + cacheKey.append("|") + .append(entry.getKey()) + .append("=") + .append(policy); + } + for (Entry> entry : dataMaskPolicies.entrySet()) { + if (!entry.getValue().isPresent()) { + continue; + } + cacheKey.append("|") + .append(entry.getKey()) + .append("=") + .append(entry.getValue().map(Object::toString).orElse("")); + } + return CacheProxy.getMd5(cacheKey.toString()); + } + public void setOriginSql(String originSql) { this.originSql = originSql.trim(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/ComputeResultSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/ComputeResultSet.java new file mode 100644 index 00000000000000..beee784ec9de91 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/ComputeResultSet.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans; + +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.SqlCacheContext; +import org.apache.doris.qe.ResultSet; + +import java.util.Optional; + +/** + *

+ * This class is used to return result set in fe without send fragment to be. + * Some plans support this function, for example: + *

  • 1. the sql `select 100` will generate a plan, PhysicalOneRowRelation, and PhysicalOneRowRelation implement this + * interface, so fe can send the only row to client immediately. + *
  • + *
  • 2. the sql `select * from tbl limit 0` will generate PhysicalEmptyRelation, which means no any rows returned, + * the PhysicalEmptyRelation implement this interface. + *
  • + *

    + *

    + * If you want to cache the result set in fe, you can implement this interface and write this code: + *

    + *
    +  * StatementContext statementContext = cascadesContext.getStatementContext();
    +  * boolean enableSqlCache
    +  *         = CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable());
    +  * if (sqlCacheContext.isPresent() && enableSqlCache) {
    +  *     sqlCacheContext.get().setResultSetInFe(resultSet);
    +  *     Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
    +  *             statementContext.getConnectContext(),
    +  *             statementContext.getOriginStatement().originStmt
    +  *     );
    +  * }
    +  * 
    + */ +public interface ComputeResultSet { + Optional computeResultInFe(CascadesContext cascadesContext, Optional sqlCacheContext); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalEmptyRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalEmptyRelation.java index 2a9c344bd4681f..e01c3ead327b65 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalEmptyRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalEmptyRelation.java @@ -17,20 +17,31 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.SqlCacheContext; +import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.ComputeResultSet; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.algebra.EmptyRelation; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.CommonResultSet; +import org.apache.doris.qe.ResultSet; +import org.apache.doris.qe.ResultSetMetaData; +import org.apache.doris.qe.cache.CacheAnalyzer; import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import java.util.List; import java.util.Objects; @@ -41,7 +52,7 @@ * e.g. * select * from tbl limit 0 */ -public class PhysicalEmptyRelation extends PhysicalRelation implements EmptyRelation { +public class PhysicalEmptyRelation extends PhysicalRelation implements EmptyRelation, ComputeResultSet { private final List projects; @@ -102,4 +113,30 @@ public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalPr return new PhysicalEmptyRelation(relationId, projects, Optional.empty(), logicalPropertiesSupplier.get(), physicalProperties, statistics); } + + @Override + public Optional computeResultInFe(CascadesContext cascadesContext, + Optional sqlCacheContext) { + List columns = Lists.newArrayList(); + List outputSlots = getOutput(); + for (int i = 0; i < outputSlots.size(); i++) { + NamedExpression output = outputSlots.get(i); + columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType())); + } + + StatementContext statementContext = cascadesContext.getStatementContext(); + boolean enableSqlCache + = CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable()); + + ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns); + ResultSet resultSet = new CommonResultSet(metadata, ImmutableList.of()); + if (sqlCacheContext.isPresent() && enableSqlCache) { + sqlCacheContext.get().setResultSetInFe(resultSet); + Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache( + statementContext.getConnectContext(), + statementContext.getOriginStatement().originStmt + ); + } + return Optional.of(resultSet); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOneRowRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOneRowRelation.java index e1ab09739edad7..2b1b91891cb31e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOneRowRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalOneRowRelation.java @@ -17,21 +17,35 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Env; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.SqlCacheContext; +import org.apache.doris.nereids.StatementContext; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.ComputeResultSet; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.CommonResultSet; +import org.apache.doris.qe.ResultSet; +import org.apache.doris.qe.ResultSetMetaData; +import org.apache.doris.qe.cache.CacheAnalyzer; import org.apache.doris.statistics.Statistics; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.Optional; @@ -40,7 +54,7 @@ * A physical relation that contains only one row consist of some constant expressions. * e.g. select 100, 'value' */ -public class PhysicalOneRowRelation extends PhysicalRelation implements OneRowRelation { +public class PhysicalOneRowRelation extends PhysicalRelation implements OneRowRelation, ComputeResultSet { private final List projects; @@ -119,4 +133,37 @@ public PhysicalOneRowRelation withPhysicalPropertiesAndStats(PhysicalProperties return new PhysicalOneRowRelation(relationId, projects, groupExpression, logicalPropertiesSupplier.get(), physicalProperties, statistics); } + + @Override + public Optional computeResultInFe( + CascadesContext cascadesContext, Optional sqlCacheContext) { + List columns = Lists.newArrayList(); + List data = Lists.newArrayList(); + for (int i = 0; i < projects.size(); i++) { + NamedExpression item = projects.get(i); + NamedExpression output = getOutput().get(i); + Expression expr = item.child(0); + if (expr instanceof Literal) { + LiteralExpr legacyExpr = ((Literal) expr).toLegacyLiteral(); + columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType())); + data.add(legacyExpr.getStringValueInFe()); + } else { + return Optional.empty(); + } + } + + ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns); + ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data)); + StatementContext statementContext = cascadesContext.getStatementContext(); + boolean enableSqlCache + = CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable()); + if (sqlCacheContext.isPresent() && enableSqlCache) { + sqlCacheContext.get().setResultSetInFe(resultSet); + Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache( + statementContext.getConnectContext(), + statementContext.getOriginStatement().originStmt + ); + } + return Optional.of(resultSet); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java index eba99cdfb21a20..aceb1f13774912 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalResultSink.java @@ -17,16 +17,20 @@ package org.apache.doris.nereids.trees.plans.physical; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.SqlCacheContext; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.plans.ComputeResultSet; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.Sink; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.ResultSet; import org.apache.doris.statistics.Statistics; import com.google.common.base.Preconditions; @@ -39,7 +43,8 @@ /** * result sink */ -public class PhysicalResultSink extends PhysicalSink implements Sink { +public class PhysicalResultSink extends PhysicalSink + implements Sink, ComputeResultSet { public PhysicalResultSink(List outputExprs, Optional groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { @@ -125,4 +130,15 @@ public PhysicalResultSink resetLogicalProperties() { return new PhysicalResultSink<>(outputExprs, groupExpression, null, physicalProperties, statistics, child()); } + + @Override + public Optional computeResultInFe( + CascadesContext cascadesContext, Optional sqlCacheContext) { + CHILD_TYPE child = child(); + if (child instanceof ComputeResultSet) { + return ((ComputeResultSet) child).computeResultInFe(cascadesContext, sqlCacheContext); + } else { + return Optional.empty(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSqlCache.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSqlCache.java index 824ca7e8924058..124f52f6080254 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSqlCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalSqlCache.java @@ -19,12 +19,15 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.SqlCacheContext; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.FunctionalDependencies; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.ComputeResultSet; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.TreeStringPlan; @@ -44,7 +47,7 @@ import java.util.Optional; /** PhysicalSqlCache */ -public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStringPlan { +public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStringPlan, ComputeResultSet { private final TUniqueId queryId; private final List columnLabels; private final List resultExprs; @@ -154,4 +157,10 @@ public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalPr public String getChildrenTreeString() { return planBody; } + + @Override + public Optional computeResultInFe( + CascadesContext cascadesContext, Optional sqlCacheContext) { + return resultSet; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 1e63a750748aec..5b8082fd0ba7f2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1591,7 +1591,7 @@ private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel) String originStmt = parsedStmt.getOrigStmt().originStmt; NereidsSqlCacheManager sqlCacheManager = context.getEnv().getSqlCacheManager(); if (cacheResult != null) { - sqlCacheManager.tryAddCache(context, originStmt, cacheAnalyzer, false); + sqlCacheManager.tryAddBeCache(context, originStmt, cacheAnalyzer); } } } @@ -1809,11 +1809,7 @@ public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields, Cache cache = cacheAnalyzer.getCache(); if (cache instanceof SqlCache && !cache.isDisableCache() && planner instanceof NereidsPlanner) { String originStmt = parsedStmt.getOrigStmt().originStmt; - LogicalPlanAdapter logicalPlanAdapter = (LogicalPlanAdapter) queryStmt; - boolean currentMissParseSqlFromSqlCache = !(logicalPlanAdapter.getLogicalPlan() - instanceof org.apache.doris.nereids.trees.plans.algebra.SqlCache); - context.getEnv().getSqlCacheManager().tryAddCache( - context, originStmt, cacheAnalyzer, currentMissParseSqlFromSqlCache); + context.getEnv().getSqlCacheManager().tryAddBeCache(context, originStmt, cacheAnalyzer); } } if (!isSendFields) { diff --git a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy index 085bfe8135967b..d95c3edc3446c4 100644 --- a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy +++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy @@ -485,14 +485,24 @@ suite("parse_sql_from_sql_cache") { sql "set enable_sql_cache=true" sql "set @custom_variable=10" - assertNoCache "select @custom_variable from test_use_plan_cache17" + assertNoCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" // create sql cache - sql "select @custom_variable from test_use_plan_cache17" + sql "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" // can use sql cache - assertHasCache "select @custom_variable from test_use_plan_cache17" + assertHasCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" sql "set @custom_variable=20" - assertNoCache "select @custom_variable from test_use_plan_cache17" + assertNoCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" + + def result2 = sql "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" + assertHasCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" + assertTrue(result2.size() == 1 && result2[0][0].toString().toInteger() == 20) + + // we can switch to origin value and reuse origin cache + sql "set @custom_variable=10" + assertHasCache "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" + def result1 = sql "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" + assertTrue(result1.size() == 1 && result1[0][0].toString().toInteger() == 10) } }), extraThread("test_udf", { @@ -634,12 +644,14 @@ suite("parse_sql_from_sql_cache") { sql "set enable_fallback_to_original_planner=false" sql "set enable_sql_cache=true" - assertNoCache "select * from (select 100 as id)a" - def result1 = sql "select * from (select 100 as id)a" + int randomInt = Math.random() * 2000000000 + + assertNoCache "select * from (select $randomInt as id)a" + def result1 = sql "select * from (select $randomInt as id)a" assertTrue(result1.size() == 1) - assertHasCache "select * from (select 100 as id)a" - def result2 = sql "select * from (select 100 as id)a" + assertHasCache "select * from (select $randomInt as id)a" + def result2 = sql "select * from (select $randomInt as id)a" assertTrue(result2.size() == 1) assertNoCache "select * from test_use_plan_cache20 limit 0" @@ -671,8 +683,6 @@ suite("parse_sql_from_sql_cache") { distributed by hash(id) properties('replication_num'='1')""" - - sql "insert into test_use_plan_cache21 values('2', '2')" sleep(100) sql "insert into test_use_plan_cache21 values('1', '1')"