From 40f0bd5255bac0c9d5cc8dc46a584a83ee38300a Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 30 Nov 2022 13:42:00 +0800 Subject: [PATCH] executor: add index_merge_intersection cases (#39323) --- executor/index_merge_reader_test.go | 212 ++++++++++++++++++++++++++++ testkit/testutil/require.go | 12 ++ 2 files changed, 224 insertions(+) diff --git a/executor/index_merge_reader_test.go b/executor/index_merge_reader_test.go index 4d719adbdf781..79d2d8b895a81 100644 --- a/executor/index_merge_reader_test.go +++ b/executor/index_merge_reader_test.go @@ -23,7 +23,9 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/testkit/testutil" "github.com/pingcap/tidb/util" "github.com/stretchr/testify/require" ) @@ -242,32 +244,52 @@ func TestIndexMergeInTransaction(t *testing.T) { "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", "└─Selection_8(Probe) 1106.67 cop[tikv] lt(test.t1.c3, 10)", " └─TableRowIDScan_7 3330.01 cop[tikv] table:t1 keep order:false, stats:pseudo")) + tk.MustQuery("explain select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where c1 < 10 and c2 < 10 and c3 < 10;").Check(testkit.Rows( + "IndexMerge_9 367.05 root type: intersection", + "├─IndexRangeScan_5(Build) 3323.33 cop[tikv] table:t1, index:c1(c1) range:[-inf,10), keep order:false, stats:pseudo", + "├─IndexRangeScan_6(Build) 3323.33 cop[tikv] table:t1, index:c2(c2) range:[-inf,10), keep order:false, stats:pseudo", + "├─IndexRangeScan_7(Build) 3323.33 cop[tikv] table:t1, index:c3(c3) range:[-inf,10), keep order:false, stats:pseudo", + "└─TableRowIDScan_8(Probe) 367.05 cop[tikv] table:t1 keep order:false, stats:pseudo")) // Test with normal key. tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < -1) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < -1 and c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustExec("insert into t1 values(1, 1, 1, 1);") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) + tk.MustExec("update t1 set c3 = 100 where c3 = 1;") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1")) + tk.MustExec("delete from t1;") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (c1 < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c1, c2, c3) */ * from t1 where (c1 < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) // Test with primary key, so the partialPlan is TableScan. tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < -1 and c2 < 10) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < -1) and c3 < 10;").Check(testkit.Rows()) tk.MustExec("insert into t1 values(1, 1, 1, 1);") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 < 10;").Check(testkit.Rows("1 1 1 1")) tk.MustExec("update t1 set c3 = 100 where c3 = 1;") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows("1 1 100 1")) tk.MustExec("delete from t1;") tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < -1 or c2 < 10) and c3 < 10;").Check(testkit.Rows()) tk.MustQuery("select /*+ use_index_merge(t1) */ * from t1 where (pk < 10 or c2 < -1) and c3 < 10;").Check(testkit.Rows()) + tk.MustQuery("select /*+ use_index_merge(t1, c2, c3, primary) */ * from t1 where (pk < 10 and c2 < 10) and c3 > 10;").Check(testkit.Rows()) tk.MustExec("commit;") if i == 1 { @@ -566,3 +588,193 @@ func TestPessimisticLockOnPartitionForIndexMerge(t *testing.T) { // TODO: add support for index merge reader in dynamic tidb_partition_prune_mode } + +func TestIndexMergeIntersectionConcurrency(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;") + tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)") + tk.MustExec("analyze table t1;") + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows() + require.Contains(t, res[1][0], "IndexMerge") + + // Default is tidb_executor_concurrency. + res = tk.MustQuery("select @@tidb_executor_concurrency;").Sort().Rows() + defExecCon := res[0][0].(string) + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", fmt.Sprintf("return(%s)", defExecCon))) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency")) + }() + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + + tk.MustExec("set tidb_executor_concurrency = 10") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + // workerCnt = min(part_num, concurrency) + tk.MustExec("set tidb_executor_concurrency = 20") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + tk.MustExec("set tidb_executor_concurrency = 2") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(2)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + + tk.MustExec("set tidb_index_merge_intersection_concurrency = 9") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(9)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + tk.MustExec("set tidb_index_merge_intersection_concurrency = 21") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(10)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + tk.MustExec("set tidb_index_merge_intersection_concurrency = 3") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(3)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + + // Concurrency only works for dynamic pruning partition table, so real concurrency is 1. + tk.MustExec("set tidb_partition_prune_mode = 'static'") + tk.MustExec("set tidb_index_merge_intersection_concurrency = 9") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) + + // Concurrency only works for dynamic pruning partition table. so real concurrency is 1. + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));") + tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)") + tk.MustExec("set tidb_index_merge_intersection_concurrency = 9") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionConcurrency", "return(1)")) + tk.MustQuery("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Check(testkit.Rows("1")) +} + +func TestIntersectionWithDifferentConcurrency(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + var execCon []int + tblSchemas := []string{ + // partition table + "create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;", + // non-partition table + "create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3));", + } + + for tblIdx, tblSchema := range tblSchemas { + if tblIdx == 0 { + // Test different intersectionProcessWorker with partition table(10 partitions). + execCon = []int{1, 3, 10, 11, 20} + } else { + // Default concurrency. + execCon = []int{5} + } + tk.MustExec("use test") + tk.MustExec("drop table if exists t1;") + tk.MustExec(tblSchema) + + const queryCnt int = 10 + const rowCnt int = 1000 + curRowCnt := 0 + insertStr := "insert into t1 values" + for i := 0; i < rowCnt; i++ { + if i != 0 { + insertStr += ", " + } + insertStr += fmt.Sprintf("(%d, %d, %d)", i, rand.Int(), rand.Int()) + curRowCnt++ + } + tk.MustExec(insertStr) + tk.MustExec("analyze table t1") + + for _, concurrency := range execCon { + tk.MustExec(fmt.Sprintf("set tidb_executor_concurrency = %d", concurrency)) + for i := 0; i < 2; i++ { + if i == 0 { + // Dynamic mode. + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024") + require.Contains(t, res.Rows()[1][0], "IndexMerge") + } else { + tk.MustExec("set tidb_partition_prune_mode = 'static'") + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024") + if tblIdx == 0 { + // partition table + require.Contains(t, res.Rows()[1][0], "PartitionUnion") + require.Contains(t, res.Rows()[2][0], "IndexMerge") + } else { + require.Contains(t, res.Rows()[1][0], "IndexMerge") + } + } + for i := 0; i < queryCnt; i++ { + c3 := rand.Intn(1024) + res := tk.MustQuery(fmt.Sprintf("select /*+ no_index_merge() */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Rows() + tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Check(res) + } + + // In tranaction + for i := 0; i < queryCnt; i++ { + tk.MustExec("begin;") + r := rand.Intn(3) + if r == 0 { + tk.MustExec(fmt.Sprintf("update t1 set c3 = %d where c1 = %d", rand.Int(), rand.Intn(rowCnt))) + } else if r == 1 { + tk.MustExec(fmt.Sprintf("delete from t1 where c1 = %d", rand.Intn(rowCnt))) + } else if r == 2 { + tk.MustExec(fmt.Sprintf("insert into t1 values(%d, %d, %d)", curRowCnt, rand.Int(), rand.Int())) + curRowCnt++ + } + c3 := rand.Intn(1024) + res := tk.MustQuery(fmt.Sprintf("select /*+ no_index_merge() */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Rows() + tk.MustQuery(fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > %d", c3)).Sort().Check(res) + tk.MustExec("commit;") + } + } + } + tk.MustExec("drop table t1") + } +} + +func TestIntersectionWorkerPanic(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(c1 int, c2 bigint, c3 bigint, primary key(c1), key(c2), key(c3)) partition by hash(c1) partitions 10;") + tk.MustExec("insert into t1 values(1, 1, 3000), (2, 1, 1)") + tk.MustExec("analyze table t1;") + tk.MustExec("set tidb_partition_prune_mode = 'dynamic'") + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024").Rows() + require.Contains(t, res[1][0], "IndexMerge") + + // Test panic in intersection. + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic", "panic")) + err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c2 < 1024 and c3 > 1024") + require.Contains(t, err.Error(), "IndexMergeReaderExecutor") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeIntersectionWorkerPanic")) +} + +func TestIntersectionMemQuota(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1(pk varchar(100) primary key, c1 int, c2 int, index idx1(c1), index idx2(c2))") + + insertStr := "insert into t1 values" + for i := 0; i < 20; i++ { + if i != 0 { + insertStr += ", " + } + insertStr += fmt.Sprintf("('%s', %d, %d)", testutil.RandStringRunes(100), 1, 1) + } + tk.MustExec(insertStr) + res := tk.MustQuery("explain select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024").Rows() + require.Contains(t, res[1][0], "IndexMerge") + + tk.MustExec("set global tidb_mem_oom_action='CANCEL'") + defer tk.MustExec("set global tidb_mem_oom_action = DEFAULT") + tk.MustExec("set @@tidb_mem_quota_query = 4000") + err := tk.QueryToErr("select /*+ use_index_merge(t1, primary, idx1, idx2) */ c1 from t1 where c1 < 1024 and c2 < 1024") + require.Contains(t, err.Error(), "Out Of Memory Quota!") +} diff --git a/testkit/testutil/require.go b/testkit/testutil/require.go index 90b157fcb7591..09e8e871312ae 100644 --- a/testkit/testutil/require.go +++ b/testkit/testutil/require.go @@ -17,6 +17,7 @@ package testutil import ( + "math/rand" "testing" "github.com/pingcap/tidb/kv" @@ -75,3 +76,14 @@ func CompareUnorderedStringSlice(a []string, b []string) bool { } return len(m) == 0 } + +var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") + +// RandStringRunes generate random string of length n. +func RandStringRunes(n int) string { + b := make([]rune, n) + for i := range b { + b[i] = letterRunes[rand.Intn(len(letterRunes))] + } + return string(b) +}