Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Reorg partition fix delete ranges and handling non-clustered tables with concurrent DML (#57114) #58124

Merged
212 changes: 115 additions & 97 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/opcode"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
Expand Down Expand Up @@ -3137,12 +3136,18 @@ func doPartitionReorgWork(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, tb
type reorgPartitionWorker struct {
*backfillCtx
// Static allocated to limit memory allocations
rowRecords []*rowRecord
rowRecords []*RowVal
rowDecoder *decoder.RowDecoder
rowMap map[int64]types.Datum
writeColOffsetMap map[int64]int
maxOffset int
reorgedTbl table.PartitionedTable
// Only used for non-clustered tables, since we need to re-generate _tidb_rowid,
// and check if the old _tidb_rowid was already written or not.
// If the old _tidb_rowid already exists, then the row is already backfilled (double written)
// and can be skipped. Otherwise, we will insert it with a new _tidb_rowid.
// The original _tidb_rowids, used to check if already backfilled (double written).
oldKeys []kv.Key
}

func newReorgPartitionWorker(sessCtx sessionctx.Context, i int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) (*reorgPartitionWorker, error) {
Expand All @@ -3158,15 +3163,13 @@ func newReorgPartitionWorker(sessCtx sessionctx.Context, i int, t table.Physical
writeColOffsetMap := make(map[int64]int, len(partColIDs))
maxOffset := 0
for _, id := range partColIDs {
var offset int
for _, col := range pt.Cols() {
if col.ID == id {
offset = col.Offset
writeColOffsetMap[id] = col.Offset
maxOffset = mathutil.Max[int](maxOffset, col.Offset)
break
}
}
writeColOffsetMap[id] = offset
maxOffset = mathutil.Max[int](maxOffset, offset)
}
return &reorgPartitionWorker{
backfillCtx: newBackfillCtx(reorgInfo.d, i, sessCtx, reorgInfo.SchemaName, t, jc, "reorg_partition_rate", false),
Expand All @@ -3190,54 +3193,116 @@ func (w *reorgPartitionWorker) BackfillData(handleRange reorgBackfillTask) (task
}
txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)

rowRecords, nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
nextKey, taskDone, err := w.fetchRowColVals(txn, handleRange)
if err != nil {
return errors.Trace(err)
}
taskCtx.nextKey = nextKey
taskCtx.done = taskDone

warningsMap := make(map[errors.ErrorID]*terror.Error)
warningsCountMap := make(map[errors.ErrorID]int64)
for _, prr := range rowRecords {
taskCtx.scanCount++

err = txn.Set(prr.key, prr.vals)
var found map[string][]byte
// If non-clustered table, then we need to replace the _tidb_rowid handles since
// there may be duplicates across different partitions, due to EXCHANGE PARTITION.
// Meaning we need to check here if a record was double written to the new partition,
// i.e. concurrently written by StateWriteOnly or StateWriteReorganization.
// If so, then we must skip it.
if len(w.oldKeys) > 0 {
// We need to use pessimistic, since we have no other unique key than the _tidb_rowid,
// which we are changing, so we need to check if we can write the old one without collision.
//txn.SetOption(kv.Pessimistic, true)

// We must check if the old IDs have already been written,
// i.e. double written by StateWriteOnly or StateWriteReorganization.
// The good thing is that we can then also skip the index generation for that row, and we don't need to
// check if duplicate index entries were already copied either!
// If we skip checking, then we will overwrite those double written rows.
// TODO: Would it be OK to overwrite them?
// They will always be double writing during this state, but can change between fetchRowColVals
// and writing here, which should be caught by the SetAssertNotExists below.
found, err = txn.BatchGet(ctx, w.oldKeys)
if err != nil {
return errors.Trace(err)
}
taskCtx.addedCount++
if prr.warning != nil {
if _, ok := warningsCountMap[prr.warning.ID()]; ok {
warningsCountMap[prr.warning.ID()]++
} else {
warningsCountMap[prr.warning.ID()] = 1
warningsMap[prr.warning.ID()] = prr.warning
}
}
Comment on lines -3210 to -3217
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Warnings were never set and cannot occur here, since we are copying data without any conversions.

// TODO: Future optimization: also write the indexes here?
// What if the transaction limit is just enough for a single row, without index?
// Hmm, how could that be in the first place?
// For now, implement the batch-txn w.addTableIndex,
// since it already exists and is in use
logutil.BgLogger().Info("BackfillData BatchGet", zap.Int("Found keys", len(found)))
// TODO: while waiting for BatchGet to check for duplicate, do another round of reads in parallel?
}

// Collect the warnings.
taskCtx.warnings, taskCtx.warningsCount = warningsMap, warningsCountMap
failpoint.Call("github.com/pingcap/tidb/pkg/ddl/PartitionBackfillData", w.rowRecords)
for i, prr := range w.rowRecords {
taskCtx.scanCount++
key := prr.key
logutil.BgLogger().Info("BackfillData row", zap.String("key", key.String()))
if len(w.oldKeys) > 0 {
if _, ok := found[string(w.oldKeys[i])]; ok {
// Already filled, i.e. double written by concurrent DML.
logutil.BgLogger().Info("BackfillData already filled key", zap.String("key", w.oldKeys[i].String()))
continue
}

// also add the index entries here? And make sure they are not added somewhere else
// Check if we can write the old key,
// since there can still be a concurrent update/insert happening that would
// cause a duplicate.
err = txn.Set(w.oldKeys[i], prr.vals)
if err != nil {
logutil.BgLogger().Info("BackfillData failed to Set key", zap.String("key", w.oldKeys[i].String()), zap.Error(err))
return errors.Trace(err)
}
err = txn.SetAssertion(w.oldKeys[i], kv.SetAssertNotExist)
if err != nil {
logutil.BgLogger().Info("BackfillData failed to SetOption", zap.String("key", w.oldKeys[i].String()), zap.Error(err))
return errors.Trace(err)
}
err = txn.Delete(w.oldKeys[i])
if err != nil {
logutil.BgLogger().Info("BackfillData failed to Delete", zap.String("key", w.oldKeys[i].String()), zap.Error(err))
return errors.Trace(err)
}
// Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions!
// Generate new _tidb_rowid.
stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
if stmtCtx.BaseRowID >= stmtCtx.MaxRowID {
ids := uint64(max(1, w.batchCnt-len(w.rowRecords)))
// Keep using the original table's allocator
stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = tables.AllocHandleIDs(w.ctx, w.sessCtx, w.reorgedTbl, ids)
if err != nil {
return errors.Trace(err)
}
}
recordID, err := tables.AllocHandle(w.ctx, w.sessCtx, w.reorgedTbl)
if err != nil {
return errors.Trace(err)
}

// tablecodec.prefixLen is not exported, but is just TableSplitKeyLen + 2
key = tablecodec.EncodeRecordKey(key[:tablecodec.TableSplitKeyLen+2], recordID)
}
logutil.BgLogger().Info("BackfillData set", zap.String("key", key.String()))
err = txn.Set(key, prr.vals)
if err != nil {
return errors.Trace(err)
}
taskCtx.addedCount++
}
return nil
})
logutil.BgLogger().Info("BackfillData err", zap.Error(errInTxn))
logSlowOperations(time.Since(oprStartTime), "BackfillData", 3000)

return
}

func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) ([]*rowRecord, kv.Key, bool, error) {
// RowVal is one row collected by fetchRowColVals for BackfillData to write.
// It is exported here to be able to have a failpoint callback for triggering
// testing of concurrent DMLs (the collected rows are passed to that callback).
type RowVal struct {
// key is the record key the row is to be written under. fetchRowColVals
// builds it from the target partition's table prefix plus the remainder of
// the original record key; BackfillData may re-encode it with a newly
// allocated handle before writing (non-clustered tables).
key kv.Key
// vals is the raw row value as read from the source, written unchanged.
vals []byte
}

func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reorgBackfillTask) (kv.Key, bool, error) {
w.rowRecords = w.rowRecords[:0]
w.oldKeys = w.oldKeys[:0]
startTime := time.Now()

isClustered := w.reorgedTbl.Meta().IsCommonHandle || w.reorgedTbl.Meta().PKIsHandle
// taskDone means that the added handle is out of taskRange.endHandle.
taskDone := false
sysTZ := w.sessCtx.GetSessionVars().StmtCtx.TimeZone()
Expand All @@ -3257,8 +3322,6 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
return false, nil
}

// TODO: Extend for normal tables
// TODO: Extend for REMOVE PARTITIONING
_, err := w.rowDecoder.DecodeTheExistedColumnMap(w.sessCtx, handle, rawRow, sysTZ, w.rowMap)
if err != nil {
return false, errors.Trace(err)
Expand All @@ -3276,34 +3339,14 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
if err != nil {
return false, errors.Trace(err)
}
var newKey kv.Key
if w.reorgedTbl.Meta().PKIsHandle || w.reorgedTbl.Meta().IsCommonHandle {
pid := p.GetPhysicalID()
newKey = tablecodec.EncodeTablePrefix(pid)
newKey = append(newKey, recordKey[len(newKey):]...)
} else {
// Non-clustered table / not unique _tidb_rowid for the whole table
// Generate new _tidb_rowid if exists.
// Due to EXCHANGE PARTITION, the existing _tidb_rowid may collide between partitions!
stmtCtx := w.sessCtx.GetSessionVars().StmtCtx
if stmtCtx.BaseRowID >= stmtCtx.MaxRowID {
// TODO: Which autoid allocator to use?
ids := uint64(max(1, w.batchCnt-len(w.rowRecords)))
// Keep using the original table's allocator
stmtCtx.BaseRowID, stmtCtx.MaxRowID, err = tables.AllocHandleIDs(w.ctx, w.sessCtx, w.reorgedTbl, ids)
if err != nil {
return false, errors.Trace(err)
}
}
recordID, err := tables.AllocHandle(w.ctx, w.sessCtx, w.reorgedTbl)
if err != nil {
return false, errors.Trace(err)
}
newKey = tablecodec.EncodeRecordKey(p.RecordPrefix(), recordID)
newKey := tablecodec.EncodeTablePrefix(p.GetPhysicalID())
newKey = append(newKey, recordKey[tablecodec.TableSplitKeyLen:]...)
w.rowRecords = append(w.rowRecords, &RowVal{key: newKey, vals: rawRow})
if !isClustered {
oldKey := newKey[:tablecodec.TableSplitKeyLen]
oldKey = append(oldKey, recordKey[tablecodec.TableSplitKeyLen:]...)
w.oldKeys = append(w.oldKeys, oldKey)
}
w.rowRecords = append(w.rowRecords, &rowRecord{
key: newKey, vals: rawRow,
})

w.cleanRowMap()
lastAccessedHandle = recordKey
Expand All @@ -3318,8 +3361,8 @@ func (w *reorgPartitionWorker) fetchRowColVals(txn kv.Transaction, taskRange reo
taskDone = true
}

logutil.BgLogger().Debug("txn fetches handle info", zap.String("category", "ddl"), zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)))
return w.rowRecords, getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
logutil.BgLogger().Debug("txn fetches handle info", zap.String("category", "ddl"), zap.Uint64("txnStartTS", txn.StartTS()), zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)), zap.Error(err))
return getNextHandleKey(taskRange, taskDone, lastAccessedHandle), taskDone, errors.Trace(err)
}

func (w *reorgPartitionWorker) cleanRowMap() {
Expand All @@ -3341,17 +3384,20 @@ func (w *reorgPartitionWorker) GetCtx() *backfillCtx {
}

func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) error {
// First copy all table data to the new partitions
// First copy all table data to the new AddingDefinitions partitions
// from each of the DroppingDefinitions partitions.
// Then create all indexes on the AddingDefinitions partitions
// for each new index, one partition at a time.
// Then create all indexes on the AddingDefinitions partitions,

// Copy the data from the DroppingDefinitions to the AddingDefinitions
if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
err := w.updatePhysicalTableRow(t, reorgInfo)
if err != nil {
return errors.Trace(err)
}
if len(reorgInfo.elements) <= 1 {
// No indexes to (re)create, all done!
return nil
}
}

failpoint.Inject("reorgPartitionAfterDataCopy", func(val failpoint.Value) {
Expand All @@ -3361,32 +3407,11 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
}
})

// Rewrite this to do all indexes at once in addTableIndex
// instead of calling it once per index (meaning reading the table multiple times)
// But for now, try to understand how it works...
firstNewPartitionID := t.Meta().Partition.AddingDefinitions[0].ID
startElementOffset := 0
//startElementOffsetToResetHandle := -1
// This backfill job starts with backfilling index data, whose index ID is currElement.ID.
if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) {
// First run, have not yet started backfilling index data
// Restart with the first new partition.
// TODO: handle remove partitioning
reorgInfo.PhysicalTableID = firstNewPartitionID
} else {
// The job was interrupted and has been restarted,
// reset and start from where it was done
for i, element := range reorgInfo.elements[1:] {
if reorgInfo.currElement.ID == element.ID {
startElementOffset = i
//startElementOffsetToResetHandle = i
break
}
}
}

for i := startElementOffset; i < len(reorgInfo.elements[1:]); i++ {
// Now build the indexes in the new partitions
// row data has been copied, now proceed with creating the indexes
// on the new AddingDefinitions partitions
reorgInfo.PhysicalTableID = t.Meta().Partition.AddingDefinitions[0].ID
reorgInfo.currElement = reorgInfo.elements[1]
var physTbl table.PhysicalTable
if tbl, ok := t.(table.PartitionedTable); ok {
physTbl = tbl.GetPartition(reorgInfo.PhysicalTableID)
Expand All @@ -3399,10 +3424,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
if err != nil {
return errors.Trace(err)
}
// TODO: Can we improve this in case of a crash?
// like where the regInfo PhysicalTableID and element is the same,
// and the tableid in the key-prefix regInfo.StartKey and regInfo.EndKey matches with PhysicalTableID
// do not change the reorgInfo start/end key
startHandle, endHandle, err := getTableRange(reorgInfo.NewJobContext(), reorgInfo.d, physTbl, currentVer.Ver, reorgInfo.Job.Priority)
if err != nil {
return errors.Trace(err)
Expand All @@ -3411,8 +3432,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
// Always (re)start with the full PhysicalTable range
reorgInfo.StartKey, reorgInfo.EndKey = startHandle, endHandle

// Update the element in the reorgInfo for updating the reorg meta below.
reorgInfo.currElement = reorgInfo.elements[i+1]
// Write the reorg info to store so the whole reorganize process can recover from panic.
err = reorgInfo.UpdateReorgMeta(reorgInfo.StartKey, w.sessPool)
logutil.BgLogger().Info("update column and indexes", zap.String("category", "ddl"),
Expand All @@ -3429,7 +3448,6 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo)
if err != nil {
return errors.Trace(err)
}
reorgInfo.PhysicalTableID = firstNewPartitionID
}
failpoint.Inject("reorgPartitionAfterIndex", func(val failpoint.Value) {
//nolint:forcetypeassert
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/tests/partition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ go_test(
srcs = [
"db_partition_test.go",
"main_test.go",
"multi_domain_test.go",
],
flaky = True,
shard_count = 49,
shard_count = 50,
deps = [
"//pkg/config",
"//pkg/ddl",
Expand Down
Loading