Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed Jan 6, 2025
1 parent a539e22 commit ee8f916
Show file tree
Hide file tree
Showing 5 changed files with 109 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.google.common.collect.Maps
import com.google.common.util.concurrent.Futures
import com.google.common.util.concurrent.ListenableFuture
import com.google.common.util.concurrent.MoreExecutors
import com.google.common.util.concurrent.Uninterruptibles
import com.google.gson.Gson
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
Expand Down Expand Up @@ -701,6 +702,23 @@ class Suite implements GroovyInterceptable {
return sql
}

<T> T retry(int executeTimes = 3, int intervalMillis = 1000, Closure<T> closure) {
Throwable throwable = null
for (int i = 1; i <= executeTimes; ++i) {
try {
return closure()
} catch (Throwable t) {
logger.warn("Retry failed: $t", t)
throwable = t
Uninterruptibles.sleepUninterruptibly(intervalMillis, TimeUnit.MILLISECONDS)
}
}
if (throwable != null) {
throw throwable
}
return null
}

void explain(Closure actionSupplier) {
if (context.useArrowFlightSql()) {
runAction(new ExplainAction(context, "ARROW_FLIGHT_SQL"), actionSupplier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,28 @@ import java.util.stream.Collectors

suite("parse_sql_from_sql_cache") {
def assertHasCache = { String sqlStr ->
if (isCloudMode()) {
return
}
explain {
sql ("physical plan ${sqlStr}")
contains("PhysicalSqlCache")
retry(120, 1000) {
explain {
sql ("physical plan ${sqlStr}")
contains("PhysicalSqlCache")
}
}
}

def assertNoCache = { String sqlStr ->
if (isCloudMode()) {
return
}
explain {
sql ("physical plan ${sqlStr}")
notContains("PhysicalSqlCache")
}
}

sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"

combineFutures(
extraThread("testUsePlanCache", {
createTestTable "test_use_plan_cache"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -63,7 +58,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache2"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -90,7 +86,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache3"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -117,7 +114,8 @@ suite("parse_sql_from_sql_cache") {
sql "sync"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -135,7 +133,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache5"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -160,7 +159,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable("test_use_plan_cache6", true)

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -180,7 +180,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache7"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -200,7 +201,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache8"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -225,7 +227,8 @@ suite("parse_sql_from_sql_cache") {
sql "create view test_use_plan_cache9_view as select * from test_use_plan_cache9"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -246,7 +249,8 @@ suite("parse_sql_from_sql_cache") {
sql "create view test_use_plan_cache10_view as select * from test_use_plan_cache10"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -270,7 +274,8 @@ suite("parse_sql_from_sql_cache") {
sql "create view test_use_plan_cache11_view as select * from test_use_plan_cache11"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand Down Expand Up @@ -301,7 +306,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache12"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand Down Expand Up @@ -346,7 +352,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache13"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "sync"

Expand Down Expand Up @@ -412,7 +419,8 @@ suite("parse_sql_from_sql_cache") {
sql "sync"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

extraThread("test_cache_user3_thread", {
connect("test_cache_user3", "DORIS@2024") {
Expand Down Expand Up @@ -452,7 +460,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache15"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "drop user if exists test_cache_user4"
sql "create user test_cache_user4 identified by 'DORIS@2024'"
Expand Down Expand Up @@ -503,7 +512,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache16"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand Down Expand Up @@ -533,7 +543,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache17"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand Down Expand Up @@ -607,6 +618,9 @@ suite("parse_sql_from_sql_cache") {
sql "INSERT INTO test_javaudf_string VALUES ${values}"
sql "sync"

sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

File path = new File(jarPath)
if (!path.exists()) {
throw new IllegalStateException("""${jarPath} doesn't exist! """)
Expand Down Expand Up @@ -645,7 +659,7 @@ suite("parse_sql_from_sql_cache") {

log.info("connect to fe: ${fe1}")
connect( context.config.jdbcUser, context.config.jdbcPassword, "jdbc:mysql://${fe1}") {
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"

sql "use ${dbName}"

Expand All @@ -654,7 +668,8 @@ suite("parse_sql_from_sql_cache") {
sql "sync"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -667,7 +682,7 @@ suite("parse_sql_from_sql_cache") {

log.info("connect to fe: ${fe2}")
connect( context.config.jdbcUser, context.config.jdbcPassword, "jdbc:mysql://${fe2}") {
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"

sql "use ${dbName}"
sql "set enable_nereids_planner=true"
Expand All @@ -683,7 +698,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache19"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand Down Expand Up @@ -713,7 +729,8 @@ suite("parse_sql_from_sql_cache") {
sql "alter table test_use_plan_cache20 add partition p6 values[('999'), ('1000'))"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand Down Expand Up @@ -760,7 +777,8 @@ suite("parse_sql_from_sql_cache") {
sql "insert into test_use_plan_cache21 values('1', '1')"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -780,7 +798,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache22"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand All @@ -795,7 +814,8 @@ suite("parse_sql_from_sql_cache") {
createTestTable "test_use_plan_cache23"

// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
Expand Down Expand Up @@ -882,13 +902,14 @@ suite("parse_sql_from_sql_cache") {
("2024-11-30",0)
"""
// after partition changed 10s, the sql cache can be used
sleep(10000)
sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"
sleep(12000)
sql """
INSERT INTO ${dbName2}.${tableName} VALUES
("2024-11-29",0)
"""
// after partition changed 10s, the sql cache can be used
sleep(10000)
sleep(12000)

sql "set enable_sql_cache=true"
sql "use ${dbName1}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,25 @@ suite("colocate_union_numbers") {
"""

def extractFragment = { String sqlStr, String containsString, Closure<Integer> checkExchangeNum ->
explain {
sql sqlStr
check { result ->
log.info("Explain result:\n${result}")
retry(120, 1000) {
explain {
sql sqlStr
check { result ->
log.info("Explain result:\n${result}")

assertTrue(result.contains(containsString))
assertTrue(result.contains(containsString))

def fragmentContainsJoin = result.split("PLAN FRAGMENT")
.toList()
.stream()
.filter { it.contains(containsString) }
.findFirst()
.get()
def fragmentContainsJoin = result.split("PLAN FRAGMENT")
.toList()
.stream()
.filter { it.contains(containsString) }
.findFirst()
.get()

log.info("Fragment:\n${fragmentContainsJoin}")
log.info("Fragment:\n${fragmentContainsJoin}")

checkExchangeNum(fragmentContainsJoin.count("VEXCHANGE"))
checkExchangeNum(fragmentContainsJoin.count("VEXCHANGE"))
}
}
}
}
Expand Down
Loading

0 comments on commit ee8f916

Please sign in to comment.