From 3157fc3936c40256683c6ffcf9d9b6c11d497179 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 13 Dec 2024 12:37:13 +0800 Subject: [PATCH] *: Reorg partition fix delete ranges and handling non-clustered tables with concurrent DML (#57114) (#58124) ref pingcap/tidb#45133, close pingcap/tidb#56822, close pingcap/tidb#57510 --- pkg/ddl/partition.go | 180 +++++++------ pkg/ddl/tests/partition/BUILD.bazel | 3 +- pkg/ddl/tests/partition/db_partition_test.go | 32 +-- pkg/ddl/tests/partition/multi_domain_test.go | 245 ++++++++++++++++++ pkg/table/tables/partition.go | 8 +- pkg/testkit/mockstore.go | 7 +- .../integrationtest/r/ddl/db_partition.result | 4 +- 7 files changed, 362 insertions(+), 117 deletions(-) create mode 100644 pkg/ddl/tests/partition/multi_domain_test.go diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index f0f13b7e39236..ff2e8bf46cc64 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -44,7 +44,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/opcode" - "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/table" @@ -3143,6 +3142,12 @@ type reorgPartitionWorker struct { writeColOffsetMap map[int64]int maxOffset int reorgedTbl table.PartitionedTable + // Only used for non-clustered tables, since we need to re-generate _tidb_rowid, + // and check if the old _tidb_rowid was already written or not. + // If the old _tidb_rowid already exists, then the row is already backfilled (double written) + // and can be skipped. Otherwise, we will insert it with a new _tidb_rowid. + // The original _tidb_rowids, used to check if already backfilled (double written). + oldKeys []kv.Key } func newReorgPartitionWorker(sessCtx sessionctx.Context, i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) (*reorgPartitionWorker, error) { @@ -3190,43 +3195,79 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task } txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName) - rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) + nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange) if err != nil { return errors.Trace(err) } taskCtx.nextKey = nextKey taskCtx.done = taskDone - warningsMap := make(map[errors.ErrorID]*terror.Error) - warningsCountMap := make(map[errors.ErrorID]int64) - for _, prr := range rowRecords { - taskCtx.scanCount++ - - err = txn.Set(prr.key, prr.vals) + var found map[string][]byte + // If non-clustered table, then we need to replace the _tidb_rowid handles since + // there may be duplicates across different partitions, due to EXCHANGE PARTITION. + // Meaning we need to check here if a record was double written to the new partition, + // i.e. concurrently written by StateWriteOnly or StateWriteReorganization. + // If so, then we must skip it. + if len(w.oldKeys) > 0 { + // If we skip checking, then we will duplicate that double written row, with a new _tidb_rowid. 
+ found, err = txn.BatchGet(ctx, w.oldKeys) if err != nil { return errors.Trace(err) } - taskCtx.addedCount++ - if prr.warning != nil { - if _, ok := warningsCountMap[prr.warning.ID()]; ok { - warningsCountMap[prr.warning.ID()]++ - } else { - warningsCountMap[prr.warning.ID()] = 1 - warningsMap[prr.warning.ID()] = prr.warning - } - } - // TODO: Future optimization: also write the indexes here? - // What if the transaction limit is just enough for a single row, without index? - // Hmm, how could that be in the first place? - // For now, implement the batch-txn w.addTableIndex, - // since it already exists and is in use + // TODO: while waiting for BatchGet to check for duplicate, do another round of reads in parallel? } - // Collect the warnings. - taskCtx.warnings, taskCtx.warningsCount = warningsMap, warningsCountMap + failpoint.Call("github.com/pingcap/tidb/pkg/ddl/PartitionBackfillData", len(w.rowRecords) > 0) + for i, prr := range w.rowRecords { + taskCtx.scanCount++ + key := prr.key + if len(w.oldKeys) > 0 { + if _, ok := found[string(w.oldKeys[i])]; ok { + // Already filled, i.e. double written by concurrent DML. + continue + } - // also add the index entries here? And make sure they are not added somewhere else + // Pretend/Check if we can write the old key, + // since there can still be a concurrent update/insert happening that would + // cause a duplicate. + err = txn.Set(w.oldKeys[i], prr.vals) + if err != nil { + return errors.Trace(err) + } + err = txn.SetAssertion(w.oldKeys[i], kv.SetAssertNotExist) + if err != nil { + return errors.Trace(err) + } + // Don't actually write it, just make sure this transaction would + // fail if another transaction writes the same key before us. + err = txn.Delete(w.oldKeys[i]) + if err != nil { + return errors.Trace(err) + } + // Generate new _tidb_rowid. + stmtCtx := w.sessCtx.GetSessionVars().StmtCtx + if stmtCtx.BaseRowID >= stmtCtx.MaxRowID { + ids := uint64(max(1, w.batchCnt-len(w.rowRecords))) + // Keep using the original table's allocator + stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = tables.AllocHandleIDs(w.ctx, w.sessCtx, w.reorgedTbl, ids) + if err != nil { + return errors.Trace(err) + } + } + recordID, err := tables.AllocHandle(w.ctx, w.sessCtx, w.reorgedTbl) + if err != nil { + return errors.Trace(err) + } + // tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2 + key = tablecodec.EncodeRecordKey(key[:tablecodec.TableSplitKeyLen+2], recordID) + } + err = txn.Set(key, prr.vals) + if err != nil { + return errors.Trace(err) + } + taskCtx.addedCount++ + } return nil }) logSlowOperations(time.Since(oprStartTime), "BackfillData", 3000) @@ -3234,10 +3275,12 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task return } -func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) ([]*rowRecord, kv.Key, bool, error) { +func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) (kv.Key, bool, error) { w.rowRecords = w.rowRecords[:0] + w.oldKeys = w.oldKeys[:0] startTime := time.Now() + isClustered := w.reorgedTbl.Meta().IsCommonHandle || w.reorgedTbl.Meta().PKIsHandle // taskDone means that the added handle is out of taskRange.endHandle. 
taskDone := false sysTZ := w.sessCtx.GetSessionVars().StmtCtx.TimeZone() @@ -3257,8 +3300,6 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo return false, nil } - // TODO: Extend for normal tables - // TODO: Extend for REMOVE PARTITIONING _, err := w.rowDecoder.DecodeTheExistedColumnMap(w.sessCtx, handle, rawRow, sysTZ, w.rowMap) if err != nil { return false, errors.Trace(err) @@ -3276,34 +3317,14 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo if err != nil { return false, errors.Trace(err) } - var newKey kv.Key - if w.reorgedTbl.Meta().PKIsHandle || w.reorgedTbl.Meta().IsCommonHandle { - pid := p.GetPhysicalID() - newKey = tablecodec.EncodeTablePrefix(pid) - newKey = append(newKey, recordKey[len(newKey):]...) - } else { - // Non-clustered table / not unique _tidb_rowid for the whole table - // Generate new _tidb_rowid if exists. - // Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions! - stmtCtx := w.sessCtx.GetSessionVars().StmtCtx - if stmtCtx.BaseRowID >= stmtCtx.MaxRowID { - // TODO: Which autoid allocator to use? - ids := uint64(max(1, w.batchCnt-len(w.rowRecords))) - // Keep using the original table's allocator - stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = tables.AllocHandleIDs(w.ctx, w.sessCtx, w.reorgedTbl, ids) - if err != nil { - return false, errors.Trace(err) - } - } - recordID, err := tables.AllocHandle(w.ctx, w.sessCtx, w.reorgedTbl) - if err != nil { - return false, errors.Trace(err) - } - newKey = tablecodec.EncodeRecordKey(p.RecordPrefix(), recordID) + newKey := tablecodec.EncodeTablePrefix(p.GetPhysicalID()) + newKey = append(newKey, recordKey[tablecodec.TableSplitKeyLen:]...) + w.rowRecords = append(w.rowRecords, &rowRecord{key: newKey, vals: rawRow}) + if !isClustered { + oldKey := newKey[:tablecodec.TableSplitKeyLen] + oldKey = append(oldKey, recordKey[tablecodec.TableSplitKeyLen:]...) + w.oldKeys = append(w.oldKeys, oldKey) } - w.rowRecords = append(w.rowRecords, &rowRecord{ - key: newKey, vals: rawRow, - }) w.cleanRowMap() lastAccessedHandle = recordKey @@ -3318,8 +3339,8 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo taskDone = true } - logutil.BgLogger().Debug("txn fetches handle info", zap.String("category", "ddl"), zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime))) - return w.rowRecords, getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err) + logutil.BgLogger().Debug("txn fetches handle info", zap.String("category", "ddl"), zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)), zap.Error(err)) + return getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err) } func (w *reorgPartitionWorker) cleanRowMap() { @@ -3341,10 +3362,9 @@ func (w *reorgPartitionWorker) GetCtx() *backfillCtx { } func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) error { - // First copy all table data to the new partitions + // First copy all table data to the new AddingDefinitions partitions // from each of the DroppingDefinitions partitions. - // Then create all indexes on the AddingDefinitions partitions - // for each new index, one partition at a time. 
+ // Then create all indexes on the AddingDefinitions partitions, // Copy the data from the DroppingDefinitions to the AddingDefinitions if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) { @@ -3352,6 +3372,10 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) if err != nil { return errors.Trace(err) } + if len(reorgInfo.elements) <= 1 { + // No indexes to (re)create, all done! + return nil + } } failpoint.Inject("reorgPartitionAfterDataCopy", func(val failpoint.Value) { @@ -3361,32 +3385,11 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) } }) - // Rewrite this to do all indexes at once in addTableIndex - // instead of calling it once per index (meaning reading the table multiple times) - // But for now, try to understand how it works... - firstNewPartitionID := t.Meta().Partition.AddingDefinitions[0].ID - startElementOffset := 0 - //startElementOffsetToResetHandle := -1 - // This backfill job starts with backfilling index data, whose index ID is currElement.ID. if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) { - // First run, have not yet started backfilling index data - // Restart with the first new partition. - // TODO: handle remove partitioning - reorgInfo.PhysicalTableID = firstNewPartitionID - } else { - // The job was interrupted and has been restarted, - // reset and start from where it was done - for i, element := range reorgInfo.elements[1:] { - if reorgInfo.currElement.ID == element.ID { - startElementOffset = i - //startElementOffsetToResetHandle = i - break - } - } - } - - for i := startElementOffset; i < len(reorgInfo.elements[1:]); i++ { - // Now build the indexes in the new partitions + // row data has been copied, now proceed with creating the indexes + // on the new AddingDefinitions partitions + reorgInfo.PhysicalTableID = t.Meta().Partition.AddingDefinitions[0].ID + reorgInfo.currElement = reorgInfo.elements[1] var physTbl table.PhysicalTable if tbl, ok := t.(table.PartitionedTable); ok { physTbl = tbl.GetPartition(reorgInfo.PhysicalTableID) @@ -3399,10 +3402,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) if err != nil { return errors.Trace(err) } - // TODO: Can we improve this in case of a crash? - // like where the regInfo PhysicalTableID and element is the same, - // and the tableid in the key-prefix regInfo.StartKey and regInfo.EndKey matches with PhysicalTableID - // do not change the reorgInfo start/end key startHandle, endHandle, err := getTableRange(reorgInfo.NewJobContext(), reorgInfo.d, physTbl, currentVer.Ver, reorgInfo.Job.Priority) if err != nil { return errors.Trace(err) @@ -3411,8 +3410,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) // Always (re)start with the full PhysicalTable range reorgInfo.StartKey, reorgInfo.EndKey = startHandle, endHandle - // Update the element in the reorgInfo for updating the reorg meta below. - reorgInfo.currElement = reorgInfo.elements[i+1] // Write the reorg info to store so the whole reorganize process can recover from panic. 
err = reorgInfo.UpdateReorgMeta(reorgInfo.StartKey, w.sessPool) logutil.BgLogger().Info("update column and indexes", zap.String("category", "ddl"), @@ -3429,7 +3426,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) if err != nil { return errors.Trace(err) } - reorgInfo.PhysicalTableID = firstNewPartitionID } failpoint.Inject("reorgPartitionAfterIndex", func(val failpoint.Value) { //nolint:forcetypeassert diff --git a/pkg/ddl/tests/partition/BUILD.bazel b/pkg/ddl/tests/partition/BUILD.bazel index 1183232ee444d..c52787b88150d 100644 --- a/pkg/ddl/tests/partition/BUILD.bazel +++ b/pkg/ddl/tests/partition/BUILD.bazel @@ -6,9 +6,10 @@ go_test( srcs = [ "db_partition_test.go", "main_test.go", + "multi_domain_test.go", ], flaky = True, - shard_count = 49, + shard_count = 50, deps = [ "//pkg/config", "//pkg/ddl", diff --git a/pkg/ddl/tests/partition/db_partition_test.go b/pkg/ddl/tests/partition/db_partition_test.go index 197c7667ec7fb..4f98e1cc7efc5 100644 --- a/pkg/ddl/tests/partition/db_partition_test.go +++ b/pkg/ddl/tests/partition/db_partition_test.go @@ -3608,10 +3608,10 @@ func TestRemovePartitioningAutoIDs(t *testing.T) { tk3.MustExec(`COMMIT`) tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows( "13 11 11", "14 2 2", "15 12 12", "17 16 18", - "19 18 4", "21 20 5", "23 22 6", "25 24 7", "30 29 9")) + "19 18 4", "21 20 5", "23 22 6", "25 24 7", "29 28 9")) tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows( "13 11 11", "14 2 2", "15 12 12", "17 16 18", - "19 18 4", "23 22 6", "27 26 8", "32 31 10")) + "19 18 4", "23 22 6", "27 26 8", "31 30 10")) waitFor(4, "t", "write reorganization") tk3.MustExec(`BEGIN`) @@ -3621,28 +3621,20 @@ func TestRemovePartitioningAutoIDs(t *testing.T) { tk3.MustExec(`insert into t values (null, 23)`) tk2.MustExec(`COMMIT`) - /* - waitFor(4, "t", "delete reorganization") - tk2.MustExec(`BEGIN`) - tk2.MustExec(`insert into t values (null, 24)`) + waitFor(4, "t", "delete reorganization") + tk2.MustExec(`BEGIN`) + tk2.MustExec(`insert into t values (null, 24)`) - tk3.MustExec(`insert into t values (null, 25)`) - tk2.MustExec(`insert into t values (null, 26)`) - */ + tk3.MustExec(`insert into t values (null, 25)`) + tk2.MustExec(`insert into t values (null, 26)`) tk3.MustExec(`COMMIT`) tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows( - "13 11 11", "14 2 2", "15 12 12", "17 16 18", - "19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9", - "32 31 10", "35 34 21", "38 37 22", "41 40 23")) - - //waitFor(4, "t", "public") - //tk2.MustExec(`commit`) - // TODO: Investigate and fix, but it is also related to https://github.com/pingcap/tidb/issues/46904 - require.ErrorContains(t, <-alterChan, "[kv:1062]Duplicate entry '31' for key 't.PRIMARY'") + "27 26 8", "30012 12 12", "30013 18 4", "30014 24 7", "30264 16 18", "30265 22 6", "30266 28 9", "30516 11 11", "30517 2 2", "30518 20 5", "31 30 10", "33 32 21", "35 34 22", "37 36 23", "41 40 25")) + waitFor(4, "t", "none") + tk2.MustExec(`commit`) + require.NoError(t, <-alterChan) tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows( - "13 11 11", "14 2 2", "15 12 12", "17 16 18", - "19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9", - "32 31 10", "35 34 21", "38 37 22", "41 40 23")) + "27 26 8", "30012 12 12", "30013 18 4", "30014 24 7", "30264 16 18", "30265 22 6", "30266 28 9", "30516 11 11", "30517 2 2", "30518 20 5", "31 30 10", "33 32 21", "35 34 22", "37 36 23", "39 38 
24", "41 40 25", "43 42 26")) } func TestAlterLastIntervalPartition(t *testing.T) { diff --git a/pkg/ddl/tests/partition/multi_domain_test.go b/pkg/ddl/tests/partition/multi_domain_test.go new file mode 100644 index 0000000000000..b7d31fc84652a --- /dev/null +++ b/pkg/ddl/tests/partition/multi_domain_test.go @@ -0,0 +1,245 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package partition + +import ( + "fmt" + "testing" + "time" + + "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/ddl/util/callback" + "github.com/pingcap/tidb/pkg/parser/model" + "github.com/pingcap/tidb/pkg/session" + "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/util/logutil" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func TestMultiSchemaReorganizePK(t *testing.T) { + createSQL := `create table t (c1 INT primary key, c2 CHAR(255), c3 CHAR(255), c4 CHAR(255), c5 CHAR(255)) partition by range (c1) (partition p1 values less than (200), partition pMax values less than (maxvalue))` + i := 1 + initFn := func(tkO *testkit.TestKit) { + tkO.MustExec(fmt.Sprintf(`insert into t values (%d,'%s','Original',%d,%d)`, i, "init O", 4185725186-i, 7483634197-i)) + i++ + tkO.MustExec(fmt.Sprintf(`insert into t values (%d,'%s','Original',%d,%d)`, i, "init O", 4185725186-i, 7483634197-i)) + i++ + } + alterSQL := `alter table t reorganize partition p1 into (partition p0 values less than (100), partition p1 values less than (200))` + loopFn := func(tkO, tkNO *testkit.TestKit) { + res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`) + schemaState := res.Rows()[0][0].(string) + tkO.MustExec(fmt.Sprintf(`insert into t values (%d,'%s','Original',%d,%d)`, i, schemaState+" O", 4185725186-i, 7483634197-i)) + i++ + tkNO.MustExec(fmt.Sprintf(`insert into t values (%d,'%s','Original',%d,%d)`, i, schemaState+" NO", 4185725186-i, 7483634197-i)) + i++ + } + postFn := func(tkO *testkit.TestKit) { + require.Equal(t, int(6*2+1), i) + tkO.MustQuery(`select c1,c2 from t`).Sort().Check(testkit.Rows(""+ + "1 init O", + "10 delete reorganization NO", + "11 none O", + "12 none NO", + "2 init O", + "3 delete only O", + "4 delete only NO", + "5 write only O", + "6 write only NO", + "7 write reorganization O", + "8 write reorganization NO", + "9 delete reorganization O")) + } + runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn, false) + i = 1 + postFn = func(tkO *testkit.TestKit) { + tkO.MustQuery(`select c1,c2,c3 from t`).Sort().Check(testkit.Rows(""+ + "1 init O updated", + "10 delete reorganization NO Original", + "11 none O Original", + "12 none NO Original", + "2 init O updated", + "3 delete only O updated", + "4 delete only NO updated", + "5 write only O updated", + "6 write only NO updated", + "7 write reorganization O Original", + "8 write reorganization NO Original", + "9 delete reorganization O Original")) + } + runMultiSchemaTest(t, createSQL, alterSQL, initFn, 
postFn, loopFn, true) +} + +func TestMultiSchemaReorganizeNoPK(t *testing.T) { + createSQL := `create table t (c1 INT, c2 CHAR(255), c3 CHAR(255), c4 CHAR(255), c5 CHAR(255)) partition by range (c1) (partition p1 values less than (200), partition pMax values less than (maxvalue))` + i := 1 + initFn := func(tkO *testkit.TestKit) { + tkO.MustExec(fmt.Sprintf(`insert into t values (%d,'%s','Original',repeat('%d', 25),repeat('%d', 25))`, i, "init O", 4185725186-i, 7483634197-i)) + i++ + tkO.MustExec(fmt.Sprintf(`insert into t values (%d,'%s','Original',repeat('%d', 25),repeat('%d', 25))`, i, "init O", 4185725186-i, 7483634197-i)) + i++ + } + alterSQL := `alter table t reorganize partition p1 into (partition p0 values less than (100), partition p1 values less than (200))` + loopFn := func(tkO, tkNO *testkit.TestKit) { + res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`) + schemaState := res.Rows()[0][0].(string) + tkO.MustExec(fmt.Sprintf(`insert into t values (%d,'%s','Original',repeat('%d', 25),repeat('%d', 25))`, i, schemaState+" O", 4185725186-i, 7483634197-i)) + i++ + tkNO.MustExec(fmt.Sprintf(`insert into t values (%d,'%s','Original',repeat('%d', 25),repeat('%d', 25))`, i, schemaState+" NO", 4185725186-i, 7483634197-i)) + i++ + } + postFn := func(tkO *testkit.TestKit) { + require.Equal(t, int(6*2+1), i) + tkO.MustQuery(`select c1,_tidb_rowid,c2 from t`).Sort().Check(testkit.Rows(""+ + "1 60001 init O", + "10 30004 delete reorganization NO", + "11 7 none O", + "12 30005 none NO", + "2 60002 init O", + "3 60003 delete only O", + "4 60004 delete only NO", + "5 4 write only O", + // Before, there were a DUPLICATE ROW here!!! + //"5 60004 write only O", + "6 60005 write only NO", + "7 5 write reorganization O", + "8 30003 write reorganization NO", + "9 6 delete reorganization O")) + } + runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn, false) + i = 1 + postFn = func(tkO *testkit.TestKit) { + require.Equal(t, int(6*2+1), i) + tkO.MustQuery(`select c1,_tidb_rowid,c2,c3 from t`).Sort().Check(testkit.Rows(""+ + "1 1 init O updated", + "10 30004 delete reorganization NO Original", + "11 7 none O Original", + "12 30005 none NO Original", + "2 2 init O updated", + "3 3 delete only O updated", + "4 30001 delete only NO updated", + "5 4 write only O updated", + "6 30002 write only NO updated", + "7 5 write reorganization O Original", + "8 30003 write reorganization NO Original", + "9 6 delete reorganization O Original")) + } + runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn, true) +} + +func runMultiSchemaTest(t *testing.T, createSQL, alterSQL string, initFn, postFn func(*testkit.TestKit), loopFn func(tO, tNO *testkit.TestKit), injectUpdate bool) { + distCtx := testkit.NewDistExecutionContextWithLease(t, 2, 15*time.Second) + store := distCtx.Store + domOwner := distCtx.GetDomain(0) + domNonOwner := distCtx.GetDomain(1) + defer func() { + domOwner.Close() + domNonOwner.Close() + store.Close() + }() + + if !domOwner.DDL().OwnerManager().IsOwner() { + domOwner, domNonOwner = domNonOwner, domOwner + } + + seOwner, err := session.CreateSessionWithDomain(store, domOwner) + require.NoError(t, err) + seNonOwner, err := session.CreateSessionWithDomain(store, domNonOwner) + require.NoError(t, err) + + tkDDLOwner := testkit.NewTestKitWithSession(t, store, seOwner) + tkDDLOwner.MustExec(`use test`) + tkDDLOwner.MustExec(`set @@global.tidb_ddl_reorg_worker_cnt=1`) + tkO := 
testkit.NewTestKitWithSession(t, store, seOwner) + tkO.MustExec(`use test`) + tkNO := testkit.NewTestKitWithSession(t, store, seNonOwner) + tkNO.MustExec(`use test`) + + tkDDLOwner.MustExec(createSQL) + domOwner.Reload() + domNonOwner.Reload() + initFn(tkO) + verStart := domNonOwner.InfoSchema().SchemaMetaVersion() + hookChan := make(chan struct{}) + hookFunc := func(job *model.Job) { + hookChan <- struct{}{} + logutil.BgLogger().Info("XXXXXXXXXXX Hook now waiting", zap.String("job.State", job.State.String()), zap.String("job.SchemaStage", job.SchemaState.String())) + <-hookChan + logutil.BgLogger().Info("XXXXXXXXXXX Hook released", zap.String("job.State", job.State.String()), zap.String("job.SchemaStage", job.SchemaState.String())) + } + hook := &callback.TestDDLCallback{Do: domOwner} + hook.OnJobRunAfterExported = hookFunc + domOwner.DDL().SetHook(hook) + alterChan := make(chan struct{}) + go func() { + tkDDLOwner.MustExec(alterSQL) + logutil.BgLogger().Info("XXXXXXXXXXX drop partition done!") + alterChan <- struct{}{} + }() + if injectUpdate { + // This can be used for testing concurrent writes during backfill. + // It tested OK, since the backfill will fail and retry where it will get the fresh values and succeed. + require.NoError(t, failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/PartitionBackfillData", func(b bool) { + if b { + tkO.MustExec(`update t set c3 = "updated"`) + } + })) + defer failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/PartitionBackfillData") + } + + // Skip the first state, since we want to compare before vs after in the loop + <-hookChan + hookChan <- struct{}{} + verCurr := verStart + 1 + i := 0 + for { + // Waiting for the next State change to be done (i.e. blocking the state after) + releaseHook := true + for { + select { + case <-hookChan: + case <-alterChan: + releaseHook = false + logutil.BgLogger().Info("XXXXXXXXXXX release hook") + break + } + domOwner.Reload() + if domNonOwner.InfoSchema().SchemaMetaVersion() == domOwner.InfoSchema().SchemaMetaVersion() { + // looping over reorganize data/indexes + hookChan <- struct{}{} + continue + } + break + } + //tk.t.Logf("RefreshSession rand seed: %d", seed) + logutil.BgLogger().Info("XXXXXXXXXXX states loop", zap.Int64("verCurr", verCurr), zap.Int64("NonOwner ver", domNonOwner.InfoSchema().SchemaMetaVersion()), zap.Int64("Owner ver", domOwner.InfoSchema().SchemaMetaVersion())) + domOwner.Reload() + require.Equal(t, verCurr-1, domNonOwner.InfoSchema().SchemaMetaVersion()) + require.Equal(t, verCurr, domOwner.InfoSchema().SchemaMetaVersion()) + loopFn(tkO, tkNO) + domNonOwner.Reload() + verCurr++ + i++ + if !releaseHook { + // Alter done! 
+ break + } + // Continue to next state + hookChan <- struct{}{} + } + logutil.BgLogger().Info("XXXXXXXXXXX states loop done") + postFn(tkO) +} diff --git a/pkg/table/tables/partition.go b/pkg/table/tables/partition.go index a7af6b55b5239..a639a3c85787a 100644 --- a/pkg/table/tables/partition.go +++ b/pkg/table/tables/partition.go @@ -1518,7 +1518,9 @@ func GetReorganizedPartitionedTable(t table.Table) (table.PartitionedTable, erro pi.Type = pi.DDLType pi.Expr = pi.DDLExpr pi.Columns = pi.DDLColumns - tblInfo.ID = pi.NewTableID + if pi.NewTableID != 0 { + tblInfo.ID = pi.NewTableID + } constraints, err := table.LoadCheckConstraint(tblInfo) if err != nil { @@ -1627,6 +1629,10 @@ func partitionedTableAddRecord(ctx sessionctx.Context, t *partitionedTable, r [] return nil, errors.Trace(err) } tbl = t.GetPartition(pid) + if !tbl.Meta().PKIsHandle && !tbl.Meta().IsCommonHandle { + // Preserve the _tidb_rowid also in the new partition! + r = append(r, types.NewIntDatum(recordID.IntValue())) + } recordID, err = tbl.AddRecord(ctx, r, opts...) if err != nil { return diff --git a/pkg/testkit/mockstore.go b/pkg/testkit/mockstore.go index b5cdb8726d7c7..f954c82dff1a4 100644 --- a/pkg/testkit/mockstore.go +++ b/pkg/testkit/mockstore.go @@ -169,6 +169,11 @@ func (d *DistExecutionContext) GetDomain(idx int) *domain.Domain { // NewDistExecutionContext create DistExecutionContext for testing. func NewDistExecutionContext(t testing.TB, serverNum int) *DistExecutionContext { + return NewDistExecutionContextWithLease(t, serverNum, 500*time.Millisecond) +} + +// NewDistExecutionContextWithLease create DistExecutionContext for testing. +func NewDistExecutionContextWithLease(t testing.TB, serverNum int, lease time.Duration) *DistExecutionContext { store, err := mockstore.NewMockStore() require.NoError(t, err) gctuner.GlobalMemoryLimitTuner.Stop() @@ -177,7 +182,7 @@ func NewDistExecutionContext(t testing.TB, serverNum int) *DistExecutionContext var domInfo []string for i := 0; i < serverNum; i++ { - dom := bootstrap4DistExecution(t, store, 500*time.Millisecond) + dom := bootstrap4DistExecution(t, store, lease) if i != serverNum-1 { dom.SetOnClose(func() { /* don't delete the store in domain map */ }) } diff --git a/tests/integrationtest/r/ddl/db_partition.result b/tests/integrationtest/r/ddl/db_partition.result index 1c37b2bcfeb5f..7da5c768f0fcb 100644 --- a/tests/integrationtest/r/ddl/db_partition.result +++ b/tests/integrationtest/r/ddl/db_partition.result @@ -3306,7 +3306,7 @@ id store_id 1 1 select *,_tidb_rowid from t; id store_id _tidb_rowid -0 18 30257 +0 18 30256 1 1 30001 drop table t, t1; create table t (id int not null, store_id int not null ) partition by range (store_id) (partition p0 values less than (6), partition p1 values less than (11), partition p2 values less than (16), partition p3 values less than (21)); @@ -3329,6 +3329,6 @@ id store_id 1 1 select *,_tidb_rowid from t; id store_id _tidb_rowid -0 18 30257 +0 18 30256 1 1 30001 drop table t, t1;
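
The sketches below condense the core of this change for review. They are illustrative only and not part of the patch; they assume the kv and tablecodec helpers already used in the diff (txn.BatchGet, txn.SetAssertion, tablecodec.EncodeTablePrefix, tablecodec.EncodeRecordKey), and the function names mapRecordKey, backfillOneRow and preserveRowID are hypothetical helpers introduced here for explanation. The first sketch shows how fetchRowColVals maps a record key scanned from a dropping partition to the corresponding keys in the new partition; the shared package header and imports below also cover the two sketches that follow.

// reorg_sketch.go -- illustrative sketch only, not added by this commit.
package sketch

import (
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/tablecodec"
	"github.com/pingcap/tidb/pkg/types"
)

// mapRecordKey condenses the key rewrite in fetchRowColVals: the physical
// table (partition) ID prefix of the scanned record key is replaced with the
// ID of the new partition, while the handle bytes are kept unchanged. For
// non-clustered tables the same bytes are also remembered as the "old"
// _tidb_rowid key that BackfillData later probes with BatchGet.
func mapRecordKey(recordKey kv.Key, newPhysicalID int64, isClustered bool) (newKey, oldKey kv.Key) {
	newKey = tablecodec.EncodeTablePrefix(newPhysicalID)
	newKey = append(newKey, recordKey[tablecodec.TableSplitKeyLen:]...)
	if !isClustered {
		// Keep a separate copy: newKey will later be re-encoded with a fresh
		// _tidb_rowid, while oldKey must stay as-is for the duplicate check.
		oldKey = append(kv.Key{}, newKey...)
	}
	return newKey, oldKey
}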
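
The second sketch (same sketch package as above) condenses the non-clustered branch of reorgPartitionWorker.BackfillData, the central fix: rows already double written into the new partition by concurrent DML are skipped, and for the rest the transaction leaves an assert-not-exist marker on the old _tidb_rowid key so that, as the patch comment puts it, the backfill transaction fails if another transaction writes that key first, without the old key ever being persisted. The row itself is written under a freshly allocated handle.

// backfillOneRow shows the per-row logic for non-clustered tables, with
// batching, allocation of the new handle, and error wrapping stripped out.
// found is the result of txn.BatchGet over the old _tidb_rowid keys.
func backfillOneRow(txn kv.Transaction, oldKey, newKey kv.Key, vals []byte,
	found map[string][]byte, newRecordID kv.Handle) error {
	if _, ok := found[string(oldKey)]; ok {
		// Already backfilled (double written) by concurrent DML in
		// StateWriteOnly/StateWriteReorganization; writing it again with a
		// new _tidb_rowid would duplicate the row.
		return nil
	}
	// Set + assert-not-exist + delete: nothing is persisted under oldKey, but
	// the transaction now conflicts with any concurrent transaction that
	// writes the same key between our BatchGet and commit.
	if err := txn.Set(oldKey, vals); err != nil {
		return err
	}
	if err := txn.SetAssertion(oldKey, kv.SetAssertNotExist); err != nil {
		return err
	}
	if err := txn.Delete(oldKey); err != nil {
		return err
	}
	// Re-encode the row under the newly allocated handle; existing
	// _tidb_rowid values may collide across partitions after EXCHANGE
	// PARTITION, so the old one cannot be reused blindly.
	// TableSplitKeyLen+2 is the record-key prefix length ("t<tableID>_r").
	key := tablecodec.EncodeRecordKey(newKey[:tablecodec.TableSplitKeyLen+2], newRecordID)
	return txn.Set(key, vals)
}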
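
The last sketch mirrors the small partitionedTableAddRecord change and ties the two sides together: when concurrent DML re-inserts a row of a non-clustered table into a new partition during reorganization, the original _tidb_rowid is carried over instead of allocating a new one, which is exactly what lets the BatchGet probe in backfillOneRow recognize such double-written rows.

// preserveRowID appends the existing handle value to the datum row so that
// AddRecord in the target partition reuses the original _tidb_rowid.
func preserveRowID(r []types.Datum, recordID kv.Handle, isClustered bool) []types.Datum {
	if !isClustered {
		// Preserve the _tidb_rowid also in the new partition.
		r = append(r, types.NewIntDatum(recordID.IntValue()))
	}
	return r
}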