From f458b88dc3376af3938785671220529d8b725b2d Mon Sep 17 00:00:00 2001
From: 924060929
Date: Mon, 6 Jan 2025 16:09:24 +0800
Subject: [PATCH] fix sql cache false reuse when a query scans an empty
 partition

Record the id, visible version, and type of every used table in
SqlCacheContext and compare them before reusing a cached result, so the cache
is invalidated even for tables that contribute no partitions to
getScanTables(). Also guard AbstractPlan.updateActualRowCount() against a
null statistics field, and make the regression suite apply
cache_last_version_interval_second on every frontend.
---
 .../common/cache/NereidsSqlCacheManager.java  |   30 +-
 .../apache/doris/nereids/SqlCacheContext.java |   27 +-
 .../nereids/trees/plans/AbstractPlan.java     |    4 +-
 .../doris/regression/suite/Suite.groovy       |   21 +-
 .../cache/parse_sql_from_sql_cache.groovy     | 1471 ++++++++---------
 5 files changed, 783 insertions(+), 770 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java
index aba0decb76e3a0..989009818c7580 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/cache/NereidsSqlCacheManager.java
@@ -23,6 +23,7 @@
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.Partition;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.catalog.View;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.ConfigBase.DefaultConfHandler;
@@ -39,6 +40,7 @@
 import org.apache.doris.nereids.SqlCacheContext.FullColumnName;
 import org.apache.doris.nereids.SqlCacheContext.FullTableName;
 import org.apache.doris.nereids.SqlCacheContext.ScanTable;
+import org.apache.doris.nereids.SqlCacheContext.TableVersion;
 import org.apache.doris.nereids.StatementContext;
 import org.apache.doris.nereids.analyzer.UnboundVariable;
 import org.apache.doris.nereids.parser.NereidsParser;
@@ -308,20 +310,36 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
             return true;
         }

-        for (ScanTable scanTable : sqlCacheContext.getScanTables()) {
-            FullTableName fullTableName = scanTable.fullTableName;
-            TableIf tableIf = findTableIf(env, fullTableName);
-            if (!(tableIf instanceof OlapTable)) {
+        // the query may scan an empty partition of a table, so that table's version must be
+        // checked too, but such a table does not appear in sqlCacheContext.getScanTables(),
+        // which is why every used table is checked here.
+        // check table type and version
+        for (Entry<FullTableName, TableVersion> scanTable : sqlCacheContext.getUsedTables().entrySet()) {
+            TableVersion tableVersion = scanTable.getValue();
+            if (tableVersion.type != TableType.OLAP) {
                 return true;
             }
+            TableIf tableIf = findTableIf(env, scanTable.getKey());
+            if (!(tableIf instanceof OlapTable) || tableVersion.id != tableIf.getId()) {
+                return true;
+            }
+
+            OlapTable olapTable = (OlapTable) tableIf;
             long currentTableVersion = olapTable.getVisibleVersion();
-            long cacheTableVersion = scanTable.latestVersion;
+            long cacheTableVersion = tableVersion.version;
             // some partitions have been dropped, deleted, updated or replaced, or rows were inserted into a new partition?
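+            // e.g. a query whose predicate pruned away every partition records nothing for
+            // the table in getScanTables(), yet an insert still bumps the table's visible
+            // version, so this table-level comparison is what forces the cache miss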
             if (currentTableVersion != cacheTableVersion) {
                 return true;
             }
+        }
+
+        // check partition version
+        for (ScanTable scanTable : sqlCacheContext.getScanTables()) {
+            FullTableName fullTableName = scanTable.fullTableName;
+            TableIf tableIf = findTableIf(env, fullTableName);
+            if (!(tableIf instanceof OlapTable)) {
+                return true;
+            }
+            OlapTable olapTable = (OlapTable) tableIf;
             Collection<Long> partitionIds = scanTable.getScanPartitions();
             olapTable.getVersionInBatchForCloudMode(partitionIds);

@@ -392,7 +410,7 @@ private boolean dataMaskPoliciesChanged(
      */
     private boolean tryLockTables(ConnectContext connectContext, Env env, SqlCacheContext sqlCacheContext) {
         StatementContext currentStatementContext = connectContext.getStatementContext();
-        for (FullTableName fullTableName : sqlCacheContext.getUsedTables()) {
+        for (FullTableName fullTableName : sqlCacheContext.getUsedTables().keySet()) {
             TableIf tableIf = findTableIf(env, fullTableName);
             if (tableIf == null) {
                 return false;

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
index 29be4af41a7675..309bf9a83b883e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/SqlCacheContext.java
@@ -20,7 +20,9 @@
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.DatabaseIf;
+import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.TableIf.TableType;
 import org.apache.doris.common.Pair;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.mysql.FieldInfo;
@@ -41,6 +43,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;

+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -61,7 +64,8 @@ public class SqlCacheContext {
     private volatile long latestPartitionTime = -1;
     private volatile long latestPartitionVersion = -1;
     private volatile long sumOfPartitionNum = -1;
-    private final Set<FullTableName> usedTables = Sets.newLinkedHashSet();
+    // value: version of table
+    private final Map<FullTableName, TableVersion> usedTables = Maps.newLinkedHashMap();
     // value: ddl sql
     private final Map<FullTableName, String> usedViews = Maps.newLinkedHashMap();
     // value: usedColumns
@@ -135,8 +139,13 @@ public synchronized void addUsedTable(TableIf tableIf) {
             return;
         }

-        usedTables.add(
-                new FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName())
+        usedTables.put(
+                new FullTableName(database.getCatalog().getName(), database.getFullName(), tableIf.getName()),
+                new TableVersion(
+                        tableIf.getId(),
+                        tableIf instanceof OlapTable ?
+                                ((OlapTable) tableIf).getVisibleVersion() : 0L,
+                        tableIf.getType()
+                )
         );
     }

@@ -282,8 +291,8 @@ public void setCacheProxy(CacheProxy cacheProxy) {
         this.cacheProxy = cacheProxy;
     }

-    public Set<FullTableName> getUsedTables() {
-        return ImmutableSet.copyOf(usedTables);
+    public Map<FullTableName, TableVersion> getUsedTables() {
+        return Collections.unmodifiableMap(usedTables);
     }

     public Map<FullTableName, String> getUsedViews() {
@@ -458,6 +467,14 @@ public void addScanPartition(Long partitionId) {
         }
     }

+    @lombok.Data
+    @lombok.AllArgsConstructor
+    public static class TableVersion {
+        public final long id;
+        public final long version;
+        public final TableType type;
+    }
+
     /** CacheKeyType */
     public enum CacheKeyType {
         // use `userIdentity`:`sql`.trim() as Cache key in FE

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
index eb65048050fda1..958b4fe9c424b4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/AbstractPlan.java
@@ -228,6 +228,8 @@ public List<Plan> getAncestors() {
     }

     public void updateActualRowCount(long actualRowCount) {
-        statistics.setActualRowCount(actualRowCount);
+        if (statistics != null) {
+            statistics.setActualRowCount(actualRowCount);
+        }
     }
 }

diff --git a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
index e6f2de0dd0d676..946af26a419007 100644
--- a/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
+++ b/regression-test/framework/src/main/groovy/org/apache/doris/regression/suite/Suite.groovy
@@ -38,7 +38,6 @@
 import org.apache.doris.regression.RegressionTest
 import org.apache.doris.regression.action.BenchmarkAction
 import org.apache.doris.regression.action.ProfileAction
 import org.apache.doris.regression.action.WaitForAction
-import org.apache.doris.regression.util.DataUtils
 import org.apache.doris.regression.util.OutputUtils
 import org.apache.doris.regression.action.CreateMVAction
 import org.apache.doris.regression.action.ExplainAction
@@ -60,13 +59,7 @@
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory

 import java.sql.Connection
-import java.io.File
-import java.math.BigDecimal;
-import java.sql.PreparedStatement
-import java.sql.ResultSetMetaData
-import java.util.Map;
 import java.util.concurrent.Callable
-import java.util.concurrent.ExecutorService
 import java.util.concurrent.Executors
 import java.util.concurrent.Future
 import java.util.concurrent.ThreadFactory
@@ -1087,6 +1080,20 @@ class Suite implements GroovyInterceptable {
         }
     }

+    void foreachFrontends(Closure action) {
+        def rows = sql_return_maparray("show frontends")
+        for (def row in rows) {
+            action(row)
+        }
+    }
+
+    void foreachBackends(Closure action) {
+        def rows = sql_return_maparray("show backends")
+        for (def row in rows) {
+            action(row)
+        }
+    }
+
     List<String> getFrontendIpHttpPort() {
         return sql_return_maparray("show frontends").collect { it.Host + ":" + it.HttpPort };
     }
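For intuition, the table-version half of the new check can be sketched in isolation. This is an illustrative, self-contained example rather than the Doris implementation: the record below is a simplified stand-in for SqlCacheContext.TableVersion (Java 16+ records assumed), and the two maps play the roles of the cached getUsedTables() snapshot and the catalog's current state.

import java.util.Map;

public class TableVersionCheckSketch {
    // simplified stand-in for SqlCacheContext.TableVersion (table id + visible version)
    record TableVersion(long id, long version) {}

    /** true means some used table changed, so the cached result must be discarded */
    static boolean tablesChanged(Map<String, TableVersion> cached, Map<String, TableVersion> current) {
        for (Map.Entry<String, TableVersion> e : cached.entrySet()) {
            TableVersion now = current.get(e.getKey());
            if (now == null                                       // table dropped
                    || now.id() != e.getValue().id()              // dropped and recreated
                    || now.version() != e.getValue().version()) { // any data change
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Map<String, TableVersion> cached = Map.of("db.t", new TableVersion(1001L, 5L));
        // an insert bumped db.t's visible version from 5 to 6 even though the cached
        // query scanned none of its partitions (all were pruned away)
        Map<String, TableVersion> current = Map.of("db.t", new TableVersion(1001L, 6L));
        System.out.println(tablesChanged(cached, current)); // prints: true
    }
}

This mirrors why the per-partition loop alone was insufficient: a table absent from getScanTables() never reached the old version comparison.

diff --git a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
index f11730799508ca..84d0f129faefd3 100644
--- a/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
+++ b/regression-test/suites/nereids_p0/cache/parse_sql_from_sql_cache.groovy
@@ -17,16 +17,11 @@
 import java.util.stream.Collectors
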
-suite("parse_sql_from_sql_cache") { +suite("parse_sql_from_sql_cache2") { def assertHasCache = { String sqlStr -> - retry(120, 1000) { i -> - if (i > 1) { - sql sqlStr - } - explain { - sql ("physical plan ${sqlStr}") - contains("PhysicalSqlCache") - } + explain { + sql ("physical plan ${sqlStr}") + contains("PhysicalSqlCache") } } @@ -37,523 +32,508 @@ suite("parse_sql_from_sql_cache") { } } - combineFutures( - extraThread("testUsePlanCache", { - createTestTable "test_use_plan_cache" - - // after partition changed 10s, the sql cache can be used + def db = (sql "select database()")[0][0].toString() + foreachFrontends { fe -> + def url = "jdbc:mysql://${fe.Host}:${fe.QueryPort}/${db}" + connect(context.config.jdbcUser, context.config.jdbcPassword, url) { sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + } + } - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + // make sure if the table has been dropped, the cache should invalidate, + // so we should retry multiple times to check + for (def __ in 0..3) { + combineFutures( + extraThread("testUsePlanCache", { + createTestTable "test_use_plan_cache" - assertNoCache "select * from test_use_plan_cache" + // after partition changed 10s, the sql cache can be used + sleep(10000) - // create sql cache - sql "select * from test_use_plan_cache" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - // use sql cache - assertHasCache "select * from test_use_plan_cache" - }), - extraThread("testAddPartitionAndInsert", { - createTestTable "test_use_plan_cache2" + assertNoCache "select * from test_use_plan_cache" - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + // create sql cache + sql "select * from test_use_plan_cache" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + // use sql cache + assertHasCache "select * from test_use_plan_cache" + }), + extraThread("testAddPartitionAndInsert", { + createTestTable "test_use_plan_cache2" - assertNoCache "select * from test_use_plan_cache2" - sql "select * from test_use_plan_cache2" - assertHasCache "select * from test_use_plan_cache2" + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - // NOTE: in cloud mode, add empty partition can not use cache, because the table version already update, - // but in native mode, add empty partition can use cache - sql "alter table test_use_plan_cache2 add partition p6 values[('6'),('7'))" - if (isCloudMode()) { assertNoCache "select * from test_use_plan_cache2" - } else { + sql "select * from test_use_plan_cache2" assertHasCache "select * from test_use_plan_cache2" - } - // insert data can not use cache - sql "insert into test_use_plan_cache2 values(6, 1)" - assertNoCache "select * from test_use_plan_cache2" - }), - extraThread("testDropPartition", { - createTestTable "test_use_plan_cache3" - - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + // NOTE: in cloud mode, add empty partition can not use cache, because the table version already update, 
+ // but in native mode, add empty partition can use cache + sql "alter table test_use_plan_cache2 add partition p6 values[('6'),('7'))" + if (isCloudMode()) { + assertNoCache "select * from test_use_plan_cache2" + } else { + assertHasCache "select * from test_use_plan_cache2" + } - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + // insert data can not use cache + sql "insert into test_use_plan_cache2 values(6, 1)" + assertNoCache "select * from test_use_plan_cache2" + }), + extraThread("testDropPartition", { + createTestTable "test_use_plan_cache3" - assertNoCache "select * from test_use_plan_cache3" - sql "select * from test_use_plan_cache3" - assertHasCache "select * from test_use_plan_cache3" + // after partition changed 10s, the sql cache can be used + sleep(10000) - // drop partition can not use cache - sql "alter table test_use_plan_cache3 drop partition p5" - assertNoCache "select * from test_use_plan_cache3" - }), - extraThread("testReplacePartition", { - createTestTable "test_use_plan_cache4" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - sql "alter table test_use_plan_cache4 add temporary partition tp1 values [('1'), ('2'))" + assertNoCache "select * from test_use_plan_cache3" + sql "select * from test_use_plan_cache3" + assertHasCache "select * from test_use_plan_cache3" - streamLoad { - table "test_use_plan_cache4" - set "temporaryPartitions", "tp1" - inputIterator([[1, 3], [1, 4]].iterator()) - } - sql "sync" + // drop partition can not use cache + sql "alter table test_use_plan_cache3 drop partition p5" + assertNoCache "select * from test_use_plan_cache3" + }), + extraThread("testReplacePartition", { + createTestTable "test_use_plan_cache4" - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + sql "alter table test_use_plan_cache4 add temporary partition tp1 values [('1'), ('2'))" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + streamLoad { + table "test_use_plan_cache4" + set "temporaryPartitions", "tp1" + inputIterator([[1, 3], [1, 4]].iterator()) + } + sql "sync" - assertNoCache "select * from test_use_plan_cache4" - sql "select * from test_use_plan_cache4" - assertHasCache "select * from test_use_plan_cache4" + // after partition changed 10s, the sql cache can be used + sleep(10000) - // replace partition can not use cache - sql "alter table test_use_plan_cache4 replace partition (p1) with temporary partition(tp1)" - assertNoCache "select * from test_use_plan_cache4" - }), - extraThread("testStreamLoad", { - createTestTable "test_use_plan_cache5" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache5" - sql "select * from test_use_plan_cache5" - assertHasCache "select * from test_use_plan_cache5" - - streamLoad { - table "test_use_plan_cache5" - set "partitions", "p1" - inputIterator([[1, 3], [1, 4]].iterator()) - } - sql 
"sync" - - // stream load can not use cache - sql "select * from test_use_plan_cache5" - assertNoCache "select * from test_use_plan_cache5" - }), - extraThread("testUpdate",{ - createTestTable("test_use_plan_cache6", true) - - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + assertNoCache "select * from test_use_plan_cache4" + sql "select * from test_use_plan_cache4" + assertHasCache "select * from test_use_plan_cache4" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + // replace partition can not use cache + sql "alter table test_use_plan_cache4 replace partition (p1) with temporary partition(tp1)" + assertNoCache "select * from test_use_plan_cache4" + }), + extraThread("testStreamLoad", { + createTestTable "test_use_plan_cache5" - assertNoCache "select * from test_use_plan_cache6" - sql "select * from test_use_plan_cache6" - assertHasCache "select * from test_use_plan_cache6" + // after partition changed 10s, the sql cache can be used + sleep(10000) - sql "update test_use_plan_cache6 set value=3 where id=1" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - // update can not use cache - sql "select * from test_use_plan_cache6" - assertNoCache "select * from test_use_plan_cache6" - }), - extraThread("testDelete", { - createTestTable "test_use_plan_cache7" + assertNoCache "select * from test_use_plan_cache5" + sql "select * from test_use_plan_cache5" + assertHasCache "select * from test_use_plan_cache5" - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + streamLoad { + table "test_use_plan_cache5" + set "partitions", "p1" + inputIterator([[1, 3], [1, 4]].iterator()) + } + sql "sync" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + // stream load can not use cache + sql "select * from test_use_plan_cache5" + assertNoCache "select * from test_use_plan_cache5" + }), + extraThread("testUpdate",{ + createTestTable("test_use_plan_cache6", true) - assertNoCache "select * from test_use_plan_cache7" - sql "select * from test_use_plan_cache7" - assertHasCache "select * from test_use_plan_cache7" + // after partition changed 10s, the sql cache can be used + sleep(10000) - sql "delete from test_use_plan_cache7 where id = 1" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - // delete can not use cache - sql "select * from test_use_plan_cache7" - assertNoCache "select * from test_use_plan_cache7" - }), - extraThread("testDropTable", { - createTestTable "test_use_plan_cache8" + assertNoCache "select * from test_use_plan_cache6" + sql "select * from test_use_plan_cache6" + assertHasCache "select * from test_use_plan_cache6" - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + sql "update test_use_plan_cache6 set value=3 where id=1" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + // update can not use cache + sql "select * from test_use_plan_cache6" + assertNoCache "select * from test_use_plan_cache6" + 
}), + extraThread("testDelete", { + createTestTable "test_use_plan_cache7" - assertNoCache "select * from test_use_plan_cache8" - sql "select * from test_use_plan_cache8" - assertHasCache "select * from test_use_plan_cache8" + // after partition changed 10s, the sql cache can be used + sleep(10000) - sql "drop table test_use_plan_cache8" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - // should visible the table has bean deleted - test { - sql "select * from test_use_plan_cache8" - exception "does not exist in database" - } - }), - extraThread("testCreateAndAlterView", { - createTestTable "test_use_plan_cache9" + assertNoCache "select * from test_use_plan_cache7" + sql "select * from test_use_plan_cache7" + assertHasCache "select * from test_use_plan_cache7" - sql "drop view if exists test_use_plan_cache9_view" - sql "create view test_use_plan_cache9_view as select * from test_use_plan_cache9" + sql "delete from test_use_plan_cache7 where id = 1" - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + // delete can not use cache + sql "select * from test_use_plan_cache7" + assertNoCache "select * from test_use_plan_cache7" + }), + extraThread("testDropTable", { + createTestTable "test_use_plan_cache8" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + // after partition changed 10s, the sql cache can be used + sleep(10000) - assertNoCache "select * from test_use_plan_cache9_view" - sql "select * from test_use_plan_cache9_view" - assertHasCache "select * from test_use_plan_cache9_view" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - // alter view should not use cache - sql "alter view test_use_plan_cache9_view as select id from test_use_plan_cache9" - assertNoCache "select * from test_use_plan_cache9_view" - }), - extraThread("testDropView", { - createTestTable "test_use_plan_cache10" + assertNoCache "select * from test_use_plan_cache8" + sql "select * from test_use_plan_cache8" + assertHasCache "select * from test_use_plan_cache8" - sql "drop view if exists test_use_plan_cache10_view" - sql "create view test_use_plan_cache10_view as select * from test_use_plan_cache10" + sql "drop table test_use_plan_cache8" - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + // should visible the table has bean deleted + test { + sql "select * from test_use_plan_cache8" + exception "does not exist in database" + } + }), + extraThread("testCreateAndAlterView", { + createTestTable "test_use_plan_cache9" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + sql "drop view if exists test_use_plan_cache9_view" + sql "create view test_use_plan_cache9_view as select * from test_use_plan_cache9" - assertNoCache "select * from test_use_plan_cache10_view" - sql "select * from test_use_plan_cache10_view" - assertHasCache "select * from test_use_plan_cache10_view" + // after partition changed 10s, the sql cache can be used + sleep(10000) - sql "drop view test_use_plan_cache10_view" - // should visible the view has bean deleted - test { - sql "select * from test_use_plan_cache10_view" - exception "does 
not exist in database" - } - }), - extraThread("testBaseTableChanged", { - createTestTable "test_use_plan_cache11" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - sql "drop view if exists test_use_plan_cache11_view" - sql "create view test_use_plan_cache11_view as select * from test_use_plan_cache11" + assertNoCache "select * from test_use_plan_cache9_view" + sql "select * from test_use_plan_cache9_view" + assertHasCache "select * from test_use_plan_cache9_view" - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache11_view" - sql "select * from test_use_plan_cache11_view" - assertHasCache "select * from test_use_plan_cache11_view" - - sql "insert into test_use_plan_cache11 values(1, 3)" - - // base table already changed, can not use cache - assertNoCache "select * from test_use_plan_cache11_view" - }), - extraThread("testNotShareCacheBetweenUsers", { - sql "drop user if exists test_cache_user1" - sql "create user test_cache_user1 identified by 'DORIS@2024'" - def dbName = context.config.getDbNameByFile(context.file) - sql """GRANT SELECT_PRIV ON *.* TO test_cache_user1""" - //cloud-mode - if (isCloudMode()) { - def clusters = sql " SHOW CLUSTERS; " - assertTrue(!clusters.isEmpty()) - def validCluster = clusters[0][0] - sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user1""" - } - - createTestTable "test_use_plan_cache12" - - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + // alter view should not use cache + sql "alter view test_use_plan_cache9_view as select id from test_use_plan_cache9" + assertNoCache "select * from test_use_plan_cache9_view" + }), + extraThread("testDropView", { + createTestTable "test_use_plan_cache10" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + sql "drop view if exists test_use_plan_cache10_view" + sql "create view test_use_plan_cache10_view as select * from test_use_plan_cache10" - assertNoCache "select * from test_use_plan_cache12" - sql "select * from test_use_plan_cache12" - assertHasCache "select * from test_use_plan_cache12" + // after partition changed 10s, the sql cache can be used + sleep(10000) - sql "sync" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - extraThread("test_cache_user1_thread", { - connect("test_cache_user1", "DORIS@2024") { - sql "use ${dbName}" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" + assertNoCache "select * from test_use_plan_cache10_view" + sql "select * from test_use_plan_cache10_view" + assertHasCache "select * from test_use_plan_cache10_view" - assertNoCache "select * from test_use_plan_cache12" + sql "drop view test_use_plan_cache10_view" + // should visible the view has bean deleted + test { + sql "select * from test_use_plan_cache10_view" + exception "does not exist in database" } - }).get() - }), - 
extraThread("testAddRowPolicy", { - def dbName = context.config.getDbNameByFile(context.file) - try_sql """ - DROP ROW POLICY if exists test_cache_row_policy_2 - ON ${dbName}.test_use_plan_cache13 - FOR test_cache_user2""" - - sql "drop user if exists test_cache_user2" - sql "create user test_cache_user2 identified by 'DORIS@2024'" - sql """GRANT SELECT_PRIV ON *.* TO test_cache_user2""" - //cloud-mode - if (isCloudMode()) { - def clusters = sql " SHOW CLUSTERS; " - assertTrue(!clusters.isEmpty()) - def validCluster = clusters[0][0] - sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user2""" - } - - createTestTable "test_use_plan_cache13" - - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + }), + extraThread("testBaseTableChanged", { + createTestTable "test_use_plan_cache11" - sql "sync" + sql "drop view if exists test_use_plan_cache11_view" + sql "create view test_use_plan_cache11_view as select * from test_use_plan_cache11" - extraThread("test_cache_user2_thread", { - connect("test_cache_user2", "DORIS@2024") { - sql "use ${dbName}" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" + // after partition changed 10s, the sql cache can be used + sleep(10000) - assertNoCache "select * from test_use_plan_cache13" - sql "select * from test_use_plan_cache13" - assertHasCache "select * from test_use_plan_cache13" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache11_view" + sql "select * from test_use_plan_cache11_view" + assertHasCache "select * from test_use_plan_cache11_view" + + sql "insert into test_use_plan_cache11 values(1, 3)" + + // base table already changed, can not use cache + assertNoCache "select * from test_use_plan_cache11_view" + }), + extraThread("testNotShareCacheBetweenUsers", { + sql "drop user if exists test_cache_user1" + sql "create user test_cache_user1 identified by 'DORIS@2024'" + def dbName = context.config.getDbNameByFile(context.file) + sql """GRANT SELECT_PRIV ON *.* TO test_cache_user1""" + sql "GRANT ADMIN_PRIV ON *.*.* TO test_cache_user1" + //cloud-mode + if (isCloudMode()) { + def clusters = sql " SHOW CLUSTERS; " + assertTrue(!clusters.isEmpty()) + def validCluster = clusters[0][0] + sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user1""" } - }).get() - sql """ - CREATE ROW POLICY test_cache_row_policy_2 - ON ${dbName}.test_use_plan_cache13 - AS RESTRICTIVE TO test_cache_user2 - USING (id = 4)""" + createTestTable "test_use_plan_cache12" - sql "sync" + // after partition changed 10s, the sql cache can be used + sleep(10000) - // after row policy changed, the cache is invalidate - extraThread("test_cache_user2_thread2", { - connect("test_cache_user2", "DORIS@2024") { - sql "use ${dbName}" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - assertNoCache "select * from test_use_plan_cache13" - } - }).get() - }), - extraThread("testDropRowPolicy", { - def dbName 
= context.config.getDbNameByFile(context.file) - try_sql """ - DROP ROW POLICY if exists test_cache_row_policy_3 - ON ${dbName}.test_use_plan_cache14 - FOR test_cache_user3""" - - sql "drop user if exists test_cache_user3" - sql "create user test_cache_user3 identified by 'DORIS@2024'" - sql """GRANT SELECT_PRIV ON *.* TO test_cache_user3""" - //cloud-mode - if (isCloudMode()) { - def clusters = sql " SHOW CLUSTERS; " - assertTrue(!clusters.isEmpty()) - def validCluster = clusters[0][0] - sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user3""" - } - - createTestTable "test_use_plan_cache14" - - sql """ - CREATE ROW POLICY test_cache_row_policy_3 - ON ${dbName}.test_use_plan_cache14 - AS RESTRICTIVE TO test_cache_user3 - USING (id = 4)""" - - sql "sync" - - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + assertNoCache "select * from test_use_plan_cache12" + sql "select * from test_use_plan_cache12" + assertHasCache "select * from test_use_plan_cache12" - extraThread("test_cache_user3_thread", { - connect("test_cache_user3", "DORIS@2024") { - sql "use ${dbName}" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" + sql "sync" + + extraThread("test_cache_user1_thread", { + connect("test_cache_user1", "DORIS@2024") { + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - assertNoCache "select * from test_use_plan_cache14" - sql "select * from test_use_plan_cache14" - assertHasCache "select * from test_use_plan_cache14" + assertNoCache "select * from test_use_plan_cache12" + } + }).get() + }), + extraThread("testAddRowPolicy", { + def dbName = context.config.getDbNameByFile(context.file) + try_sql """ + DROP ROW POLICY if exists test_cache_row_policy_2 + ON ${dbName}.test_use_plan_cache13 + FOR test_cache_user2""" + + sql "drop user if exists test_cache_user2" + sql "create user test_cache_user2 identified by 'DORIS@2024'" + sql """GRANT SELECT_PRIV ON *.* TO test_cache_user2""" + sql "GRANT ADMIN_PRIV ON *.*.* TO test_cache_user2" + //cloud-mode + if (isCloudMode()) { + def clusters = sql " SHOW CLUSTERS; " + assertTrue(!clusters.isEmpty()) + def validCluster = clusters[0][0] + sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user2""" } - }).get() - try_sql """ - DROP ROW POLICY if exists test_cache_row_policy_3 - ON ${dbName}.test_use_plan_cache14 - FOR test_cache_user3""" + createTestTable "test_use_plan_cache13" - sql "sync" + // after partition changed 10s, the sql cache can be used + sleep(10000) - // after row policy changed, the cache is invalidate - extraThread("test_cache_user3_thread2", { - connect("test_cache_user3", "DORIS@2024") { - sql "use ${dbName}" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" + sql "sync" - assertNoCache "select * from test_use_plan_cache14" - } - }).get() - }), - extraThread("testRemovePrivilege", { - def dbName = context.config.getDbNameByFile(context.file) + extraThread("test_cache_user2_thread", { + connect("test_cache_user2", "DORIS@2024") { + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql 
"set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - createTestTable "test_use_plan_cache15" + assertNoCache "select * from test_use_plan_cache13" + sql "select * from test_use_plan_cache13" + assertHasCache "select * from test_use_plan_cache13" + } + }).get() - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) - - sql "drop user if exists test_cache_user4" - sql "create user test_cache_user4 identified by 'DORIS@2024'" - sql "GRANT SELECT_PRIV ON regression_test.* TO test_cache_user4" - sql "GRANT SELECT_PRIV ON ${dbName}.test_use_plan_cache15 TO test_cache_user4" - //cloud-mode - if (isCloudMode()) { - def clusters = sql " SHOW CLUSTERS; " - assertTrue(!clusters.isEmpty()) - def validCluster = clusters[0][0] - sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user4""" - } - - sql "sync" - - extraThread("test_cache_user4_thread", { - connect("test_cache_user4", "DORIS@2024") { - sql "use ${dbName}" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" + sql """ + CREATE ROW POLICY test_cache_row_policy_2 + ON ${dbName}.test_use_plan_cache13 + AS RESTRICTIVE TO test_cache_user2 + USING (id = 4)""" + + sql "sync" + + // after row policy changed, the cache is invalidate + extraThread("test_cache_user2_thread2", { + connect("test_cache_user2", "DORIS@2024") { + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - assertNoCache "select * from test_use_plan_cache15" - sql "select * from test_use_plan_cache15" - assertHasCache "select * from test_use_plan_cache15" + assertNoCache "select * from test_use_plan_cache13" + } + }).get() + }), + extraThread("testDropRowPolicy", { + def dbName = context.config.getDbNameByFile(context.file) + try_sql """ + DROP ROW POLICY if exists test_cache_row_policy_3 + ON ${dbName}.test_use_plan_cache14 + FOR test_cache_user3""" + + sql "drop user if exists test_cache_user3" + sql "create user test_cache_user3 identified by 'DORIS@2024'" + sql """GRANT SELECT_PRIV ON *.* TO test_cache_user3""" + sql "GRANT ADMIN_PRIV ON *.*.* TO test_cache_user3" + //cloud-mode + if (isCloudMode()) { + def clusters = sql " SHOW CLUSTERS; " + assertTrue(!clusters.isEmpty()) + def validCluster = clusters[0][0] + sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user3""" } - }).get() - sql "REVOKE SELECT_PRIV ON ${dbName}.test_use_plan_cache15 FROM test_cache_user4" + createTestTable "test_use_plan_cache14" - sql "sync" + sql """ + CREATE ROW POLICY test_cache_row_policy_3 + ON ${dbName}.test_use_plan_cache14 + AS RESTRICTIVE TO test_cache_user3 + USING (id = 4)""" - // after privileges changed, the cache is invalidate - extraThread("test_cache_user4_thread2", { - connect("test_cache_user4", "DORIS@2024") { - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" + sql "sync" - test { - sql ("select * from ${dbName}.test_use_plan_cache15") - exception "Permission denied" + // after partition changed 10s, the sql cache can be used + sleep(10000) + + extraThread("test_cache_user3_thread", { + connect("test_cache_user3", 
"DORIS@2024") { + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache14" + sql "select * from test_use_plan_cache14" + assertHasCache "select * from test_use_plan_cache14" } + }).get() + + try_sql """ + DROP ROW POLICY if exists test_cache_row_policy_3 + ON ${dbName}.test_use_plan_cache14 + FOR test_cache_user3""" + + sql "sync" + + // after row policy changed, the cache is invalidate + extraThread("test_cache_user3_thread2", { + connect("test_cache_user3", "DORIS@2024") { + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache14" + } + }).get() + }), + extraThread("testRemovePrivilege", { + def dbName = context.config.getDbNameByFile(context.file) + + createTestTable "test_use_plan_cache15" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "drop user if exists test_cache_user4" + sql "create user test_cache_user4 identified by 'DORIS@2024'" + sql "GRANT SELECT_PRIV ON regression_test.* TO test_cache_user4" + sql "GRANT SELECT_PRIV ON ${dbName}.test_use_plan_cache15 TO test_cache_user4" + //cloud-mode + if (isCloudMode()) { + def clusters = sql " SHOW CLUSTERS; " + assertTrue(!clusters.isEmpty()) + def validCluster = clusters[0][0] + sql """GRANT USAGE_PRIV ON CLUSTER ${validCluster} TO test_cache_user4""" } - }).get() - }), - extraThread("testNondeterministic", { - createTestTable "test_use_plan_cache16" - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + sql "sync" - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + extraThread("test_cache_user4_thread", { + connect("test_cache_user4", "DORIS@2024") { + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache15" + sql "select * from test_use_plan_cache15" + assertHasCache "select * from test_use_plan_cache15" + } + }).get() - assertNoCache "select random() from test_use_plan_cache16" - // create sql cache - sql "select random() from test_use_plan_cache16" - // can not use sql cache - assertNoCache "select random() from test_use_plan_cache16" + sql "REVOKE SELECT_PRIV ON ${dbName}.test_use_plan_cache15 FROM test_cache_user4" + sql "sync" + + // after privileges changed, the cache is invalidate + extraThread("test_cache_user4_thread2", { + connect("test_cache_user4", "DORIS@2024") { + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + test { + sql ("select * from ${dbName}.test_use_plan_cache15") + exception "Permission denied" + } + } + }).get() + }), + extraThread("testNondeterministic", { + createTestTable "test_use_plan_cache16" - assertNoCache "select year(now()) from test_use_plan_cache16" - sql "select year(now()) from test_use_plan_cache16" - assertHasCache "select year(now()) from test_use_plan_cache16" + // after partition changed 10s, the sql cache can be used + sleep(10000) + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - 
assertNoCache "select second(now()) from test_use_plan_cache16" - sql "select second(now()) from test_use_plan_cache16" - sleep(1000) - assertNoCache "select second(now()) from test_use_plan_cache16" - }), - extraThread("testUserVariable", { - // make sure if the table has been dropped, the cache should invalidate, - // so we should retry twice to check - for (def i in 0..2) { + assertNoCache "select random() from test_use_plan_cache16" + // create sql cache + sql "select random() from test_use_plan_cache16" + // can not use sql cache + assertNoCache "select random() from test_use_plan_cache16" + + assertNoCache "select year(now()) from test_use_plan_cache16" + sql "select year(now()) from test_use_plan_cache16" + assertHasCache "select year(now()) from test_use_plan_cache16" + + assertNoCache "select second(now()) from test_use_plan_cache16" + sql "select second(now()) from test_use_plan_cache16" + sleep(1000) + assertNoCache "select second(now()) from test_use_plan_cache16" + }), + extraThread("testUserVariable", { createTestTable "test_use_plan_cache17" // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + sleep(10000) sql "set enable_nereids_planner=true" sql "set enable_fallback_to_original_planner=false" @@ -579,7 +559,6 @@ suite("parse_sql_from_sql_cache") { def result1 = sql "select @custom_variable from test_use_plan_cache17 where id = 1 and value = 1" assertTrue(result1.size() == 1 && result1[0][0].toString().toInteger() == 10) - sql "set @custom_variable2=1" assertNoCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" def res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" @@ -598,344 +577,334 @@ suite("parse_sql_from_sql_cache") { assertHasCache "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" res = sql "select * from test_use_plan_cache17 where id = @custom_variable2 and value = 1" assertTrue(res[0][0] == 1) - } - }), - extraThread("test_udf", { - def jarPath = """${context.config.suitePath}/javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar""" - scp_udf_file_to_all_be(jarPath) - try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int, int);") - try_sql("DROP TABLE IF EXISTS test_javaudf_string") - - sql """ DROP TABLE IF EXISTS test_javaudf_string """ - sql """ - CREATE TABLE IF NOT EXISTS test_javaudf_string ( - `user_id` INT NOT NULL COMMENT "用户id", - `char_col` CHAR NOT NULL COMMENT "", - `varchar_col` VARCHAR(10) NOT NULL COMMENT "", - `string_col` STRING NOT NULL COMMENT "" - ) - DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); - """ - - StringBuilder values = new StringBuilder() - int i = 1 - for (; i < 9; i ++) { - values.append(" (${i}, '${i}','abcdefg${i}','poiuytre${i}abcdefg'),\n") - } - values.append("(${i}, '${i}','abcdefg${i}','poiuytre${i}abcdefg')") - - sql "INSERT INTO test_javaudf_string VALUES ${values}" - sql "sync" - - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) - - File path = new File(jarPath) - if (!path.exists()) { - throw new IllegalStateException("""${jarPath} doesn't exist! 
""") - } - - sql """ CREATE FUNCTION java_udf_string_test(string, int, int) RETURNS string PROPERTIES ( - "file"="file://${jarPath}", - "symbol"="org.apache.doris.udf.StringTest", - "type"="JAVA_UDF" - ); """ - - assertNoCache "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;" - sql "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;" - assertNoCache "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;" - }), - extraThread("testMultiFrontends", { - def aliveFrontends = sql_return_maparray("show frontends") - .stream() - .filter { it["Alive"].toString().equalsIgnoreCase("true") } - .collect(Collectors.toList()) - - if (aliveFrontends.size() <= 1) { - return - } - - def fe1 = aliveFrontends[0]["Host"] + ":" + aliveFrontends[0]["QueryPort"] - def fe2 = fe1 - if (aliveFrontends.size() > 1) { - fe2 = aliveFrontends[1]["Host"] + ":" + aliveFrontends[1]["QueryPort"] - } - - log.info("fe1: ${fe1}") - log.info("fe2: ${fe2}") - - def dbName = context.config.getDbNameByFile(context.file) - - log.info("connect to fe: ${fe1}") - connect( context.config.jdbcUser, context.config.jdbcPassword, "jdbc:mysql://${fe1}") { - sql "use ${dbName}" - - createTestTable "test_use_plan_cache18" + }), + extraThread("test_udf", { + def jarPath = """${context.config.suitePath}/javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar""" + scp_udf_file_to_all_be(jarPath) + try_sql("DROP FUNCTION IF EXISTS java_udf_string_test(string, int, int);") + try_sql("DROP TABLE IF EXISTS test_javaudf_string") + + sql """ DROP TABLE IF EXISTS test_javaudf_string """ + sql """ + CREATE TABLE IF NOT EXISTS test_javaudf_string ( + `user_id` INT NOT NULL COMMENT "用户id", + `char_col` CHAR NOT NULL COMMENT "", + `varchar_col` VARCHAR(10) NOT NULL COMMENT "", + `string_col` STRING NOT NULL COMMENT "" + ) + DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); + """ + + StringBuilder values = new StringBuilder() + int i = 1 + for (; i < 9; i ++) { + values.append(" (${i}, '${i}','abcdefg${i}','poiuytre${i}abcdefg'),\n") + } + values.append("(${i}, '${i}','abcdefg${i}','poiuytre${i}abcdefg')") + sql "INSERT INTO test_javaudf_string VALUES ${values}" sql "sync" + sleep(10000) + + File path = new File(jarPath) + if (!path.exists()) { + throw new IllegalStateException("""${jarPath} doesn't exist! 
""") + } + + sql """ CREATE FUNCTION java_udf_string_test(string, int, int) RETURNS string PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.StringTest", + "type"="JAVA_UDF" + ); """ + + assertNoCache "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;" + sql "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;" + assertNoCache "SELECT java_udf_string_test(varchar_col, 2, 3) result FROM test_javaudf_string ORDER BY result;" + }), + extraThread("testMultiFrontends", { + def aliveFrontends = sql_return_maparray("show frontends") + .stream() + .filter { it["Alive"].toString().equalsIgnoreCase("true") } + .collect(Collectors.toList()) + + if (aliveFrontends.size() <= 1) { + return + } + + def fe1 = aliveFrontends[0]["Host"] + ":" + aliveFrontends[0]["QueryPort"] + def fe2 = fe1 + if (aliveFrontends.size() > 1) { + fe2 = aliveFrontends[1]["Host"] + ":" + aliveFrontends[1]["QueryPort"] + } + + log.info("fe1: ${fe1}") + log.info("fe2: ${fe2}") + + def dbName = context.config.getDbNameByFile(context.file) + + log.info("connect to fe: ${fe1}") + connect( context.config.jdbcUser, context.config.jdbcPassword, "jdbc:mysql://${fe1}") { + sql "use ${dbName}" + + createTestTable "test_use_plan_cache18" + + sql "sync" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache18" + sql "select * from test_use_plan_cache18" + assertHasCache "select * from test_use_plan_cache18" + } + + log.info("connect to fe: ${fe2}") + connect( context.config.jdbcUser, context.config.jdbcPassword, "jdbc:mysql://${fe2}") { + + sql "use ${dbName}" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + assertNoCache "select * from test_use_plan_cache18" + sql "select * from test_use_plan_cache18" + assertHasCache "select * from test_use_plan_cache18" + } + }), + extraThread("test_dry_run_query", { + createTestTable "test_use_plan_cache19" + // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + sleep(10000) sql "set enable_nereids_planner=true" sql "set enable_fallback_to_original_planner=false" sql "set enable_sql_cache=true" - assertNoCache "select * from test_use_plan_cache18" - sql "select * from test_use_plan_cache18" - assertHasCache "select * from test_use_plan_cache18" - } + sql "set dry_run_query=true" + assertNoCache "select * from test_use_plan_cache19 order by 1, 2" + def result1 = sql "select * from test_use_plan_cache19 order by 1, 2" + assertTrue(result1.size() == 1) + assertNoCache "select * from test_use_plan_cache19 order by 1, 2" + + sql "set dry_run_query=false" + assertNoCache "select * from test_use_plan_cache19 order by 1, 2" + def result2 = sql "select * from test_use_plan_cache19 order by 1, 2" + assertTrue(result2.size() > 1) + assertHasCache "select * from test_use_plan_cache19 order by 1, 2" + + sql "set dry_run_query=true" + assertNoCache "select * from test_use_plan_cache19 order by 1, 2" + def result3 = sql "select * from test_use_plan_cache19 order by 1, 2" + assertTrue(result3.size() == 1) + assertNoCache "select * from test_use_plan_cache19 order by 1, 2" + }), + extraThread("test_sql_cache_in_fe", { + 
createTestTable "test_use_plan_cache20" + + sql "alter table test_use_plan_cache20 add partition p6 values[('999'), ('1000'))" - log.info("connect to fe: ${fe2}") - connect( context.config.jdbcUser, context.config.jdbcPassword, "jdbc:mysql://${fe2}") { - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" + // after partition changed 10s, the sql cache can be used + sleep(10000) - sql "use ${dbName}" sql "set enable_nereids_planner=true" sql "set enable_fallback_to_original_planner=false" sql "set enable_sql_cache=true" - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - assertNoCache "select * from test_use_plan_cache18" - sql "select * from test_use_plan_cache18" - assertHasCache "select * from test_use_plan_cache18" - } - }), - extraThread("test_dry_run_query", { - createTestTable "test_use_plan_cache19" + int randomInt = (int) (Math.random() * 2000000000) + + assertNoCache "select * from (select $randomInt as id)a" + def result1 = sql "select * from (select $randomInt as id)a" + assertTrue(result1.size() == 1) + + assertHasCache "select * from (select $randomInt as id)a" + def result2 = sql "select * from (select $randomInt as id)a" + assertTrue(result2.size() == 1) + + sql "select * from test_use_plan_cache20 limit 0" + assertHasCache "select * from test_use_plan_cache20 limit 0" + def result4 = sql "select * from test_use_plan_cache20 limit 0" + assertTrue(result4.isEmpty()) + + assertNoCache "select * from test_use_plan_cache20 where id=999" + def result5 = sql "select * from test_use_plan_cache20 where id=999" + assertTrue(result5.isEmpty()) + assertHasCache "select * from test_use_plan_cache20 where id=999" + def result6 = sql "select * from test_use_plan_cache20 where id=999" + assertTrue(result6.isEmpty()) + }), + extraThread("test_truncate_partition", { + sql "drop table if exists test_use_plan_cache21" + sql """create table test_use_plan_cache21 ( + id int, + dt int + ) + partition by range(dt) + ( + partition dt1 values [('1'), ('2')), + partition dt2 values [('2'), ('3')) + ) + distributed by hash(id) + properties('replication_num'='1')""" + + sql "insert into test_use_plan_cache21 values('2', '2')" + sleep(100) + sql "insert into test_use_plan_cache21 values('1', '1')" - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - sql "set dry_run_query=true" - assertNoCache "select * from test_use_plan_cache19 order by 1, 2" - def result1 = sql "select * from test_use_plan_cache19 order by 1, 2" - assertTrue(result1.size() == 1) - assertNoCache "select * from test_use_plan_cache19 order by 1, 2" - - sql "set dry_run_query=false" - assertNoCache "select * from test_use_plan_cache19 order by 1, 2" - def result2 = sql "select * from test_use_plan_cache19 order by 1, 2" - assertTrue(result2.size() > 1) - assertHasCache "select * from test_use_plan_cache19 order by 1, 2" - - sql "set dry_run_query=true" - assertNoCache "select * from test_use_plan_cache19 order by 1, 2" - def result3 = sql "select * from test_use_plan_cache19 order by 1, 2" - assertTrue(result3.size() == 1) - assertNoCache "select * from test_use_plan_cache19 order by 1, 2" - }), - extraThread("test_sql_cache_in_fe", { - createTestTable "test_use_plan_cache20" - - sql "alter table test_use_plan_cache20 add partition p6 values[('999'), 
('1000'))" - - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - int randomInt = Math.random() * 2000000000 - - assertNoCache "select * from (select $randomInt as id)a" - def result1 = sql "select * from (select $randomInt as id)a" - assertTrue(result1.size() == 1) - - assertHasCache "select * from (select $randomInt as id)a" - def result2 = sql "select * from (select $randomInt as id)a" - assertTrue(result2.size() == 1) - - sql "select * from test_use_plan_cache20 limit 0" - assertHasCache "select * from test_use_plan_cache20 limit 0" - def result4 = sql "select * from test_use_plan_cache20 limit 0" - assertTrue(result4.isEmpty()) - - assertNoCache "select * from test_use_plan_cache20 where id=999" - def result5 = sql "select * from test_use_plan_cache20 where id=999" - assertTrue(result5.isEmpty()) - assertHasCache "select * from test_use_plan_cache20 where id=999" - def result6 = sql "select * from test_use_plan_cache20 where id=999" - assertTrue(result6.isEmpty()) - }), - extraThread("test_truncate_partition", { - sql "drop table if exists test_use_plan_cache21" - sql """create table test_use_plan_cache21 ( - id int, - dt int - ) - partition by range(dt) - ( - partition dt1 values [('1'), ('2')), - partition dt2 values [('2'), ('3')) - ) - distributed by hash(id) - properties('replication_num'='1')""" - - sql "insert into test_use_plan_cache21 values('2', '2')" - sleep(100) - sql "insert into test_use_plan_cache21 values('1', '1')" - - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) - - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" - - assertNoCache "select * from test_use_plan_cache21" - def result1 = sql "select * from test_use_plan_cache21" - assertTrue(result1.size() == 2) - assertHasCache "select * from test_use_plan_cache21" - - sql "truncate table test_use_plan_cache21 partition dt2" - assertNoCache "select * from test_use_plan_cache21" - def result2 = sql "select * from test_use_plan_cache21" - assertTrue(result2.size() == 1) - }), - extraThread("remove_comment", { - createTestTable "test_use_plan_cache22" - - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + // after partition changed 10s, the sql cache can be used + sleep(10000) - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - assertNoCache "select /*+SET_VAR(disable_nereids_rules='')*/ /*comment2*/ * from test_use_plan_cache22 order by 1, 2" - sql "select /*+SET_VAR(disable_nereids_rules='')*/ /*comment1*/ * from test_use_plan_cache22 order by 1, 2" + assertNoCache "select * from test_use_plan_cache21" + def result1 = sql "select * from test_use_plan_cache21" + assertTrue(result1.size() == 2) + assertHasCache "select * from test_use_plan_cache21" - assertHasCache "select /*+SET_VAR(disable_nereids_rules='')*/ /*comment2*/ * from test_use_plan_cache22 order by 1, 2" - }), - 
extraThread("is_cache_profile", { - createTestTable "test_use_plan_cache23" + sql "truncate table test_use_plan_cache21 partition dt2" + assertNoCache "select * from test_use_plan_cache21" + def result2 = sql "select * from test_use_plan_cache21" + assertTrue(result2.size() == 1) + }), + extraThread("remove_comment", { + createTestTable "test_use_plan_cache22" - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) + // after partition changed 10s, the sql cache can be used + sleep(10000) - sql "set enable_nereids_planner=true" - sql "set enable_fallback_to_original_planner=false" - sql "set enable_sql_cache=true" + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" - int randomInt = Math.random() * 2000000000 - sql "select ${randomInt} from test_use_plan_cache23" - profile("sql_cache_23_${randomInt}") { - run { - sql "/* sql_cache_23_${randomInt} */ select ${randomInt} from test_use_plan_cache23" - } + assertNoCache "select /*+SET_VAR(disable_nereids_rules='')*/ /*comment2*/ * from test_use_plan_cache22 order by 1, 2" + sql "select /*+SET_VAR(disable_nereids_rules='')*/ /*comment1*/ * from test_use_plan_cache22 order by 1, 2" - check { profileString, exception -> - log.info(profileString) - assertTrue(profileString.contains("Is Cached: Yes")) - } - } + assertHasCache "select /*+SET_VAR(disable_nereids_rules='')*/ /*comment2*/ * from test_use_plan_cache22 order by 1, 2" + }), + extraThread("is_cache_profile", { + createTestTable "test_use_plan_cache23" + + // after partition changed 10s, the sql cache can be used + sleep(10000) + + sql "set enable_nereids_planner=true" + sql "set enable_fallback_to_original_planner=false" + sql "set enable_sql_cache=true" + + int randomInt = Math.random() * 2000000000 + sql "select ${randomInt} from test_use_plan_cache23" + profile("sql_cache_23_${randomInt}") { + run { + sql "/* sql_cache_23_${randomInt} */ select ${randomInt} from test_use_plan_cache23" + } - randomInt = Math.random() * 2000000000 - sql "select * from (select $randomInt as id)a" - profile("sql_cache_23_${randomInt}_2") { - run { - sql "/* sql_cache_23_${randomInt}_2 */ select * from (select $randomInt as id)a" + check { profileString, exception -> + log.info(profileString) + assertTrue(profileString.contains("Is Cached: Yes")) + } } - check { profileString, exception -> - log.info(profileString) - assertTrue(profileString.contains("Is Cached: Yes")) + randomInt = Math.random() * 2000000000 + sql "select * from (select $randomInt as id)a" + profile("sql_cache_23_${randomInt}_2") { + run { + sql "/* sql_cache_23_${randomInt}_2 */ select * from (select $randomInt as id)a" + } + + check { profileString, exception -> + log.info(profileString) + assertTrue(profileString.contains("Is Cached: Yes")) + } } - } - }), - extraThread("sql_cache_with_date_format", { - sql "set enable_sql_cache=true" - for (def i in 0..3) { - def result = sql "select FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss')" - assertNotEquals("yyyy-MM-dd HH:mm:ss", result[0][0]) - } - }), - extraThread("test_same_sql_with_different_db", { - def dbName1 = "test_db1" - def dbName2 = "test_db2" - def tableName = "test_cache_table" - - sql "CREATE DATABASE IF NOT EXISTS ${dbName1}" - sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}" - sql """ - CREATE TABLE IF NOT EXISTS ${dbName1}.${tableName} ( - `k1` date NOT NULL COMMENT "", - `k2` int(11) NOT NULL 
COMMENT "" - ) ENGINE=OLAP - DUPLICATE KEY(`k1`, `k2`) - COMMENT "OLAP" - PARTITION BY RANGE(`k1`) - (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01'))) - DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1 - PROPERTIES ( - "replication_allocation" = "tag.location.default: 1", - "in_memory" = "false", - "storage_format" = "V2" - ) - """ - sql "CREATE DATABASE IF NOT EXISTS ${dbName2}" - sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}" - sql """ - CREATE TABLE IF NOT EXISTS ${dbName2}.${tableName} ( - `k1` date NOT NULL COMMENT "", - `k2` int(11) NOT NULL COMMENT "" - ) ENGINE=OLAP - DUPLICATE KEY(`k1`, `k2`) - COMMENT "OLAP" - PARTITION BY RANGE(`k1`) - (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01'))) - DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1 - PROPERTIES ( - "replication_allocation" = "tag.location.default: 1", - "in_memory" = "false", - "storage_format" = "V2" - ) - """ - - sql """ - INSERT INTO ${dbName1}.${tableName} VALUES - ("2024-11-29",0), - ("2024-11-30",0) - """ - // after partition changed 10s, the sql cache can be used - sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')" - sleep(12000) - sql """ - INSERT INTO ${dbName2}.${tableName} VALUES - ("2024-11-29",0) - """ - // after partition changed 10s, the sql cache can be used - sleep(12000) - - sql "set enable_sql_cache=true" - sql "use ${dbName1}" - List> result1 = sql """ - SELECT COUNT(*) FROM ${tableName} - """ - assertEquals(result1[0][0],2) - - sql "use ${dbName2}" - List> result2 = sql """ - SELECT COUNT(*) FROM ${tableName} - """ - assertEquals(result2[0][0],1) - - sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}" - sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}" - sql "DROP DATABASE IF EXISTS ${dbName1}" - sql "DROP DATABASE IF EXISTS ${dbName2}" - }) - ).get() + }), + extraThread("sql_cache_with_date_format", { + sql "set enable_sql_cache=true" + for (def i in 0..3) { + def result = sql "select FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd HH:mm:ss')" + assertNotEquals("yyyy-MM-dd HH:mm:ss", result[0][0]) + } + }), + extraThread("test_same_sql_with_different_db", { + def dbName1 = "test_db1" + def dbName2 = "test_db2" + def tableName = "test_cache_table" + + sql "CREATE DATABASE IF NOT EXISTS ${dbName1}" + sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}" + sql """ + CREATE TABLE IF NOT EXISTS ${dbName1}.${tableName} ( + `k1` date NOT NULL COMMENT "", + `k2` int(11) NOT NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`) + COMMENT "OLAP" + PARTITION BY RANGE(`k1`) + (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01'))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ) + """ + sql "CREATE DATABASE IF NOT EXISTS ${dbName2}" + sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}" + sql """ + CREATE TABLE IF NOT EXISTS ${dbName2}.${tableName} ( + `k1` date NOT NULL COMMENT "", + `k2` int(11) NOT NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`k1`, `k2`) + COMMENT "OLAP" + PARTITION BY RANGE(`k1`) + (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01'))) + DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2" + ) + """ + + sql """ + INSERT INTO ${dbName1}.${tableName} VALUES + ("2024-11-29",0), + ("2024-11-30",0) + """ + // after partition changed 10s, the sql cache can be used + sleep(10000) + sql """ + INSERT INTO ${dbName2}.${tableName} 
-                extraThread("test_same_sql_with_different_db", {
-                    def dbName1 = "test_db1"
-                    def dbName2 = "test_db2"
-                    def tableName = "test_cache_table"
-
-                    sql "CREATE DATABASE IF NOT EXISTS ${dbName1}"
-                    sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}"
-                    sql """
-                        CREATE TABLE IF NOT EXISTS ${dbName1}.${tableName} (
-                          `k1` date NOT NULL COMMENT "",
-                          `k2` int(11) NOT NULL COMMENT ""
-                        ) ENGINE=OLAP
-                        DUPLICATE KEY(`k1`, `k2`)
-                        COMMENT "OLAP"
-                        PARTITION BY RANGE(`k1`)
-                        (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01')))
-                        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1
-                        PROPERTIES (
-                            "replication_allocation" = "tag.location.default: 1",
-                            "in_memory" = "false",
-                            "storage_format" = "V2"
-                        )
-                    """
-                    sql "CREATE DATABASE IF NOT EXISTS ${dbName2}"
-                    sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}"
-                    sql """
-                        CREATE TABLE IF NOT EXISTS ${dbName2}.${tableName} (
-                          `k1` date NOT NULL COMMENT "",
-                          `k2` int(11) NOT NULL COMMENT ""
-                        ) ENGINE=OLAP
-                        DUPLICATE KEY(`k1`, `k2`)
-                        COMMENT "OLAP"
-                        PARTITION BY RANGE(`k1`)
-                        (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01')))
-                        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1
-                        PROPERTIES (
-                            "replication_allocation" = "tag.location.default: 1",
-                            "in_memory" = "false",
-                            "storage_format" = "V2"
-                        )
-                    """
-
-                    sql """
-                        INSERT INTO ${dbName1}.${tableName} VALUES
-                        ("2024-11-29",0),
-                        ("2024-11-30",0)
-                    """
-                    // after partition changed 10s, the sql cache can be used
-                    sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
-                    sleep(12000)
-                    sql """
-                        INSERT INTO ${dbName2}.${tableName} VALUES
-                        ("2024-11-29",0)
-                    """
-                    // after partition changed 10s, the sql cache can be used
-                    sleep(12000)
-
-                    sql "set enable_sql_cache=true"
-                    sql "use ${dbName1}"
-                    List<List<Object>> result1 = sql """
-                        SELECT COUNT(*) FROM ${tableName}
-                    """
-                    assertEquals(result1[0][0],2)
-
-                    sql "use ${dbName2}"
-                    List<List<Object>> result2 = sql """
-                        SELECT COUNT(*) FROM ${tableName}
-                    """
-                    assertEquals(result2[0][0],1)
-
-                    sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}"
-                    sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}"
-                    sql "DROP DATABASE IF EXISTS ${dbName1}"
-                    sql "DROP DATABASE IF EXISTS ${dbName2}"
-                })
-        ).get()
+                extraThread("test_same_sql_with_different_db", {
+                    def dbName1 = "test_db1"
+                    def dbName2 = "test_db2"
+                    def tableName = "test_cache_table"
+
+                    sql "CREATE DATABASE IF NOT EXISTS ${dbName1}"
+                    sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}"
+                    sql """
+                        CREATE TABLE IF NOT EXISTS ${dbName1}.${tableName} (
+                          `k1` date NOT NULL COMMENT "",
+                          `k2` int(11) NOT NULL COMMENT ""
+                        ) ENGINE=OLAP
+                        DUPLICATE KEY(`k1`, `k2`)
+                        COMMENT "OLAP"
+                        PARTITION BY RANGE(`k1`)
+                        (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01')))
+                        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1
+                        PROPERTIES (
+                            "replication_allocation" = "tag.location.default: 1",
+                            "in_memory" = "false",
+                            "storage_format" = "V2"
+                        )
+                    """
+                    sql "CREATE DATABASE IF NOT EXISTS ${dbName2}"
+                    sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}"
+                    sql """
+                        CREATE TABLE IF NOT EXISTS ${dbName2}.${tableName} (
+                          `k1` date NOT NULL COMMENT "",
+                          `k2` int(11) NOT NULL COMMENT ""
+                        ) ENGINE=OLAP
+                        DUPLICATE KEY(`k1`, `k2`)
+                        COMMENT "OLAP"
+                        PARTITION BY RANGE(`k1`)
+                        (PARTITION p202411 VALUES [('2024-11-01'), ('2024-12-01')))
+                        DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 1
+                        PROPERTIES (
+                            "replication_allocation" = "tag.location.default: 1",
+                            "in_memory" = "false",
+                            "storage_format" = "V2"
+                        )
+                    """
+
+                    sql """
+                        INSERT INTO ${dbName1}.${tableName} VALUES
+                        ("2024-11-29",0),
+                        ("2024-11-30",0)
+                    """
+                    // the sql cache can be used once the partitions have been unchanged for 10s
+                    sleep(10000)
+                    sql """
+                        INSERT INTO ${dbName2}.${tableName} VALUES
+                        ("2024-11-29",0)
+                    """
+                    // the sql cache can be used once the partitions have been unchanged for 10s
+                    sleep(10000)
+
+                    sql "set enable_sql_cache=true"
+                    sql "use ${dbName1}"
+                    List<List<Object>> result1 = sql """
+                        SELECT COUNT(*) FROM ${tableName}
+                    """
+                    assertEquals(result1[0][0],2)
+
+                    sql "use ${dbName2}"
+                    List<List<Object>> result2 = sql """
+                        SELECT COUNT(*) FROM ${tableName}
+                    """
+                    assertEquals(result2[0][0],1)
+
+                    sql "DROP TABLE IF EXISTS ${dbName1}.${tableName}"
+                    sql "DROP TABLE IF EXISTS ${dbName2}.${tableName}"
+                    sql "DROP DATABASE IF EXISTS ${dbName1}"
+                    sql "DROP DATABASE IF EXISTS ${dbName2}"
+                })
+        ).get()
+    }
 }