Skip to content

Commit

Permalink
cache in fe
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Apr 17, 2024
1 parent 1b10a6f commit 4a31ed6
Show file tree
Hide file tree
Showing 7 changed files with 162 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.cache.CacheAnalyzer;
import org.apache.doris.qe.cache.SqlCache;

Expand Down Expand Up @@ -96,8 +97,8 @@ public static synchronized void updateConfig() {
}

private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, long cacheIntervalSeconds) {
sqlCacheNum = sqlCacheNum <= 0 ? 100 : sqlCacheNum;
cacheIntervalSeconds = cacheIntervalSeconds <= 0 ? 30 : cacheIntervalSeconds;
sqlCacheNum = sqlCacheNum < 0 ? 100 : sqlCacheNum;
cacheIntervalSeconds = cacheIntervalSeconds < 0 ? 30 : cacheIntervalSeconds;

return Caffeine.newBuilder()
.maximumSize(sqlCacheNum)
Expand All @@ -107,6 +108,22 @@ private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, lo
.build();
}

/** tryAddFeCache */
public void tryAddFeSqlCache(ConnectContext connectContext, String sql) {
Optional<SqlCacheContext> sqlCacheContextOpt = connectContext.getStatementContext().getSqlCacheContext();
if (!sqlCacheContextOpt.isPresent()) {
return;
}

SqlCacheContext sqlCacheContext = sqlCacheContextOpt.get();
UserIdentity currentUserIdentity = connectContext.getCurrentUserIdentity();
String key = currentUserIdentity.toString() + ":" + sql.trim();
if ((sqlCaches.getIfPresent(key) == null) && sqlCacheContext.getOrComputeCacheKeyMd5() != null
&& sqlCacheContext.getResultSetInFe().isPresent()) {
sqlCaches.put(key, sqlCacheContext);
}
}

/** tryAddCache */
public void tryAddCache(
ConnectContext connectContext, String sql,
Expand Down Expand Up @@ -178,6 +195,19 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
}

try {
Optional<ResultSet> resultSetInFe = sqlCacheContext.getResultSetInFe();
if (resultSetInFe.isPresent()) {
MetricRepo.COUNTER_CACHE_HIT_SQL.increase(1L);

String cachedPlan = sqlCacheContext.getPhysicalPlan();
LogicalSqlCache logicalSqlCache = new LogicalSqlCache(
sqlCacheContext.getQueryId(), sqlCacheContext.getColLabels(),
sqlCacheContext.getResultExprs(), resultSetInFe, ImmutableList.of(),
"none", cachedPlan
);
return Optional.of(logicalSqlCache);
}

Status status = new Status();
PUniqueId cacheKeyMd5 = sqlCacheContext.getOrComputeCacheKeyMd5();
InternalService.PFetchCacheResult cacheData =
Expand All @@ -195,7 +225,9 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri

LogicalSqlCache logicalSqlCache = new LogicalSqlCache(
sqlCacheContext.getQueryId(), sqlCacheContext.getColLabels(),
sqlCacheContext.getResultExprs(), cacheValues, backendAddress, cachedPlan);
sqlCacheContext.getResultExprs(), Optional.empty(),
cacheValues, backendAddress, cachedPlan
);
return Optional.of(logicalSqlCache);
}
return invalidateCache(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.common.NereidsException;
import org.apache.doris.common.Pair;
Expand Down Expand Up @@ -54,6 +55,7 @@
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSqlCache;
import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink;
Expand All @@ -66,8 +68,10 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.ResultSetMetaData;
import org.apache.doris.qe.cache.CacheAnalyzer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -168,8 +172,9 @@ public Plan plan(LogicalPlan plan, PhysicalProperties requireProperties,
LogicalSqlCache logicalSqlCache = (LogicalSqlCache) plan;
physicalPlan = new PhysicalSqlCache(
logicalSqlCache.getQueryId(), logicalSqlCache.getColumnLabels(),
logicalSqlCache.getResultExprs(), logicalSqlCache.getCacheValues(),
logicalSqlCache.getBackendAddress(), logicalSqlCache.getPlanBody()
logicalSqlCache.getResultExprs(), logicalSqlCache.getResultSetInFe(),
logicalSqlCache.getCacheValues(), logicalSqlCache.getBackendAddress(),
logicalSqlCache.getPlanBody()
);
return physicalPlan;
}
Expand Down Expand Up @@ -528,31 +533,66 @@ public Optional<ResultSet> handleQueryInFe(StatementBase parsedStmt) {
if (!(parsedStmt instanceof LogicalPlanAdapter)) {
return Optional.empty();
}
if (!(physicalPlan instanceof PhysicalResultSink)) {
return Optional.empty();
if (physicalPlan instanceof PhysicalSqlCache
&& ((PhysicalSqlCache) physicalPlan).getResultSet().isPresent()) {
return Optional.of(((PhysicalSqlCache) physicalPlan).getResultSet().get());
}
if (!(((PhysicalResultSink<?>) physicalPlan).child() instanceof PhysicalOneRowRelation)) {
if (!(physicalPlan instanceof PhysicalResultSink)) {
return Optional.empty();
}
PhysicalOneRowRelation physicalOneRowRelation
= (PhysicalOneRowRelation) ((PhysicalResultSink<?>) physicalPlan).child();
List<Column> columns = Lists.newArrayList();
List<String> data = Lists.newArrayList();
for (int i = 0; i < physicalOneRowRelation.getProjects().size(); i++) {
NamedExpression item = physicalOneRowRelation.getProjects().get(i);
NamedExpression output = physicalPlan.getOutput().get(i);
Expression expr = item.child(0);
if (expr instanceof Literal) {
LiteralExpr legacyExpr = ((Literal) expr).toLegacyLiteral();

Optional<SqlCacheContext> sqlCacheContext = statementContext.getSqlCacheContext();
boolean enableSqlCache
= CacheAnalyzer.canUseSqlCache(statementContext.getConnectContext().getSessionVariable());
Plan child = physicalPlan.child(0);
if (child instanceof PhysicalOneRowRelation) {
PhysicalOneRowRelation physicalOneRowRelation = (PhysicalOneRowRelation) physicalPlan.child(0);
List<Column> columns = Lists.newArrayList();
List<String> data = Lists.newArrayList();
for (int i = 0; i < physicalOneRowRelation.getProjects().size(); i++) {
NamedExpression item = physicalOneRowRelation.getProjects().get(i);
NamedExpression output = physicalPlan.getOutput().get(i);
Expression expr = item.child(0);
if (expr instanceof Literal) {
LiteralExpr legacyExpr = ((Literal) expr).toLegacyLiteral();
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
data.add(legacyExpr.getStringValueInFe());
} else {
return Optional.empty();
}
}

ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
if (sqlCacheContext.isPresent() && enableSqlCache) {
sqlCacheContext.get().setResultSetInFe(resultSet);
Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
statementContext.getConnectContext(),
statementContext.getOriginStatement().originStmt
);
}
return Optional.of(resultSet);
} else if (child instanceof PhysicalEmptyRelation) {
List<Column> columns = Lists.newArrayList();
PhysicalEmptyRelation physicalEmptyRelation = (PhysicalEmptyRelation) physicalPlan.child(0);
for (int i = 0; i < physicalEmptyRelation.getProjects().size(); i++) {
NamedExpression output = physicalPlan.getOutput().get(i);
columns.add(new Column(output.getName(), output.getDataType().toCatalogDataType()));
data.add(legacyExpr.getStringValueInFe());
} else {
return Optional.empty();
}

ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, ImmutableList.of());
if (sqlCacheContext.isPresent() && enableSqlCache) {
sqlCacheContext.get().setResultSetInFe(resultSet);
Env.getCurrentEnv().getSqlCacheManager().tryAddFeSqlCache(
statementContext.getConnectContext(),
statementContext.getOriginStatement().originStmt
);
}
return Optional.of(resultSet);
} else {
return Optional.empty();
}
ResultSetMetaData metadata = new CommonResultSet.CommonResultSetMetaData(columns);
ResultSet resultSet = new CommonResultSet(metadata, Collections.singletonList(data));
return Optional.of(resultSet);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.doris.nereids.trees.expressions.Variable;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.proto.Types.PUniqueId;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.qe.cache.CacheProxy;
import org.apache.doris.thrift.TUniqueId;

Expand Down Expand Up @@ -83,6 +84,7 @@ public class SqlCacheContext {
private volatile List<String> colLabels;

private volatile PUniqueId cacheKeyMd5;
private volatile ResultSet resultSetInFe;

public SqlCacheContext(UserIdentity userIdentity, TUniqueId queryId) {
this.userIdentity = Objects.requireNonNull(userIdentity, "userIdentity cannot be null");
Expand Down Expand Up @@ -378,6 +380,14 @@ public void setOriginSql(String originSql) {
this.originSql = originSql.trim();
}

public Optional<ResultSet> getResultSetInFe() {
return Optional.ofNullable(resultSetInFe);
}

public void setResultSetInFe(ResultSet resultSetInFe) {
this.resultSetInFe = resultSetInFe;
}

/** FullTableName */
@lombok.Data
@lombok.AllArgsConstructor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ImmutableList;
Expand All @@ -46,18 +47,21 @@ public class LogicalSqlCache extends LogicalLeaf implements SqlCache, TreeString
private final TUniqueId queryId;
private final List<String> columnLabels;
private final List<Expr> resultExprs;
private final Optional<ResultSet> resultSetInFe;
private final List<InternalService.PCacheValue> cacheValues;
private final String backendAddress;
private final String planBody;

/** LogicalSqlCache */
public LogicalSqlCache(TUniqueId queryId,
List<String> columnLabels, List<Expr> resultExprs,
List<InternalService.PCacheValue> cacheValues, String backendAddress, String planBody) {
Optional<ResultSet> resultSetInFe, List<InternalService.PCacheValue> cacheValues,
String backendAddress, String planBody) {
super(PlanType.LOGICAL_SQL_CACHE, Optional.empty(), Optional.empty());
this.queryId = Objects.requireNonNull(queryId, "queryId can not be null");
this.columnLabels = Objects.requireNonNull(columnLabels, "columnLabels can not be null");
this.resultExprs = Objects.requireNonNull(resultExprs, "resultExprs can not be null");
this.resultSetInFe = Objects.requireNonNull(resultSetInFe, "resultSetInFe can not be null");
this.cacheValues = Objects.requireNonNull(cacheValues, "cacheValues can not be null");
this.backendAddress = Objects.requireNonNull(backendAddress, "backendAddress can not be null");
this.planBody = Objects.requireNonNull(planBody, "planBody can not be null");
Expand All @@ -67,6 +71,10 @@ public TUniqueId getQueryId() {
return queryId;
}

public Optional<ResultSet> getResultSetInFe() {
return resultSetInFe;
}

public List<InternalService.PCacheValue> getCacheValues() {
return cacheValues;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.Utils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ResultSet;
import org.apache.doris.statistics.Statistics;
import org.apache.doris.thrift.TUniqueId;

Expand All @@ -46,19 +47,22 @@ public class PhysicalSqlCache extends PhysicalLeaf implements SqlCache, TreeStri
private final TUniqueId queryId;
private final List<String> columnLabels;
private final List<Expr> resultExprs;
private final Optional<ResultSet> resultSet;
private final List<InternalService.PCacheValue> cacheValues;
private final String backendAddress;
private final String planBody;

/** PhysicalSqlCache */
public PhysicalSqlCache(TUniqueId queryId,
List<String> columnLabels, List<Expr> resultExprs,
List<InternalService.PCacheValue> cacheValues, String backendAddress, String planBody) {
Optional<ResultSet> resultSet, List<InternalService.PCacheValue> cacheValues,
String backendAddress, String planBody) {
super(PlanType.PHYSICAL_SQL_CACHE, Optional.empty(),
new LogicalProperties(() -> ImmutableList.of(), () -> FunctionalDependencies.EMPTY_FUNC_DEPS));
this.queryId = Objects.requireNonNull(queryId, "queryId can not be null");
this.columnLabels = Objects.requireNonNull(columnLabels, "colNames can not be null");
this.resultExprs = Objects.requireNonNull(resultExprs, "resultExprs can not be null");
this.resultSet = Objects.requireNonNull(resultSet, "resultSet can not be null");
this.cacheValues = Objects.requireNonNull(cacheValues, "cacheValues can not be null");
this.backendAddress = Objects.requireNonNull(backendAddress, "backendAddress can not be null");
this.planBody = Objects.requireNonNull(planBody, "planBody can not be null");
Expand All @@ -68,6 +72,10 @@ public TUniqueId getQueryId() {
return queryId;
}

public Optional<ResultSet> getResultSet() {
return resultSet;
}

public List<InternalService.PCacheValue> getCacheValues() {
return cacheValues;
}
Expand All @@ -92,7 +100,8 @@ public String getPlanBody() {
public String toString() {
return Utils.toSqlString("PhysicalSqlCache[" + id.asInt() + "]",
"queryId", DebugUtil.printId(queryId),
"backend", backendAddress
"backend", backendAddress,
"rowCount", resultSet.map(rs -> rs.getResultRows().size()).orElse(cacheValues.size())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1661,8 +1661,7 @@ private boolean sendCachedValues(MysqlChannel channel, List<InternalService.PCac
/**
* Handle the SelectStmt via Cache.
*/
private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel)
throws Exception {
private void handleCacheStmt(CacheAnalyzer cacheAnalyzer, MysqlChannel channel) throws Exception {
InternalService.PFetchCacheResult cacheResult = null;
boolean wantToParseSqlForSqlCache = planner instanceof NereidsPlanner
&& CacheAnalyzer.canUseSqlCache(context.getSessionVariable());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,41 @@ suite("parse_sql_from_sql_cache") {
def result3 = sql "select * from test_use_plan_cache19 order by 1, 2"
assertTrue(result3.size() == 1)
assertNoCache "select * from test_use_plan_cache19 order by 1, 2"
}),
extraThread("test_sql_cache_in_fe", {
createTestTable "test_use_plan_cache20"

sql "alter table test_use_plan_cache20 add partition p6 values[('999'), ('1000'))"

// after partition changed 10s, the sql cache can be used
sleep(10000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
sql "set enable_sql_cache=true"

assertNoCache "select * from (select 100 as id)a"
def result1 = sql "select * from (select 100 as id)a"
assertTrue(result1.size() == 1)

assertHasCache "select * from (select 100 as id)a"
def result2 = sql "select * from (select 100 as id)a"
assertTrue(result2.size() == 1)

assertNoCache "select * from test_use_plan_cache20 limit 0"
def result3 = sql "select * from test_use_plan_cache20 limit 0"
assertTrue(result3.isEmpty())

assertHasCache "select * from test_use_plan_cache20 limit 0"
def result4 = sql "select * from test_use_plan_cache20 limit 0"
assertTrue(result4.isEmpty())

assertNoCache "select * from test_use_plan_cache20 where id=999"
def result5 = sql "select * from test_use_plan_cache20 where id=999"
assertTrue(result5.isEmpty())
assertHasCache "select * from test_use_plan_cache20 where id=999"
def result6 = sql "select * from test_use_plan_cache20 where id=999"
assertTrue(result6.isEmpty())
})
).get()
}

0 comments on commit 4a31ed6

Please sign in to comment.