diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index 580acf932b7a4..2fc1f9a90c99c 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -87,7 +87,6 @@ func checkAddPartition(jobCtx *jobContext, job *model.Job) (*model.TableInfo, *m return tblInfo, partInfo, []model.PartitionDefinition{}, nil } -// TODO: Move this into reorganize partition! func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver int64, _ error) { args, err := model.GetTablePartitionArgs(job) if err != nil { @@ -134,6 +133,22 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in return ver, errors.Trace(err) } + failpoint.Inject("addPartCancel1", func(val failpoint.Value) { + if val.(bool) { + job.State = model.JobStateCancelled + err = errors.New("Injected error by addPartCancel1") + failpoint.Return(ver, err) + } + }) + + failpoint.Inject("addPartFail1", func(val failpoint.Value) { + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + err = errors.New("Injected error by addPartFail1") + failpoint.Return(ver, err) + } + }) + // move the adding definition into tableInfo. updateAddingPartitionInfo(partInfo, tblInfo) tblInfo.Partition.DDLState = model.StateReplicaOnly @@ -164,6 +179,14 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in return ver, errors.Trace(err) } + failpoint.Inject("addPartCancel2", func(val failpoint.Value) { + if val.(bool) { + job.State = model.JobStateCancelled + err = errors.New("Injected error by addPartCancel2") + failpoint.Return(ver, err) + } + }) + ids := getIDs([]*model.TableInfo{tblInfo}) for _, p := range tblInfo.Partition.AddingDefinitions { ids = append(ids, p.ID) @@ -173,6 +196,14 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in return ver, err } + failpoint.Inject("addPartFail2", func(val failpoint.Value) { + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + err = errors.New("Injected error by addPartFail2") + failpoint.Return(ver, err) + } + }) + // none -> replica only job.SchemaState = model.StateReplicaOnly case model.StateReplicaOnly: @@ -219,12 +250,29 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, addingDefinitions) + failpoint.Inject("addPartFail3", func(val failpoint.Value) { + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + err = errors.New("Injected error by addPartFail3") + failpoint.Return(ver, err) + } + }) + tblInfo.Partition.DDLState = model.StateNone tblInfo.Partition.DDLAction = model.ActionNone ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) } + + failpoint.Inject("addPartFail4", func(val failpoint.Value) { + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + err = errors.New("Injected error by addPartFail4") + failpoint.Return(ver, err) + } + }) + addPartitionEvent := notifier.NewAddPartitionEvent(tblInfo, partInfo) err = asyncNotifyEvent(jobCtx, addPartitionEvent, job, noSubJob, w.sess) if err != nil { diff --git a/pkg/ddl/tests/partition/error_injection_test.go b/pkg/ddl/tests/partition/error_injection_test.go index dac8aca75e5bd..c3554a6476f15 100644 --- a/pkg/ddl/tests/partition/error_injection_test.go +++ b/pkg/ddl/tests/partition/error_injection_test.go @@ -15,12 +15,13 @@ package partition import ( + "strings" "testing" - "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/external" + "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/stretchr/testify/require" ) @@ -35,6 +36,11 @@ type FailureTest struct { Tests []InjectedTest } +type TestQuery struct { + Query string + ErrTxt string +} + var truncateTests = FailureTest{ FailpointPrefix: "truncatePart", Tests: []InjectedTest{ @@ -61,27 +67,166 @@ var truncateTests = FailureTest{ }, } +var addPartitionTests = FailureTest{ + FailpointPrefix: "addPart", + Tests: []InjectedTest{ + { + Name: "Cancel1", + Recoverable: false, + Rollback: true, + }, + { + Name: "Cancel2", + Recoverable: false, + Rollback: true, + }, + { + Name: "Fail1", + Recoverable: true, + Rollback: true, + }, + { + Name: "Fail2", + Recoverable: true, + Rollback: false, + }, + { + Name: "Fail3", + Recoverable: true, + Rollback: false, + }, + { + Name: "Fail4", + Recoverable: true, + Rollback: false, + }, + }, +} + +func TestAddPartitionListWithGlobalIndex(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered global, b int not null, c varchar(255), unique index (c) global) partition by list(b) ( + partition p0 values in (1,2,3), + partition p1 values in (4,5,6), + partition p2 values in (7,8,9))` + alter := `alter table t add partition (partition p3 values in (10,11,12))` + beforeDML := []TestQuery{ + { + `insert into t values (1,1,1),(2,2,2),(4,4,4),(8,8,8),(9,9,9),(6,6,6)`, + "", + }, + { + `update t set a = 7, b = 7, c = 7 where a = 1`, + "", + }, + { + `update t set b = 3, c = 3 where c = 4`, + "", + }, + { + `delete from t where a = 8`, + "", + }, + { + `delete from t where b = 2`, + "", + }, + } + beforeResult := testkit.Rows("4 3 3", "6 6 6", "7 7 7", "9 9 9") + afterDML := []TestQuery{ + { + `insert into t values (1,1,1),(5,5,5),(8,8,8)`, + "", + }, + { + `insert into t values (11,11,11)`, + "[table:1526]Table has no partition for value 11", + }, + { + `update t set a = 3, b = 3, c = 3 where a = 4`, + "", + }, + { + `update t set a = 13, b = 13, c = 13 where a = 1`, + "[table:1526]Table has no partition for value 13", + }, + { + // Should not find any row, so OK + `update t set a = 1, b = 1, c = 1 where c = 11`, + "", + }, + { + `update t set a = 4, b = 4, c = 4 where c = 3`, + "", + }, + { + `delete from t where a = 7`, + "", + }, + { + // Should not find any row, so OK + `delete from t where b = 13`, + "", + }, + } + afterResult := testkit.Rows("1 1 1", "4 4 4", "5 5 5", "6 6 6", "8 8 8", "9 7 9") + afterRecover := testkit.Rows("1 1 1", "2 2 2", "8 8 8") + testDDLWithInjectedErrors(t, addPartitionTests, create, alter, beforeDML, beforeResult, afterDML, afterResult, afterRecover) +} + func TestTruncatePartitionListFailuresWithGlobalIndex(t *testing.T) { create := `create table t (a int unsigned primary key nonclustered global, b int not null, c varchar(255), unique index (c) global) partition by list(b) ( partition p0 values in (1,2,3), partition p1 values in (4,5,6), partition p2 values in (7,8,9))` alter := `alter table t truncate partition p0,p2` - beforeDML := []string{ - `insert into t values (1,1,1),(2,2,2),(4,4,4),(8,8,8),(9,9,9),(6,6,6)`, - `update t set a = 7, b = 7, c = 7 where a = 1`, - `update t set b = 3, c = 3 where c = 4`, - `delete from t where a = 8`, - `delete from t where b = 2`, + beforeDML := []TestQuery{ + { + `insert into t values (1,1,1),(2,2,2),(4,4,4),(8,8,8),(9,9,9),(6,6,6)`, + "", + }, + { + `update t set a = 7, b = 7, c = 7 where a = 1`, + "", + }, + { + `update t set b = 3, c = 3 where c = 4`, + "", + }, + { + `delete from t where a = 8`, + "", + }, + { + `delete from t where b = 2`, + "", + }, } beforeResult := testkit.Rows("4 3 3", "6 6 6", "7 7 7", "9 9 9") - afterDML := []string{ - `insert into t values (1,1,1),(5,5,5),(8,8,8)`, - `update t set a = 2, b = 2, c = 2 where a = 1`, - `update t set a = 1, b = 1, c = 1 where c = 6`, - `update t set a = 6, b = 6 where a = 9`, - `delete from t where a = 5`, - `delete from t where b = 3`, + afterDML := []TestQuery{ + { + `insert into t values (1,1,1),(5,5,5),(8,8,8)`, + "", + }, + { + `update t set a = 2, b = 2, c = 2 where a = 1`, + "", + }, + { + `update t set a = 1, b = 1, c = 1 where c = 6`, + "", + }, + { + `update t set a = 6, b = 6 where a = 9`, + "", + }, + { + `delete from t where a = 5`, + "", + }, + { + `delete from t where b = 3`, + "", + }, } afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 9", "7 7 7", "8 8 8") afterRecover := testkit.Rows("1 1 1", "2 2 2", "8 8 8") @@ -94,28 +239,61 @@ func TestTruncatePartitionListFailures(t *testing.T) { partition p1 values in (4,5,6), partition p2 values in (7,8,9))` alter := `alter table t truncate partition p0,p2` - beforeDML := []string{ - `insert into t values (1,1,1),(2,2,2),(4,4,4),(8,8,8),(9,9,9),(6,6,6)`, - `update t set a = 7, b = 7, c = 7 where a = 1`, - `update t set b = 3, c = 3, a = 3 where c = 4`, - `delete from t where a = 8`, - `delete from t where b = 2`, + beforeDML := []TestQuery{ + { + `insert into t values (1,1,1),(2,2,2),(4,4,4),(8,8,8),(9,9,9),(6,6,6)`, + "", + }, + { + `update t set a = 7, b = 7, c = 7 where a = 1`, + "", + }, + { + `update t set b = 3, c = 3, a = 3 where c = 4`, + "", + }, + { + `delete from t where a = 8`, + "", + }, + { + `delete from t where b = 2`, + "", + }, } beforeResult := testkit.Rows("3 3 3", "6 6 6", "7 7 7", "9 9 9") - afterDML := []string{ - `insert into t values (1,1,1),(5,5,5),(8,8,8)`, - `update t set a = 2, b = 2, c = 2 where a = 1`, - `update t set a = 1, b = 1, c = 1 where c = 6`, - `update t set a = 6, b = 6, c = 6 where a = 9`, - `delete from t where a = 5`, - `delete from t where b = 3`, + afterDML := []TestQuery{ + { + `insert into t values (1,1,1),(5,5,5),(8,8,8)`, + "", + }, + { + `update t set a = 2, b = 2, c = 2 where a = 1`, + "", + }, + { + `update t set a = 1, b = 1, c = 1 where c = 6`, + "", + }, + { + `update t set a = 6, b = 6, c = 6 where a = 9`, + "", + }, + { + `delete from t where a = 5`, + "", + }, + { + `delete from t where b = 3`, + "", + }, } afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 6", "7 7 7", "8 8 8") afterRecover := testkit.Rows("1 1 1", "2 2 2", "8 8 8") testDDLWithInjectedErrors(t, truncateTests, create, alter, beforeDML, beforeResult, afterDML, afterResult, afterRecover, "Fail1", "Fail2", "Fail3") } -func testDDLWithInjectedErrors(t *testing.T, tests FailureTest, createSQL, alterSQL string, beforeDML []string, beforeResult [][]any, afterDML []string, afterRollback, afterRecover [][]any, skipTests ...string) { +func testDDLWithInjectedErrors(t *testing.T, tests FailureTest, createSQL, alterSQL string, beforeDML []TestQuery, beforeResult [][]any, afterDML []TestQuery, afterRollback, afterRecover [][]any, skipTests ...string) { TEST: for _, test := range tests.Tests { for _, skip := range skipTests { @@ -132,23 +310,23 @@ TEST: } } -func runOneTest(t *testing.T, test InjectedTest, recoverable bool, failpointName, createSQL, alterSQL string, beforeDML []string, beforeResult [][]any, afterDML []string, afterResult [][]any) { +func runOneTest(t *testing.T, test InjectedTest, recoverable bool, failpointName, createSQL, alterSQL string, beforeDML []TestQuery, beforeResult [][]any, afterDML []TestQuery, afterResult [][]any) { name := failpointName + test.Name store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") - tk.MustExec("set tidb_enable_global_index=true") - defer func() { - tk.MustExec("set tidb_enable_global_index=default") - }() oldWaitTimeWhenErrorOccurred := ddl.WaitTimeWhenErrorOccurred defer func() { ddl.WaitTimeWhenErrorOccurred = oldWaitTimeWhenErrorOccurred }() ddl.WaitTimeWhenErrorOccurred = 0 tk.MustExec(createSQL) - for _, sql := range beforeDML { - tk.MustExec(sql + ` /* ` + name + ` */`) + for _, query := range beforeDML { + if query.ErrTxt != "" { + tk.MustContainErrMsg(query.Query+` /* `+name+` */`, query.ErrTxt) + } else { + tk.MustExec(query.Query + ` /* ` + name + ` */`) + } } tk.MustQuery(`select * from t /* ` + name + ` */`).Sort().Check(beforeResult) tOrg := external.GetTableByName(t, tk, "test", "t") @@ -167,9 +345,9 @@ func runOneTest(t *testing.T, test InjectedTest, recoverable bool, failpointName // test that it should handle recover/retry on error term = "1*return(true)" } - require.NoError(t, failpoint.Enable(fullName, term)) + testfailpoint.Enable(t, fullName, term) err := tk.ExecToErr(alterSQL + " /* " + name + " */") - require.NoError(t, failpoint.Disable(fullName)) + testfailpoint.Disable(t, fullName) tt := external.GetTableByName(t, tk, "test", "t") pi := tt.Meta().Partition if recoverable { @@ -178,7 +356,14 @@ func runOneTest(t *testing.T, test InjectedTest, recoverable bool, failpointName for i, pid := range pids { equal = equal && pid == pi.Definitions[i].ID } - require.False(t, equal, name) + if strings.HasPrefix(failpointName, "truncatePart") { + require.False(t, equal, name) + } else if strings.HasPrefix(failpointName, "addPart") { + require.True(t, equal, name) + require.Less(t, len(pids), len(pi.Definitions)) + } else { + require.Fail(t, "Unknown failpoint test", "failpointName: %s", failpointName) + } return } require.Error(t, err, "failpoint "+name) @@ -197,8 +382,12 @@ func runOneTest(t *testing.T, test InjectedTest, recoverable bool, failpointName } tk.MustExec(`admin check table t /* ` + name + ` */`) tk.MustExec(`update t set b = 7 where a = 9 /* ` + name + ` */`) - for _, sql := range afterDML { - tk.MustExec(sql + " /* " + name + " */") + for _, query := range afterDML { + if query.ErrTxt != "" { + tk.MustContainErrMsg(query.Query+` /* `+name+` */`, query.ErrTxt) + } else { + tk.MustExec(query.Query + ` /* ` + name + ` */`) + } } tk.MustQuery(`select * from t /* ` + name + ` */`).Sort().Check(afterResult) tk.MustExec(`drop table t /* ` + name + ` */`) diff --git a/pkg/ddl/tests/partition/multi_domain_test.go b/pkg/ddl/tests/partition/multi_domain_test.go index e435bc7cc8708..7cf7b16c11aa9 100644 --- a/pkg/ddl/tests/partition/multi_domain_test.go +++ b/pkg/ddl/tests/partition/multi_domain_test.go @@ -703,112 +703,167 @@ func TestMultiSchemaDropUniqueIndex(t *testing.T) { runMultiSchemaTest(t, createSQL, alterSQL, initFn, nil, loopFn) } -// TODO: Also add test for REMOVE PARTITIONING! -///* -//// TODO: complete this test, so that we test all four changes: -//1 unique non-global - to become global -//2 unique global - to become non-global -//3 unique non-global - to stay non-global -//4 unique global - to stay global -//func TestMultiSchemaPartitionByGlobalIndex(t *testing.T) { -// createSQL := `create table t (a int primary key, b varchar(255), c bigint, unique index idx_b_global (b) global, unique key idx_b (b), unique key idx_c_global (c), unique key idx_c (c)) partition by key (a,b) partitions 3` -// initFn := func(tkO *testkit.TestKit) { -// tkO.MustExec(`insert into t values (1,1),(2,2),(101,101),(102,102)`) -// } -// alterSQL := `alter table t partition by key (b,a) partitions 5` -// loopFn := func(tkO, tkNO *testkit.TestKit) { -// res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`) -// schemaState := res.Rows()[0][0].(string) -// switch schemaState { -// case model.StateDeleteOnly.String(): -// // tkNO sees original table/partitions as before the DDL stated -// // tkO uses the original table/partitions, but should also delete from the newly created -// // Global Index, to replace the existing one. -// tkO.MustContainErrMsg(`insert into t values (1,2)`, "[kv:1062]Duplicate entry '2' for key 't.idx_b'") -// tkNO.MustContainErrMsg(`insert into t values (1,2)`, "[kv:1062]Duplicate entry '2' for key 't.idx_b'") -// tkO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b'") -// tkNO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b'") -// tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "101 101", "102 102", "2 2")) -// tkNO.MustQuery(`select * from t where a < 1000`).Sort().Check(testkit.Rows("1 1", "101 101", "102 102", "2 2")) -// tkNO.MustQuery(`select * from t where a > 0`).Sort().Check(testkit.Rows("1 1", "101 101", "102 102", "2 2")) -// tkNO.MustQuery(`select * from t where a = 1`).Sort().Check(testkit.Rows("1 1")) -// tkNO.MustQuery(`select * from t where a = 1 or a = 2 or a = 3`).Sort().Check(testkit.Rows("1 1", "2 2")) -// tkNO.MustQuery(`select * from t where a in (1,2,3)`).Sort().Check(testkit.Rows("1 1", "2 2")) -// tkNO.MustQuery(`select * from t where a < 100`).Sort().Check(testkit.Rows("1 1", "2 2")) -// -// tkNO.MustQuery(`select * from t where b = 2`).Sort().Check(testkit.Rows("2 2")) -// tkO.MustExec(`insert into t values (3,3)`) -// tkNO.MustExec(`insert into t values (4,4)`) -// tkNO.MustQuery(`select * from t where a = 3`).Sort().Check(testkit.Rows("3 3")) -// tkO.MustQuery(`select * from t where a = 4`).Sort().Check(testkit.Rows("4 4")) -// case model.StateWriteOnly.String(): -// // Both tkO and tkNO uses the original table/partitions, -// // but tkO should also update the newly created -// // Global Index, and tkNO should only delete from it. -// /* -// tkO.MustContainErrMsg(`insert into t values (1,1)`, "[kv:1062]Duplicate entry '1' for key 't.idx_b'") -// tkNO.MustContainErrMsg(`insert into t values (1,1)`, "[kv:1062]Duplicate entry '1' for key 't.idx_b'") -// tkO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b'") -// tkNO.MustContainErrMsg(`insert into t values (101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b'") -// tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "101 101", "102 102", "2 2", "3 3", "4 4")) -// tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1", "101 101", "102 102", "2 2", "3 3", "4 4")) -// -// */ -// logutil.BgLogger().Info("insert into t values (5,5)") -// tkO.MustExec(`insert into t values (5,5)`) -// tkNO.MustExec(`insert into t values (6,6)`) -// tkNO.MustQuery(`select * from t where a = 5`).Sort().Check(testkit.Rows("5 5")) -// tkO.MustQuery(`select * from t where a = 6`).Sort().Check(testkit.Rows("6 6")) -// case model.StateWriteReorganization.String(): -// // Both tkO and tkNO uses the original table/partitions, -// // and should also update the newly created Global Index. -// tkO.MustExec(`insert into t values (7,7)`) -// tkNO.MustExec(`insert into t values (8,8)`) -// tkNO.MustQuery(`select * from t where b = 7`).Check(testkit.Rows("7 7")) -// tkO.MustQuery(`select * from t where b = 8`).Check(testkit.Rows("8 8")) -// case model.StateDeleteReorganization.String(): -// // Both tkO now sees the new partitions, and should use the new Global Index, -// // plus double write to the old one. -// // tkNO uses the original table/partitions, -// // and should also update the newly created Global Index. -// tkO.MustExec(`insert into t values (9,9)`) -// tkNO.MustExec(`insert into t values (10,10)`) -// tkNO.MustQuery(`select * from t where b = 9`).Check(testkit.Rows("9 9")) -// tkO.MustQuery(`select * from t where b = 10`).Check(testkit.Rows("10 10")) -// // TODO: Test update and delete! -// // TODO: test key, hash and list partition without default partition :) -// tkNO.MustQuery(`show create table t`).Check(testkit.Rows("" + -// "t CREATE TABLE `t` (\n" + -// " `a` int(11) NOT NULL,\n" + -// " `b` varchar(255) DEFAULT NULL,\n" + -// " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + -// " UNIQUE KEY idx_b (`b`) /*T![global_index] GLOBAL */\n" + -// ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + -// "PARTITION BY RANGE (`a`)\n" + -// "(PARTITION `p1` VALUES LESS THAN (200))")) -// tkO.MustQuery(`show create table t`).Check(testkit.Rows("" + -// "t CREATE TABLE `t` (\n" + -// " `a` int(11) NOT NULL,\n" + -// " `b` varchar(255) DEFAULT NULL,\n" + -// " PRIMARY KEY (`a`) /*T![clustered_index] CLUSTERED */,\n" + -// " UNIQUE KEY idx_b (`b`) /*T![global_index] GLOBAL */\n" + -// ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + -// "PARTITION BY RANGE (`a`)\n" + -// "(PARTITION `p0` VALUES LESS THAN (100),\n" + -// " PARTITION `p1` VALUES LESS THAN (200))")) -// tkO.MustExec(`insert into t values (3,3)`) -// case model.StateNone.String(): -// // just to not fail :) -// default: -// require.Failf(t, "unhandled schema state '%s'", schemaState) -// } -// } -// postFn := func(tkO *testkit.TestKit) { -// tkO.MustQuery(`select * from t where b = 5`).Sort().Check(testkit.Rows("5 5")) -// tkO.MustExec(`admin check table t`) -// } -// runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn) -//} +// 1 unique non-global - to become global +// 2 unique global - to become non-global +// 3 unique non-global - to stay non-global +// 4 unique global - to stay global +func TestMultiSchemaRemovePartitioningGlobalIndex(t *testing.T) { + createSQL := `create table t (a int primary key nonclustered global, b varchar(255), c bigint, unique index idx_b_global (b) global, unique key idx_ba (b,a), unique key idx_ab (a,b) global, unique key idx_c_global (c) global, unique key idx_cab (c,a,b)) partition by key (a,b) partitions 3` + initFn := func(tkO *testkit.TestKit) { + tkO.MustExec(`insert into t values (1,1,1),(2,2,2),(101,101,101),(102,102,102)`) + } + alterSQL := `alter table t remove partitioning` + doneStateWriteReorganize := false + loopFn := func(tkO, tkNO *testkit.TestKit) { + res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`) + schemaState := res.Rows()[0][0].(string) + switch schemaState { + case model.StateDeleteOnly.String(): + // tkNO sees original table/partitions as before the DDL stated + // tkO uses the original table/partitions, but should also delete from the newly created + // Global Index, to replace the existing one. + tkO.MustContainErrMsg(`insert into t values (1,2,3)`, "[kv:1062]Duplicate entry '2' for key 't.idx_b") + tkNO.MustContainErrMsg(`insert into t values (1,2,3)`, "[kv:1062]Duplicate entry '2' for key 't.idx_b") + tkO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b") + tkNO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b") + tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2")) + tkNO.MustQuery(`select * from t where a < 1000`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2")) + tkNO.MustQuery(`select * from t where a > 0`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2")) + tkNO.MustQuery(`select * from t where a = 1`).Sort().Check(testkit.Rows("1 1 1")) + tkNO.MustQuery(`select * from t where a = 1 or a = 2 or a = 3`).Sort().Check(testkit.Rows("1 1 1", "2 2 2")) + tkNO.MustQuery(`select * from t where a in (1,2,3)`).Sort().Check(testkit.Rows("1 1 1", "2 2 2")) + tkNO.MustQuery(`select * from t where a < 100`).Sort().Check(testkit.Rows("1 1 1", "2 2 2")) + + tkNO.MustQuery(`select * from t where b = 2`).Sort().Check(testkit.Rows("2 2 2")) + tkO.MustExec(`insert into t values (3,3,3)`) + tkNO.MustExec(`insert into t values (4,4,4)`) + tkNO.MustQuery(`select * from t where a = 3`).Sort().Check(testkit.Rows("3 3 3")) + tkO.MustQuery(`select * from t where a = 4`).Sort().Check(testkit.Rows("4 4 4")) + case model.StateWriteOnly.String(): + // Both tkO and tkNO uses the original table/partitions, + // but tkO should also update the newly created + // Global Index, and tkNO should only delete from it. + tkO.MustContainErrMsg(`insert into t values (1,1,1)`, "[kv:1062]Duplicate entry '1' for key 't.idx_b") + tkNO.MustContainErrMsg(`insert into t values (1,1,1)`, "[kv:1062]Duplicate entry '1' for key 't.idx_b") + tkO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b") + tkNO.MustContainErrMsg(`insert into t values (101,101,101)`, "[kv:1062]Duplicate entry '101' for key 't.idx_b") + tkNO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2", "3 3 3", "4 4 4")) + tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows("1 1 1", "101 101 101", "102 102 102", "2 2 2", "3 3 3", "4 4 4")) + logutil.BgLogger().Info("insert into t values (5,5,5)") + tkO.MustExec(`insert into t values (5,5,5)`) + tkNO.MustExec(`insert into t values (6,6,6)`) + tkNO.MustQuery(`select * from t where a = 5`).Sort().Check(testkit.Rows("5 5 5")) + tkO.MustQuery(`select * from t where a = 6`).Sort().Check(testkit.Rows("6 6 6")) + case model.StateWriteReorganization.String(): + // It will go through StateWriteReorg more than once. + if doneStateWriteReorganize { + break + } + doneStateWriteReorganize = true + // Both tkO and tkNO uses the original table/partitions, + // and should also update the newly created Global Index. + tkO.MustExec(`insert into t values (7,7,7)`) + tkNO.MustExec(`insert into t values (8,8,8)`) + tkNO.MustQuery(`select * from t where b = 7`).Check(testkit.Rows("7 7 7")) + tkO.MustQuery(`select * from t where b = 8`).Check(testkit.Rows("8 8 8")) + case model.StateDeleteReorganization.String(): + // Both tkO now sees the new partitions, and should use the new Global Index, + // plus double write to the old one. + // tkNO uses the original table/partitions, + // and should also update the newly created Global Index. + tkO.MustExec(`insert into t values (9,9,9)`) + tkNO.MustExec(`insert into t values (10,10,10)`) + tkNO.MustQuery(`select * from t where b = 9`).Check(testkit.Rows("9 9 9")) + tkO.MustQuery(`select * from t where b = 10`).Check(testkit.Rows("10 10 10")) + // TODO: Test update and delete! + // TODO: test key, hash and list partition without default partition :) + tkNO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` bigint(20) DEFAULT NULL,\n" + + " UNIQUE KEY `idx_b_global` (`b`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `idx_ba` (`b`,`a`),\n" + + " UNIQUE KEY `idx_ab` (`a`,`b`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `idx_c_global` (`c`) /*T![global_index] GLOBAL */,\n" + + " UNIQUE KEY `idx_cab` (`c`,`a`,`b`),\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] NONCLUSTERED */ /*T![global_index] GLOBAL */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY KEY (`a`,`b`) PARTITIONS 3")) + tkO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` bigint(20) DEFAULT NULL,\n" + + " UNIQUE KEY `idx_ba` (`b`,`a`),\n" + + " UNIQUE KEY `idx_cab` (`c`,`a`,`b`),\n" + + " UNIQUE KEY `idx_b_global` (`b`),\n" + + " UNIQUE KEY `idx_ab` (`a`,`b`),\n" + + " UNIQUE KEY `idx_c_global` (`c`),\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] NONCLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY NONE COLUMNS(`a`,`b`)\n" + + "(PARTITION `CollapsedPartitions`)")) + case model.StatePublic.String(): + tkO.MustExec(`insert into t values (11,11,11)`) + tkNO.MustExec(`insert into t values (12,12,12)`) + tkO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` bigint(20) DEFAULT NULL,\n" + + " UNIQUE KEY `idx_ba` (`b`,`a`),\n" + + " UNIQUE KEY `idx_cab` (`c`,`a`,`b`),\n" + + " UNIQUE KEY `idx_b_global` (`b`),\n" + + " UNIQUE KEY `idx_ab` (`a`,`b`),\n" + + " UNIQUE KEY `idx_c_global` (`c`),\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] NONCLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" + + "PARTITION BY NONE COLUMNS(`a`,`b`)\n" + + "(PARTITION `CollapsedPartitions`)")) + case model.StateNone.String(): + tkO.MustExec(`insert into t values (13,13,13)`) + tkNO.MustExec(`insert into t values (14,14,14)`) + tkO.MustQuery(`select * from t where b = 11`).Check(testkit.Rows("11 11 11")) + tkO.MustQuery(`show create table t`).Check(testkit.Rows("" + + "t CREATE TABLE `t` (\n" + + " `a` int(11) NOT NULL,\n" + + " `b` varchar(255) DEFAULT NULL,\n" + + " `c` bigint(20) DEFAULT NULL,\n" + + " UNIQUE KEY `idx_ba` (`b`,`a`),\n" + + " UNIQUE KEY `idx_cab` (`c`,`a`,`b`),\n" + + " UNIQUE KEY `idx_b_global` (`b`),\n" + + " UNIQUE KEY `idx_ab` (`a`,`b`),\n" + + " UNIQUE KEY `idx_c_global` (`c`),\n" + + " PRIMARY KEY (`a`) /*T![clustered_index] NONCLUSTERED */\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + default: + require.Failf(t, "unhandled schema state '%s'", schemaState) + } + } + postFn := func(tkO *testkit.TestKit, _ kv.Storage) { + tkO.MustQuery(`select * from t where b = 5`).Check(testkit.Rows("5 5 5")) + tkO.MustExec(`admin check table t`) + tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "1 1 1", + "10 10 10", + "101 101 101", + "102 102 102", + "11 11 11", + "12 12 12", + "13 13 13", + "14 14 14", + "2 2 2", + "3 3 3", + "4 4 4", + "5 5 5", + "6 6 6", + "7 7 7", + "8 8 8", + "9 9 9")) + } + runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn) +} func runMultiSchemaTest(t *testing.T, createSQL, alterSQL string, initFn func(*testkit.TestKit), postFn func(*testkit.TestKit, kv.Storage), loopFn func(tO, tNO *testkit.TestKit)) { // When debugging, increase the lease, so the schema does not auto reload :) @@ -973,9 +1028,11 @@ LocalLoop: } PartitionLoop: for _, partID := range originalPartitions { - for _, def := range tbl.Meta().Partition.Definitions { - if def.ID == partID { - continue PartitionLoop + if tbl.Meta().Partition != nil { + for _, def := range tbl.Meta().Partition.Definitions { + if def.ID == partID { + continue PartitionLoop + } } } // old partitions removed @@ -1443,3 +1500,56 @@ func TestMultiSchemaTruncatePartitionWithPKGlobal(t *testing.T) { } runMultiSchemaTest(t, createSQL, alterSQL, initFn, nil, loopFn) } +func TestMultiSchemaAddPartitionWithGlobalIndex(t *testing.T) { + createSQL := `create table t (a int, b int, c varchar(255) default 'Filler', primary key (a) nonclustered global, unique key uk_b (b)) partition by range (b) (partition p0 values less than (100), partition p1 values less than (200))` + initFn := func(tkO *testkit.TestKit) { + tkO.MustExec(`insert into t (a,b) values (0,0),(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(100,100),(101,101),(102,102),(103,103),(104,104)`) + } + alterSQL := `alter table t add partition (partition p2 values less than (300))` + loopFn := func(tkO, tkNO *testkit.TestKit) { + res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`) + schemaState := res.Rows()[0][0].(string) + logutil.BgLogger().Info("loopFn", zap.String("schemaState", schemaState)) + switch schemaState { + case "replica only": + tkO.MustContainErrMsg(`insert into t values (200,200,200)`, "[table:1526]Table has no partition for value 200") + tkNO.MustContainErrMsg(`insert into t values (200,200,200)`, "[table:1526]Table has no partition for value 200") + tkO.MustContainErrMsg(`update t set b = 200 where a = 100`, "[table:1526]Table has no partition for value 200") + tkNO.MustContainErrMsg(`update t set b = 200 where a = 100`, "[table:1526]Table has no partition for value 200") + case "public": + tkO.MustExec(`insert into t values (200,200,200),(201,201,201),(202,202,202),(203,203,203)`) + tkNO.MustContainErrMsg(`insert into t values (204,204,204)`, "[table:1526]Table has no partition for value 204") + tkO.MustExec(`update t set b = 204 where a = 104`) + tkNO.MustContainErrMsg(`update t set b = 205 where a = 100`, "[table:1526]Table has no partition for value 205") + tkO.MustExec(`delete from t where a = 201`) + // Should not find the row! + tkNO.MustExec(`delete from t where a = 202`) + tkO.MustQuery(`select * from t where b = 202`).Check(testkit.Rows("202 202 202")) + tkO.MustQuery(`select * from t where a = 202`).Check(testkit.Rows("202 202 202")) + tkNO.MustQuery(`select * from t where b = 202`).Check(testkit.Rows()) + tkNO.MustQuery(`select * from t where a = 202`).Check(testkit.Rows()) + default: + require.Fail(t, "Unhandled schema state", "State: '%s'", schemaState) + } + } + postFn := func(tkO *testkit.TestKit, _ kv.Storage) { + tkO.MustQuery(`select * from t`).Sort().Check(testkit.Rows(""+ + "0 0 Filler", + "1 1 Filler", + "100 100 Filler", + "101 101 Filler", + "102 102 Filler", + "103 103 Filler", + "104 204 Filler", + "2 2 Filler", + "200 200 200", + "202 202 202", + "203 203 203", + "3 3 Filler", + "4 4 Filler", + "5 5 Filler", + "6 6 Filler", + "7 7 Filler")) + } + runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn) +}