Skip to content

Commit

Permalink
*: Reorg partition fix delete ranges and handling non-clustered table…
Browse files Browse the repository at this point in the history
…s with concurrent DML (#57114) (#58124)

ref #45133, close #56822, close #57510
  • Loading branch information
ti-chi-bot authored Dec 13, 2024
1 parent f2c805b commit 3157fc3
Show file tree
Hide file tree
Showing 7 changed files with 362 additions and 117 deletions.
180 changes: 88 additions & 92 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/opcode"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
Expand Down Expand Up @@ -3143,6 +3142,12 @@ type reorgPartitionWorker struct {
writeColOffsetMap map[int64]int
maxOffset int
reorgedTbl table.PartitionedTable
// Only used for non-clustered tables, since we need to re-generate _tidb_rowid,
// and check if the old _tidb_rowid was already written or not.
// If the old _tidb_rowid already exists, then the row is already backfilled (double written)
// and can be skipped. Otherwise, we will insert it with a new _tidb_rowid.
// The original _tidb_rowids, used to check if already backfilled (double written).
oldKeys []kv.Key
}

func newReorgPartitionWorker(sessCtx sessionctx.Context, i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) (*reorgPartitionWorker, error) {
Expand Down Expand Up @@ -3190,54 +3195,92 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task
}
txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)

rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
if err != nil {
return errors.Trace(err)
}
taskCtx.nextKey = nextKey
taskCtx.done = taskDone

warningsMap := make(map[errors.ErrorID]*terror.Error)
warningsCountMap := make(map[errors.ErrorID]int64)
for _, prr := range rowRecords {
taskCtx.scanCount++

err = txn.Set(prr.key, prr.vals)
var found map[string][]byte
// If non-clustered table, then we need to replace the _tidb_rowid handles since
// there may be duplicates across different partitions, due to EXCHANGE PARTITION.
// Meaning we need to check here if a record was double written to the new partition,
// i.e. concurrently written by StateWriteOnly or StateWriteReorganization.
// If so, then we must skip it.
if len(w.oldKeys) > 0 {
// If we skip checking, then we will duplicate that double written row, with a new _tidb_rowid.
found, err = txn.BatchGet(ctx, w.oldKeys)
if err != nil {
return errors.Trace(err)
}
taskCtx.addedCount++
if prr.warning != nil {
if _, ok := warningsCountMap[prr.warning.ID()]; ok {
warningsCountMap[prr.warning.ID()]++
} else {
warningsCountMap[prr.warning.ID()] = 1
warningsMap[prr.warning.ID()] = prr.warning
}
}
// TODO: Future optimization: also write the indexes here?
// What if the transaction limit is just enough for a single row, without index?
// Hmm, how could that be in the first place?
// For now, implement the batch-txn w.addTableIndex,
// since it already exists and is in use
// TODO: while waiting for BatchGet to check for duplicate, do another round of reads in parallel?
}

// Collect the warnings.
taskCtx.warnings, taskCtx.warningsCount = warningsMap, warningsCountMap
failpoint.Call("github.com/pingcap/tidb/pkg/ddl/PartitionBackfillData", len(w.rowRecords) > 0)
for i, prr := range w.rowRecords {
taskCtx.scanCount++
key := prr.key
if len(w.oldKeys) > 0 {
if _, ok := found[string(w.oldKeys[i])]; ok {
// Already filled, i.e. double written by concurrent DML.
continue
}

// also add the index entries here? And make sure they are not added somewhere else
// Pretend/Check if we can write the old key,
// since there can still be a concurrent update/insert happening that would
// cause a duplicate.
err = txn.Set(w.oldKeys[i], prr.vals)
if err != nil {
return errors.Trace(err)
}
err = txn.SetAssertion(w.oldKeys[i], kv.SetAssertNotExist)
if err != nil {
return errors.Trace(err)
}
// Don't actually write it, just make sure this transaction would
// fail if another transaction writes the same key before us.
err = txn.Delete(w.oldKeys[i])
if err != nil {
return errors.Trace(err)
}
// Generate new _tidb_rowid.
stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
if stmtCtx.BaseRowID >= stmtCtx.MaxRowID {
ids := uint64(max(1, w.batchCnt-len(w.rowRecords)))
// Keep using the original table's allocator
stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = tables.AllocHandleIDs(w.ctx, w.sessCtx, w.reorgedTbl, ids)
if err != nil {
return errors.Trace(err)
}
}
recordID, err := tables.AllocHandle(w.ctx, w.sessCtx, w.reorgedTbl)
if err != nil {
return errors.Trace(err)
}

// tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2
key = tablecodec.EncodeRecordKey(key[:tablecodec.TableSplitKeyLen+2], recordID)
}
err = txn.Set(key, prr.vals)
if err != nil {
return errors.Trace(err)
}
taskCtx.addedCount++
}
return nil
})
logSlowOperations(time.Since(oprStartTime), "BackfillData", 3000)

return
}

func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) ([]*rowRecord, kv.Key, bool, error) {
func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) (kv.Key, bool, error) {
w.rowRecords = w.rowRecords[:0]
w.oldKeys = w.oldKeys[:0]
startTime := time.Now()

isClustered := w.reorgedTbl.Meta().IsCommonHandle || w.reorgedTbl.Meta().PKIsHandle
// taskDone means that the added handle is out of taskRange.endHandle.
taskDone := false
sysTZ := w.sessCtx.GetSessionVars().StmtCtx.TimeZone()
Expand All @@ -3257,8 +3300,6 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
return false, nil
}

// TODO: Extend for normal tables
// TODO: Extend for REMOVE PARTITIONING
_, err := w.rowDecoder.DecodeTheExistedColumnMap(w.sessCtx, handle, rawRow, sysTZ, w.rowMap)
if err != nil {
return false, errors.Trace(err)
Expand All @@ -3276,34 +3317,14 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
if err != nil {
return false, errors.Trace(err)
}
var newKey kv.Key
if w.reorgedTbl.Meta().PKIsHandle || w.reorgedTbl.Meta().IsCommonHandle {
pid := p.GetPhysicalID()
newKey = tablecodec.EncodeTablePrefix(pid)
newKey = append(newKey, recordKey[len(newKey):]...)
} else {
// Non-clustered table / not unique _tidb_rowid for the whole table
// Generate new _tidb_rowid if exists.
// Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions!
stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
if stmtCtx.BaseRowID >= stmtCtx.MaxRowID {
// TODO: Which autoid allocator to use?
ids := uint64(max(1, w.batchCnt-len(w.rowRecords)))
// Keep using the original table's allocator
stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = tables.AllocHandleIDs(w.ctx, w.sessCtx, w.reorgedTbl, ids)
if err != nil {
return false, errors.Trace(err)
}
}
recordID, err := tables.AllocHandle(w.ctx, w.sessCtx, w.reorgedTbl)
if err != nil {
return false, errors.Trace(err)
}
newKey = tablecodec.EncodeRecordKey(p.RecordPrefix(), recordID)
newKey := tablecodec.EncodeTablePrefix(p.GetPhysicalID())
newKey = append(newKey, recordKey[tablecodec.TableSplitKeyLen:]...)
w.rowRecords = append(w.rowRecords, &rowRecord{key: newKey, vals: rawRow})
if !isClustered {
oldKey := newKey[:tablecodec.TableSplitKeyLen]
oldKey = append(oldKey, recordKey[tablecodec.TableSplitKeyLen:]...)
w.oldKeys = append(w.oldKeys, oldKey)
}
w.rowRecords = append(w.rowRecords, &rowRecord{
key: newKey, vals: rawRow,
})

w.cleanRowMap()
lastAccessedHandle = recordKey
Expand All @@ -3318,8 +3339,8 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
taskDone = true
}

logutil.BgLogger().Debug("txn fetches handle info", zap.String("category", "ddl"), zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)))
return w.rowRecords, getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
logutil.BgLogger().Debug("txn fetches handle info", zap.String("category", "ddl"), zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)), zap.Error(err))
return getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
}

func (w *reorgPartitionWorker) cleanRowMap() {
Expand All @@ -3341,17 +3362,20 @@ func (w *reorgPartitionWorker) GetCtx() *backfillCtx {
}

func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) error {
// First copy all table data to the new partitions
// First copy all table data to the new AddingDefinitions partitions
// from each of the DroppingDefinitions partitions.
// Then create all indexes on the AddingDefinitions partitions
// for each new index, one partition at a time.
// Then create all indexes on the AddingDefinitions partitions,

// Copy the data from the DroppingDefinitions to the AddingDefinitions
if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
err := w.updatePhysicalTableRow(t, reorgInfo)
if err != nil {
return errors.Trace(err)
}
if len(reorgInfo.elements) <= 1 {
// No indexes to (re)create, all done!
return nil
}
}

failpoint.Inject("reorgPartitionAfterDataCopy", func(val failpoint.Value) {
Expand All @@ -3361,32 +3385,11 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
}
})

// Rewrite this to do all indexes at once in addTableIndex
// instead of calling it once per index (meaning reading the table multiple times)
// But for now, try to understand how it works...
firstNewPartitionID := t.Meta().Partition.AddingDefinitions[0].ID
startElementOffset := 0
//startElementOffsetToResetHandle := -1
// This backfill job starts with backfilling index data, whose index ID is currElement.ID.
if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) {
// First run, have not yet started backfilling index data
// Restart with the first new partition.
// TODO: handle remove partitioning
reorgInfo.PhysicalTableID = firstNewPartitionID
} else {
// The job was interrupted and has been restarted,
// reset and start from where it was done
for i, element := range reorgInfo.elements[1:] {
if reorgInfo.currElement.ID == element.ID {
startElementOffset = i
//startElementOffsetToResetHandle = i
break
}
}
}

for i := startElementOffset; i < len(reorgInfo.elements[1:]); i++ {
// Now build the indexes in the new partitions
// row data has been copied, now proceed with creating the indexes
// on the new AddingDefinitions partitions
reorgInfo.PhysicalTableID = t.Meta().Partition.AddingDefinitions[0].ID
reorgInfo.currElement = reorgInfo.elements[1]
var physTbl table.PhysicalTable
if tbl, ok := t.(table.PartitionedTable); ok {
physTbl = tbl.GetPartition(reorgInfo.PhysicalTableID)
Expand All @@ -3399,10 +3402,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
if err != nil {
return errors.Trace(err)
}
// TODO: Can we improve this in case of a crash?
// like where the regInfo PhysicalTableID and element is the same,
// and the tableid in the key-prefix regInfo.StartKey and regInfo.EndKey matches with PhysicalTableID
// do not change the reorgInfo start/end key
startHandle, endHandle, err := getTableRange(reorgInfo.NewJobContext(), reorgInfo.d, physTbl, currentVer.Ver, reorgInfo.Job.Priority)
if err != nil {
return errors.Trace(err)
Expand All @@ -3411,8 +3410,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
// Always (re)start with the full PhysicalTable range
reorgInfo.StartKey, reorgInfo.EndKey = startHandle, endHandle

// Update the element in the reorgInfo for updating the reorg meta below.
reorgInfo.currElement = reorgInfo.elements[i+1]
// Write the reorg info to store so the whole reorganize process can recover from panic.
err = reorgInfo.UpdateReorgMeta(reorgInfo.StartKey, w.sessPool)
logutil.BgLogger().Info("update column and indexes", zap.String("category", "ddl"),
Expand All @@ -3429,7 +3426,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
if err != nil {
return errors.Trace(err)
}
reorgInfo.PhysicalTableID = firstNewPartitionID
}
failpoint.Inject("reorgPartitionAfterIndex", func(val failpoint.Value) {
//nolint:forcetypeassert
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/tests/partition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ go_test(
srcs = [
"db_partition_test.go",
"main_test.go",
"multi_domain_test.go",
],
flaky = True,
shard_count = 49,
shard_count = 50,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
32 changes: 12 additions & 20 deletions pkg/ddl/tests/partition/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3608,10 +3608,10 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
tk3.MustExec(`COMMIT`)
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "30 29 9"))
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "29 28 9"))
tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "23 22 6", "27 26 8", "32 31 10"))
"19 18 4", "23 22 6", "27 26 8", "31 30 10"))

waitFor(4, "t", "write reorganization")
tk3.MustExec(`BEGIN`)
Expand All @@ -3621,28 +3621,20 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
tk3.MustExec(`insert into t values (null, 23)`)
tk2.MustExec(`COMMIT`)

/*
waitFor(4, "t", "delete reorganization")
tk2.MustExec(`BEGIN`)
tk2.MustExec(`insert into t values (null, 24)`)
waitFor(4, "t", "delete reorganization")
tk2.MustExec(`BEGIN`)
tk2.MustExec(`insert into t values (null, 24)`)

tk3.MustExec(`insert into t values (null, 25)`)
tk2.MustExec(`insert into t values (null, 26)`)
*/
tk3.MustExec(`insert into t values (null, 25)`)
tk2.MustExec(`insert into t values (null, 26)`)
tk3.MustExec(`COMMIT`)
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
"32 31 10", "35 34 21", "38 37 22", "41 40 23"))

//waitFor(4, "t", "public")
//tk2.MustExec(`commit`)
// TODO: Investigate and fix, but it is also related to https://github.com/pingcap/tidb/issues/46904
require.ErrorContains(t, <-alterChan, "[kv:1062]Duplicate entry '31' for key 't.PRIMARY'")
"27 26 8", "30012 12 12", "30013 18 4", "30014 24 7", "30264 16 18", "30265 22 6", "30266 28 9", "30516 11 11", "30517 2 2", "30518 20 5", "31 30 10", "33 32 21", "35 34 22", "37 36 23", "41 40 25"))
waitFor(4, "t", "none")
tk2.MustExec(`commit`)
require.NoError(t, <-alterChan)
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
"32 31 10", "35 34 21", "38 37 22", "41 40 23"))
"27 26 8", "30012 12 12", "30013 18 4", "30014 24 7", "30264 16 18", "30265 22 6", "30266 28 9", "30516 11 11", "30517 2 2", "30518 20 5", "31 30 10", "33 32 21", "35 34 22", "37 36 23", "39 38 24", "41 40 25", "43 42 26"))
}

func TestAlterLastIntervalPartition(t *testing.T) {
Expand Down
Loading

0 comments on commit 3157fc3

Please sign in to comment.