diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index e9e14da010ae5..3eec8b0ca38de 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -256,7 +256,6 @@ go_test( "placement_policy_test.go", "placement_sql_test.go", "primary_key_handle_test.go", - "reorg_partition_test.go", "repair_table_test.go", "restart_test.go", "rollingback_test.go", @@ -343,7 +342,6 @@ go_test( "//pkg/util/domainutil", "//pkg/util/gcutil", "//pkg/util/generic", - "//pkg/util/mathutil", "//pkg/util/mock", "//pkg/util/sem", "//pkg/util/sqlexec", diff --git a/pkg/ddl/index.go b/pkg/ddl/index.go index e262bd507fd2d..7d898b0477b7b 100644 --- a/pkg/ddl/index.go +++ b/pkg/ddl/index.go @@ -1560,6 +1560,14 @@ func (w *addIndexTxnWorker) checkHandleExists(idxInfo *model.IndexInfo, key kv.K if hasBeenBackFilled { return nil } + if idxInfo.Global { + // 'handle' comes from reading directly from a partition, without partition id, + // so we can only compare the handle part of the key. + if ph, ok := h.(kv.PartitionHandle); ok && ph.Handle.Equal(handle) { + // table row has been back-filled already, OK to add the index entry + return nil + } + } return ddlutil.GenKeyExistsErr(key, value, idxInfo, tblInfo) } diff --git a/pkg/ddl/partition.go b/pkg/ddl/partition.go index badee48f99c9d..d2011a22eb1f5 100644 --- a/pkg/ddl/partition.go +++ b/pkg/ddl/partition.go @@ -2177,22 +2177,33 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, t *meta.Meta, job *mod job.State = model.JobStateCancelled return ver, err } - // ALTER TABLE ... PARTITION BY - if partInfo.Type != pmodel.PartitionTypeNone { - // Also remove anything with the new table id + if job.Type == model.ActionAlterTablePartitioning { + // ALTER TABLE t PARTITION BY ... creates an additional + // Table ID + // Note, for REMOVE PARTITIONING it is the same as for + // the single partition, which will be changed into the table. physicalTableIDs = append(physicalTableIDs, partInfo.NewTableID) - // Reset if it was normal table before - if tblInfo.Partition.Type == pmodel.PartitionTypeNone || - tblInfo.Partition.DDLType == pmodel.PartitionTypeNone { - tblInfo.Partition = nil - } else { - tblInfo.Partition.ClearReorgIntermediateInfo() + } + + var dropIndices []*model.IndexInfo + for _, indexInfo := range tblInfo.Indices { + if indexInfo.Unique && + indexInfo.State == model.StateDeleteReorganization && + tblInfo.Partition.DDLState == model.StateDeleteReorganization { + dropIndices = append(dropIndices, indexInfo) } + } + for _, indexInfo := range dropIndices { + DropIndexColumnFlag(tblInfo, indexInfo) + RemoveDependentHiddenColumns(tblInfo, indexInfo) + removeIndexInfo(tblInfo, indexInfo) + } + + if tblInfo.Partition.Type == pmodel.PartitionTypeNone { + tblInfo.Partition = nil } else { - // REMOVE PARTITIONING tblInfo.Partition.ClearReorgIntermediateInfo() } - ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) @@ -3138,6 +3149,8 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo // All global indexes must be recreated, we cannot update them in-place, since we must have // both old and new set of partition ids in the unique index at the same time! + // We also need to recreate and switch between non-global unique indexes and global indexes, + // in case a new PARTITION BY changes whether all partition columns are included or not.
for _, index := range tblInfo.Indices { newGlobal := getNewGlobal(partInfo, index) if job.Type == model.ActionRemovePartitioning { @@ -3191,6 +3204,12 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo } } } + failpoint.Inject("reorgPartCancel1", func(val failpoint.Value) { + if val.(bool) { + job.State = model.JobStateCancelled + failpoint.Return(ver, errors.New("Injected error by reorgPartCancel1")) + } + }) // From now on we cannot just cancel the DDL, we must roll back if changesMade! changesMade := false if tblInfo.TiFlashReplica != nil { @@ -3210,7 +3229,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo job.State = model.JobStateCancelled return ver, errors.Trace(err) } - return convertAddTablePartitionJob2RollbackJob(jobCtx, t, job, err, tblInfo) + return rollbackReorganizePartitionWithErr(jobCtx, t, job, err) } if len(bundles) > 0 { @@ -3219,7 +3238,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo job.State = model.JobStateCancelled return ver, errors.Wrapf(err, "failed to notify PD the placement rules") } - return convertAddTablePartitionJob2RollbackJob(jobCtx, t, job, err, tblInfo) + return rollbackReorganizePartitionWithErr(jobCtx, t, job, err) } changesMade = true } @@ -3235,7 +3254,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo job.State = model.JobStateCancelled return ver, err } - return convertAddTablePartitionJob2RollbackJob(jobCtx, t, job, err, tblInfo) + return rollbackReorganizePartitionWithErr(jobCtx, t, job, err) } // Doing the preSplitAndScatter here, since all checks are completed, @@ -3246,6 +3265,10 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo splitPartitionTableRegion(w.sess.Context, s, tblInfo, partInfo.Definitions, variable.ScatterTable) } + if job.Type == model.ActionReorganizePartition { + tblInfo.Partition.SetOriginalPartitionIDs() + } + // Assume we cannot have more than MaxUint64 rows, set the progress to 1/10 of that. metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64)) job.SchemaState = model.StateDeleteOnly @@ -3254,6 +3277,12 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo if err != nil { return ver, errors.Trace(err) } + failpoint.Inject("reorgPartRollback1", func(val failpoint.Value) { + if val.(bool) { + err = errors.New("Injected error by reorgPartRollback1") + failpoint.Return(rollbackReorganizePartitionWithErr(jobCtx, t, job, err)) + } + }) // Is really both StateDeleteOnly AND StateWriteOnly needed? // If transaction A in WriteOnly inserts row 1 (into both new and old partition set) @@ -3282,9 +3311,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo count := tblInfo.TiFlashReplica.Count needRetry, err := checkPartitionReplica(count, addingDefinitions, jobCtx) if err != nil { - // need to rollback, since we tried to register the new - // partitions before! - return convertAddTablePartitionJob2RollbackJob(jobCtx, t, job, err, tblInfo) + return rollbackReorganizePartitionWithErr(jobCtx, t, job, err) } if needRetry { // The new added partition hasn't been replicated. 
@@ -3308,6 +3335,12 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo } tblInfo.Partition.DDLState = model.StateWriteOnly metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.2 / float64(math.MaxUint64)) + failpoint.Inject("reorgPartRollback2", func(val failpoint.Value) { + if val.(bool) { + err = errors.New("Injected error by reorgPartRollback2") + failpoint.Return(rollbackReorganizePartitionWithErr(jobCtx, t, job, err)) + } + }) ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true) job.SchemaState = model.StateWriteOnly case model.StateWriteOnly: @@ -3329,6 +3362,19 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo if err2 != nil { return ver, errors.Trace(err2) } + failpoint.Inject("reorgPartFail1", func(val failpoint.Value) { + // Failures will retry, then do rollback + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + failpoint.Return(ver, errors.New("Injected error by reorgPartFail1")) + } + }) + failpoint.Inject("reorgPartRollback3", func(val failpoint.Value) { + if val.(bool) { + err = errors.New("Injected error by reorgPartRollback3") + failpoint.Return(rollbackReorganizePartitionWithErr(jobCtx, t, job, err)) + } + }) var done bool done, ver, err = doPartitionReorgWork(w, jobCtx, t, job, tbl, physicalTableIDs) @@ -3336,6 +3382,13 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo return ver, err } + failpoint.Inject("reorgPartRollback4", func(val failpoint.Value) { + if val.(bool) { + err = errors.New("Injected error by reorgPartRollback4") + failpoint.Return(rollbackReorganizePartitionWithErr(jobCtx, t, job, err)) + } + }) + for _, index := range tblInfo.Indices { if !index.Unique { continue @@ -3351,8 +3404,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo } else { inAllPartitionColumns, err := checkPartitionKeysConstraint(partInfo, index.Columns, tblInfo) if err != nil { - job.State = model.JobStateCancelled - return ver, errors.Trace(err) + return rollbackReorganizePartitionWithErr(jobCtx, t, job, err) } if !inAllPartitionColumns { // Mark the old unique index as non-readable, and to be dropped, @@ -3363,11 +3415,6 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo } } firstPartIdx, lastPartIdx, idMap, err2 := getReplacedPartitionIDs(partNames, tblInfo.Partition) - failpoint.Inject("reorgPartWriteReorgReplacedPartIDsFail", func(val failpoint.Value) { - if val.(bool) { - err2 = errors.New("Injected error by reorgPartWriteReorgReplacedPartIDsFail") - } - }) if err2 != nil { return ver, err2 } @@ -3383,6 +3430,13 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo tblInfo.Partition.Columns, tblInfo.Partition.DDLColumns = tblInfo.Partition.DDLColumns, tblInfo.Partition.Columns } + failpoint.Inject("reorgPartFail2", func(val failpoint.Value) { + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + failpoint.Return(ver, errors.New("Injected error by reorgPartFail2")) + } + }) + // Now all the data copying is done, but we cannot simply remove the droppingDefinitions // since they are a part of the normal Definitions that other nodes with // the current schema version. 
So we need to double write for one more schema version @@ -3410,6 +3464,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo tblInfo.Partition.DroppingDefinitions = nil tblInfo.Partition.AddingDefinitions = nil tblInfo.Partition.DDLState = model.StateNone + tblInfo.Partition.OriginalPartitionIDsOrder = nil var dropIndices []*model.IndexInfo for _, indexInfo := range tblInfo.Indices { @@ -3424,6 +3479,12 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo for _, indexInfo := range dropIndices { removeIndexInfo(tblInfo, indexInfo) } + failpoint.Inject("reorgPartFail3", func(val failpoint.Value) { + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + failpoint.Return(ver, errors.New("Injected error by reorgPartFail3")) + } + }) var oldTblID int64 if job.Type != model.ActionReorganizePartition { // ALTER TABLE ... PARTITION BY @@ -3437,12 +3498,10 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo // Check if they are carried over in ApplyDiff?!? autoIDs, err := t.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Get() if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } err = t.DropTableOrView(job.SchemaID, tblInfo.ID) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } tblInfo.ID = partInfo.NewTableID @@ -3457,25 +3516,29 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *mo // ALTER TABLE ... PARTITION BY tblInfo.Partition.ClearReorgIntermediateInfo() } + failpoint.Inject("reorgPartFail4", func(val failpoint.Value) { + if val.(bool) { + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + failpoint.Return(ver, errors.New("Injected error by reorgPartFail4")) + } + }) err = t.GetAutoIDAccessors(job.SchemaID, tblInfo.ID).Put(autoIDs) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } - // TODO: Add failpoint here? err = t.CreateTableOrView(job.SchemaID, tblInfo) if err != nil { - job.State = model.JobStateCancelled return ver, errors.Trace(err) } } - job.CtxVars = []any{physicalTableIDs, newIDs} - ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true) - failpoint.Inject("reorgPartWriteReorgSchemaVersionUpdateFail", func(val failpoint.Value) { + failpoint.Inject("reorgPartFail5", func(val failpoint.Value) { if val.(bool) { - err = errors.New("Injected error by reorgPartWriteReorgSchemaVersionUpdateFail") + job.ErrorCount += variable.GetDDLErrorCountLimit() / 2 + failpoint.Return(ver, errors.New("Injected error by reorgPartFail5")) } }) + job.CtxVars = []any{physicalTableIDs, newIDs} + ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) } @@ -3590,8 +3653,7 @@ func doPartitionReorgWork(w *worker, jobCtx *jobContext, t *meta.Meta, job *mode zap.Stringer("job", job), zap.Error(err1)) } logutil.DDLLogger().Warn("reorg partition job failed, convert job to rollback", zap.Stringer("job", job), zap.Error(err)) - // TODO: rollback new global indexes! TODO: How to handle new index ids? 
- ver, err = convertAddTablePartitionJob2RollbackJob(jobCtx, t, job, err, tbl.Meta()) + ver, err = rollbackReorganizePartitionWithErr(jobCtx, t, job, err) return false, ver, errors.Trace(err) } return true, ver, err @@ -3895,42 +3957,42 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) } } if len(reorgInfo.elements) == 0 { - // No global indexes - return nil + reorgInfo.PhysicalTableID = 0 + } + if reorgInfo.PhysicalTableID != 0 { + reorgInfo.currElement = reorgInfo.elements[0] + pid := pi.Definitions[0].ID + if _, err = findNextPartitionID(pid, pi.DroppingDefinitions); err == nil { + // Skip all dropped partitions + pid, err = findNextNonTouchedPartitionID(pid, pi) + if err != nil { + return errors.Trace(err) + } + } + // if pid == 0 => All partitions will be dropped, nothing more to add to global indexes. + reorgInfo.PhysicalTableID = pid } - reorgInfo.currElement = reorgInfo.elements[0] - pid := pi.Definitions[0].ID - if _, err = findNextPartitionID(pid, pi.DroppingDefinitions); err == nil { - // Skip all dropped partitions - pid, err = findNextNonTouchedPartitionID(pid, pi) + if reorgInfo.PhysicalTableID != 0 { + var physTbl table.PhysicalTable + if tbl, ok := t.(table.PartitionedTable); ok { + physTbl = tbl.GetPartition(reorgInfo.PhysicalTableID) + } else if tbl, ok := t.(table.PhysicalTable); ok { + // This may be used when partitioning a non-partitioned table + physTbl = tbl + } + // Get the original start handle and end handle. + currentVer, err := getValidCurrentVersion(reorgInfo.jobCtx.store) + if err != nil { + return errors.Trace(err) + } + startHandle, endHandle, err := getTableRange(reorgInfo.NewJobContext(), reorgInfo.jobCtx.store, physTbl, currentVer.Ver, reorgInfo.Job.Priority) if err != nil { return errors.Trace(err) } - } - if pid == 0 { - // All partitions will be dropped, nothing more to add to global indexes. - return nil - } - reorgInfo.PhysicalTableID = pid - var physTbl table.PhysicalTable - if tbl, ok := t.(table.PartitionedTable); ok { - physTbl = tbl.GetPartition(reorgInfo.PhysicalTableID) - } else if tbl, ok := t.(table.PhysicalTable); ok { - // This may be used when partitioning a non-partitioned table - physTbl = tbl - } - // Get the original start handle and end handle. - currentVer, err := getValidCurrentVersion(reorgInfo.jobCtx.store) - if err != nil { - return errors.Trace(err) - } - startHandle, endHandle, err := getTableRange(reorgInfo.NewJobContext(), reorgInfo.jobCtx.store, physTbl, currentVer.Ver, reorgInfo.Job.Priority) - if err != nil { - return errors.Trace(err) - } - // Always (re)start with the full PhysicalTable range - reorgInfo.StartKey, reorgInfo.EndKey = startHandle, endHandle + // Always (re)start with the full PhysicalTable range + reorgInfo.StartKey, reorgInfo.EndKey = startHandle, endHandle + } // Write the reorg info to store so the whole reorganize process can recover from panic. 
err = reorgInfo.UpdateReorgMeta(reorgInfo.StartKey, w.sessPool) @@ -3938,14 +4000,26 @@ func (w *worker) reorgPartitionDataAndIndex(t table.Table, reorgInfo *reorgInfo) zap.Int64("jobID", reorgInfo.Job.ID), zap.ByteString("elementType", reorgInfo.currElement.TypeKey), zap.Int64("elementID", reorgInfo.currElement.ID), - zap.Int64("partitionTableId", physTbl.GetPhysicalID()), zap.String("startHandle", hex.EncodeToString(reorgInfo.StartKey)), zap.String("endHandle", hex.EncodeToString(reorgInfo.EndKey))) if err != nil { return errors.Trace(err) } } - return w.addTableIndex(t, reorgInfo) + if _, err = findNextNonTouchedPartitionID(reorgInfo.PhysicalTableID, pi); err == nil { + err = w.addTableIndex(t, reorgInfo) + if err != nil { + return errors.Trace(err) + } + reorgInfo.PhysicalTableID = 0 + err = reorgInfo.UpdateReorgMeta(reorgInfo.StartKey, w.sessPool) + logutil.DDLLogger().Info("Non touched partitions done", + zap.Int64("jobID", reorgInfo.Job.ID), zap.Error(err)) + if err != nil { + return errors.Trace(err) + } + } + return nil } func bundlesForExchangeTablePartition(t *meta.Meta, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) { diff --git a/pkg/ddl/rollingback.go b/pkg/ddl/rollingback.go index 67d4995dd798a..8f95b64cfbf0f 100644 --- a/pkg/ddl/rollingback.go +++ b/pkg/ddl/rollingback.go @@ -338,15 +338,163 @@ func convertAddTablePartitionJob2RollbackJob(jobCtx *jobContext, t *meta.Meta, j if err != nil { return ver, errors.Trace(err) } - if job.Type == model.ActionReorganizePartition || - job.Type == model.ActionAlterTablePartitioning || - job.Type == model.ActionRemovePartitioning { - args.PartNames = partNames - job.FillArgs(args) - } else { - args.PartNames = partNames - model.FillRollbackArgsForAddPartition(job, args) + args.PartNames = partNames + model.FillRollbackArgsForAddPartition(job, args) + /* + _, err = job.Encode(true) + if err != nil { + return ver, errors.Trace(err) + } + + */ + ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true) + if err != nil { + return ver, errors.Trace(err) + } + job.State = model.JobStateRollingback + return ver, errors.Trace(otherwiseErr) +} + +func rollbackReorganizePartitionWithErr(jobCtx *jobContext, t *meta.Meta, job *model.Job, otherwiseErr error) (ver int64, err error) { + if job.SchemaState == model.StateNone { + job.State = model.JobStateCancelled + return ver, otherwiseErr } + + tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID) + if err != nil { + return ver, errors.Trace(err) + } + + // addingDefinitions is also in tblInfo, here pass the tblInfo as parameter directly. + return convertReorgPartitionJob2RollbackJob(jobCtx, t, job, otherwiseErr, tblInfo) +} + +func convertReorgPartitionJob2RollbackJob(jobCtx *jobContext, t *meta.Meta, job *model.Job, otherwiseErr error, tblInfo *model.TableInfo) (ver int64, err error) { + pi := tblInfo.Partition + addingDefinitions := pi.AddingDefinitions + partNames := make([]string, 0, len(addingDefinitions)) + for _, pd := range addingDefinitions { + partNames = append(partNames, pd.Name.L) + } + var dropIndices []*model.IndexInfo + // When a Global Index is duplicated to a non-Global one, we later need + // to know if it was Global before (marked to be dropped) or not.
+ globalToUniqueDupMap := make(map[string]int64) + for _, indexInfo := range tblInfo.Indices { + if !indexInfo.Unique { + continue + } + switch indexInfo.State { + case model.StateWriteReorganization, model.StateDeleteOnly, + model.StateWriteOnly: + dropIndices = append(dropIndices, indexInfo) + case model.StateDeleteReorganization: + if pi.DDLState != model.StateDeleteReorganization { + continue + } + // Old index marked to be dropped, rollback by making it public again + indexInfo.State = model.StatePublic + if indexInfo.Global { + if id, ok := globalToUniqueDupMap[indexInfo.Name.L]; ok { + return ver, errors.NewNoStackErrorf("Duplicate global index names '%s', %d != %d", indexInfo.Name.O, indexInfo.ID, id) + } + globalToUniqueDupMap[indexInfo.Name.L] = indexInfo.ID + } + case model.StatePublic: + if pi.DDLState != model.StateDeleteReorganization { + continue + } + // We cannot drop the index here, we need to wait until + // the next schema version + // i.e. rollback in onDropTablePartition + // New index that became public in this state, + // mark it to be dropped in next schema version + if indexInfo.Global { + indexInfo.State = model.StateDeleteReorganization + } else { + // How to know if this index was created as a duplicate or not? + if id, ok := globalToUniqueDupMap[indexInfo.Name.L]; ok { + // The original index + if id >= indexInfo.ID { + return ver, errors.NewNoStackErrorf("Indexes in wrong order during rollback, '%s', %d >= %d", indexInfo.Name.O, id, indexInfo.ID) + } + indexInfo.State = model.StateDeleteReorganization + } else { + globalToUniqueDupMap[indexInfo.Name.L] = indexInfo.ID + } + } + } + } + for _, indexInfo := range dropIndices { + DropIndexColumnFlag(tblInfo, indexInfo) + RemoveDependentHiddenColumns(tblInfo, indexInfo) + removeIndexInfo(tblInfo, indexInfo) + } + if pi.DDLState == model.StateDeleteReorganization { + // New partitions are public, + // but old is still double written. + // OK to revert. + // Remove the AddingDefinitions + // Add back the DroppingDefinitions + if job.Type == model.ActionReorganizePartition { + // Reassemble the list of partitions in the OriginalPartitionIDsOrder + // Special handling, since for LIST partitioning, + // only pi.OriginalPartitionIDsOrder shows how to merge back the DroppingDefinitions. + // Implicitly it will also filter away AddingPartitions. + // pi.Definitions and pi.DroppingDefinitions contain the original partitions + // in the original order, but where the DroppingDefinitions should be placed, + // can only be known through pi.OriginalPartitionIDsOrder. + // RANGE/HASH/KEY would have consecutive added/dropped partitions, but use + // the same code to avoid confusion. 
+ defPos := 0 + dropPos := 0 + newDefs := make([]model.PartitionDefinition, 0, len(pi.OriginalPartitionIDsOrder)) + for _, id := range pi.OriginalPartitionIDsOrder { + if defPos < len(pi.Definitions) && pi.Definitions[defPos].ID == id { + newDefs = append(newDefs, pi.Definitions[defPos]) + defPos++ + continue + } + if dropPos < len(pi.DroppingDefinitions) && id == pi.DroppingDefinitions[dropPos].ID { + newDefs = append(newDefs, pi.DroppingDefinitions[dropPos]) + dropPos++ + continue + } + for { + defPos++ + if defPos < len(pi.Definitions) && pi.Definitions[defPos].ID == id { + newDefs = append(newDefs, pi.Definitions[defPos]) + break + } + } + } + if len(newDefs) != len(pi.OriginalPartitionIDsOrder) { + return ver, errors.Trace(errors.New("Internal error, failed to find original partition definitions")) + } + pi.Definitions = newDefs + pi.Num = uint64(len(pi.Definitions)) + } else { + pi.Type, pi.DDLType = pi.DDLType, pi.Type + pi.Expr, pi.DDLExpr = pi.DDLExpr, pi.Expr + pi.Columns, pi.DDLColumns = pi.DDLColumns, pi.Columns + pi.Definitions = pi.DroppingDefinitions + } + } + + args, err := model.GetTablePartitionArgs(job) + if err != nil { + return ver, errors.Trace(err) + } + args.PartNames = partNames + job.FillArgs(args) + /* + _, err = job.Encode(true) + if err != nil { + return ver, errors.Trace(err) + } + + */ ver, err = updateVersionAndTableInfo(jobCtx, t, job, tblInfo, true) if err != nil { return ver, errors.Trace(err) @@ -442,22 +590,6 @@ func rollingbackTruncateTable(t *meta.Meta, job *model.Job) (ver int64, err erro return cancelOnlyNotHandledJob(job, model.StateNone) } -func rollingbackReorganizePartition(jobCtx *jobContext, t *meta.Meta, job *model.Job) (ver int64, err error) { - if job.SchemaState == model.StateNone { - job.State = model.JobStateCancelled - return ver, dbterror.ErrCancelledDDLJob - } - - tblInfo, err := GetTableInfoAndCancelFaultJob(t, job, job.SchemaID) - if err != nil { - return ver, errors.Trace(err) - } - - // addingDefinitions is also in tblInfo, here pass the tblInfo as parameter directly. - // TODO: Test this with reorganize partition p1 into (partition p1 ...)! 
- return convertAddTablePartitionJob2RollbackJob(jobCtx, t, job, dbterror.ErrCancelledDDLJob, tblInfo) -} - func pauseReorgWorkers(w *worker, d *ddlCtx, job *model.Job) (err error) { if needNotifyAndStopReorgWorker(job) { w.jobLogger(job).Info("pausing the DDL job", zap.String("job", job.String())) @@ -479,7 +611,7 @@ func convertJob2RollbackJob(w *worker, jobCtx *jobContext, t *meta.Meta, job *mo ver, err = rollingbackAddTablePartition(jobCtx, t, job) case model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning: - ver, err = rollingbackReorganizePartition(jobCtx, t, job) + ver, err = rollbackReorganizePartitionWithErr(jobCtx, t, job, dbterror.ErrCancelledDDLJob) case model.ActionDropColumn: ver, err = rollingbackDropColumn(jobCtx, t, job) case model.ActionDropIndex, model.ActionDropPrimaryKey: diff --git a/pkg/ddl/tests/partition/BUILD.bazel b/pkg/ddl/tests/partition/BUILD.bazel index 9721f14bd2223..e4df09b2b22f0 100644 --- a/pkg/ddl/tests/partition/BUILD.bazel +++ b/pkg/ddl/tests/partition/BUILD.bazel @@ -7,6 +7,7 @@ go_test( "db_partition_test.go", "main_test.go", "placement_test.go", + "reorg_partition_test.go", ], flaky = True, shard_count = 50, @@ -38,6 +39,7 @@ go_test( "//pkg/types", "//pkg/util/codec", "//pkg/util/dbterror", + "//pkg/util/mathutil", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_stretchr_testify//assert", diff --git a/pkg/ddl/reorg_partition_test.go b/pkg/ddl/tests/partition/reorg_partition_test.go similarity index 67% rename from pkg/ddl/reorg_partition_test.go rename to pkg/ddl/tests/partition/reorg_partition_test.go index d386528c08c78..ca5f8e07e3493 100644 --- a/pkg/ddl/reorg_partition_test.go +++ b/pkg/ddl/tests/partition/reorg_partition_test.go @@ -12,16 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -package ddl_test +package partition import ( - "bytes" "context" "encoding/hex" "fmt" + "strconv" "testing" - "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/logutil" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/errno" @@ -32,6 +32,7 @@ import ( "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/testkit" + "github.com/pingcap/tidb/pkg/testkit/external" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/mathutil" @@ -51,8 +52,8 @@ type allTableData struct { // Checks that there are no accessible data after an existing table // assumes that tableIDs are only increasing. // To be used during failure testing of ALTER, to make sure cleanup is done. 
-func noNewTablesAfter(t *testing.T, tk *testkit.TestKit, ctx sessionctx.Context, tbl table.Table) { - waitForGC := tk.MustQuery(`select start_key, end_key from mysql.gc_delete_range`).Rows() +func noNewTablesAfter(t *testing.T, tk *testkit.TestKit, ctx sessionctx.Context, tbl table.Table, msg string) { + waitForGC := tk.MustQuery(`select start_key, end_key from mysql.gc_delete_range union all select start_key, end_key from mysql.gc_delete_range_done`).Rows() require.NoError(t, sessiontxn.NewTxn(context.Background(), ctx)) txn, err := ctx.Txn(true) require.NoError(t, err) @@ -73,27 +74,43 @@ func noNewTablesAfter(t *testing.T, tk *testkit.TestKit, ctx sessionctx.Context, prefix := tablecodec.EncodeTablePrefix(tblID + 1) it, err := txn.Iter(prefix, nil) require.NoError(t, err) + for _, rowGC := range waitForGC { + logutil.DDLLogger().Info("GC", + zap.String("start", fmt.Sprintf("%v", rowGC[0])), + zap.String("end", fmt.Sprintf("%v", rowGC[1]))) + } ROW: for it.Valid() { for _, rowGC := range waitForGC { // OK if queued for range delete / GC - hexString := fmt.Sprintf("%v", rowGC[0]) - start, err := hex.DecodeString(hexString) - require.NoError(t, err) - hexString = fmt.Sprintf("%v", rowGC[1]) - end, err := hex.DecodeString(hexString) + startHex := fmt.Sprintf("%v", rowGC[0]) + endHex := fmt.Sprintf("%v", rowGC[1]) + end, err := hex.DecodeString(endHex) require.NoError(t, err) - if bytes.Compare(start, it.Key()) >= 0 && bytes.Compare(it.Key(), end) < 0 { + keyHex := hex.EncodeToString(it.Key()) + if startHex <= keyHex && keyHex < endHex { it.Close() it, err = txn.Iter(end, nil) require.NoError(t, err) continue ROW } + logutil.DDLLogger().Info("not found in GC", + zap.String("key", keyHex), + zap.String("start", startHex), + zap.String("end", endHex)) } foundTblID := tablecodec.DecodeTableID(it.Key()) // There are internal table ids starting from MaxInt48 -1 and allocating decreasing ids // Allow 0xFF of them, See JobTableID, ReorgTableID, HistoryTableID, MDLTableID - require.False(t, it.Key()[0] == 't' && foundTblID < 0xFFFFFFFFFF00, "Found table data after highest physical Table ID %d < %d", tblID, foundTblID) + if it.Key()[0] == 't' && foundTblID < 0xFFFFFFFFFF00 { + is := sessiontxn.GetTxnManager(tk.Session()).GetTxnInfoSchema() + tbl, found := is.TableByID(context.Background(), foundTblID) + tblmsg := " Table ID no longer maps to a table" + if found { + tblmsg = fmt.Sprintf(" Table name: %s", tbl.Meta().Name.O) + } + require.False(t, true, "Found table data after highest physical Table ID %d < %d (%s) "+msg+tblmsg, tblID, foundTblID, it.Key()) + } break } } @@ -142,6 +159,264 @@ func getAllDataForPhysicalTable(t *testing.T, ctx sessionctx.Context, physTable return all } +func TestReorgPartitionFailures(t *testing.T) { + create := `create table t (a int unsigned PRIMARY KEY, b varchar(255), c int, key (b), key (c,b))` + + ` partition by range (a) ` + + `(partition p0 values less than (10),` + + ` partition p1 values less than (20),` + + ` partition p2 values less than (30),` + + ` partition pMax values less than (MAXVALUE))` + alter := "alter table t reorganize partition p1,p2 into (partition p1 values less than (17), partition p1b values less than (24), partition p2 values less than (30))" + beforeDML := []string{ + `insert into t values (1,"1",1),(2,"2",2),(12,"12",21),(13,"13",13),(17,"17",17),(18,"18",18),(23,"23",32),(34,"34",43),(45,"45",54),(56,"56",65)`, + `update t set a = 11, b = "11", c = 11 where a = 17`, + `update t set b = "21", c = 12 where c = 12`, + `delete from t where a 
= 13`, + `delete from t where b = "56"`, + } + beforeResult := testkit.Rows( + "1 1 1", "11 11 11", "12 12 21", "18 18 18", "2 2 2", "23 23 32", "34 34 43", "45 45 54", + ) + afterDML := []string{ + `insert into t values (5,"5",5),(13,"13",13)`, + `update t set a = 17, b = "17", c = 17 where a = 11`, + `update t set b = "12", c = 21 where c = 12`, + `delete from t where a = 34`, + `delete from t where b = "56"`, + } + afterResult := testkit.Rows( + "1 1 1", "12 12 21", "13 13 13", "17 17 17", "18 18 18", "2 2 2", "23 23 32", "45 45 54", "5 5 5", + ) + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") +} + +func TestRemovePartitionFailures(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered, b int not null, c varchar(255)) partition by range(a) ( + partition p0 values less than (100), + partition p1 values less than (200))` + alter := `alter table t remove partitioning` + beforeDML := []string{ + `insert into t values (1,1,1),(2,2,2),(3,3,3),(101,101,101),(102,102,102),(103,103,103)`, + `update t set a = 11, b = "11", c = 11 where a = 1`, + `update t set b = "12", c = 12 where b = 2`, + `delete from t where a = 102`, + `delete from t where b = 103`, + } + beforeResult := testkit.Rows("101 101 101", "11 11 11", "2 12 12", "3 3 3") + afterDML := []string{ + `insert into t values (4,4,4),(5,5,5),(104,104,104)`, + `update t set a = 1, b = 1, c = 1 where a = 11`, + `update t set b = 2, c = 2 where c = 12`, + `update t set a = 9, b = 9 where a = 104`, + `delete from t where a = 5`, + `delete from t where b = 102`, + } + afterResult := testkit.Rows("1 1 1", "101 101 101", "2 2 2", "3 3 3", "4 4 4", "9 9 104") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") +} + +func TestPartitionByFailures(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered, b int not null, c varchar(255)) partition by range(a) ( + partition p0 values less than (100), + partition p1 values less than (200))` + alter := "alter table t partition by range (b) (partition pNoneC values less than (150), partition p2 values less than (300)) update indexes (`primary` global)" + beforeDML := []string{ + `insert into t values (1,1,1),(2,2,2),(3,3,3),(101,101,101),(102,102,102),(103,103,103)`, + `update t set a = 11, b = "11", c = 11 where a = 1`, + `update t set b = "12", c = 12 where b = 2`, + `delete from t where a = 102`, + `delete from t where b = 103`, + } + beforeResult := testkit.Rows("101 101 101", "11 11 11", "2 12 12", "3 3 3") + afterDML := []string{ + `insert into t values (4,4,4),(5,5,5),(104,104,104)`, + `update t set a = 1, b = 1, c = 1 where a = 11`, + `update t set b = 2, c = 2 where c = 12`, + `update t set a = 9, b = 9 where a = 104`, + `delete from t where a = 5`, + `delete from t where b = 102`, + } + afterResult := testkit.Rows("1 1 1", "101 101 101", "2 2 2", "3 3 3", "4 4 4", "9 9 104") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) +} + +func TestReorganizePartitionListFailures(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered global, b int not null, c varchar(255), unique index (c) global) partition by list(b) ( + partition p0 values in (1,2,3), + partition p1 values in (4,5,6), + partition p2 values in (7,8,9))` + alter := `alter table t reorganize partition p0,p2 into (partition pNone1 values in (1,9), partition pNone2 values in (2,8), partition 
pNone3 values in (3,7))` + beforeDML := []string{ + `insert into t values (1,1,1),(2,2,2),(4,4,4),(8,8,8),(9,9,9),(6,6,6)`, + `update t set a = 7, b = 7, c = 7 where a = 1`, + `update t set b = 3, c = 3 where c = 4`, + `delete from t where a = 8`, + `delete from t where b = 2`, + } + beforeResult := testkit.Rows("4 3 3", "6 6 6", "7 7 7", "9 9 9") + afterDML := []string{ + `insert into t values (1,1,1),(5,5,5),(8,8,8)`, + `update t set a = 2, b = 2, c = 2 where a = 1`, + `update t set a = 1, b = 1, c = 1 where c = 6`, + `update t set a = 6, b = 6 where a = 9`, + `delete from t where a = 5`, + `delete from t where b = 3`, + } + afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 9", "7 7 7", "8 8 8") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") +} + +func TestPartitionByListFailures(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered global, b int not null, c varchar(255), unique index (b), unique index (c) global) partition by list(b) ( + partition p0 values in (1,2,3,4,5,6), + partition p1 values in (11,10,9,8,7))` + alter := `alter table t partition by list columns (c) (partition pNone1 values in (1,11,3,5,7,9), partition pNone2 values in (2,4,8,10,6)) update indexes (b global, c local)` + beforeDML := []string{ + `insert into t values (1,1,1),(2,2,2),(4,4,4),(8,8,8),(9,9,9),(6,6,6)`, + `update t set a = 7, b = 7, c = 7 where a = 1`, + `update t set b = 3, c = 3 where c = "4"`, + `delete from t where a = 8`, + `delete from t where b = 2`, + } + beforeResult := testkit.Rows("4 3 3", "6 6 6", "7 7 7", "9 9 9") + afterDML := []string{ + `insert into t values (1,1,1),(5,5,5),(8,8,8)`, + `update t set a = 2, b = 2, c = 2 where a = 1`, + `update t set a = 1, b = 1, c = 1 where c = "6"`, + `update t set a = 6, b = 6 where a = 9`, + `delete from t where a = 5`, + `delete from t where b = 3`, + } + afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 9", "7 7 7", "8 8 8") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult) +} + +func TestAddHashPartitionFailures(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered global, b int not null, c varchar(255), unique index (c) global) partition by hash(b) partitions 3` + alter := `alter table t add partition partitions 2` + beforeDML := []string{ + `insert into t values (1,1,1),(2,2,2),(4,4,4),(8,8,8),(9,9,9),(6,6,6)`, + `update t set a = 7, b = 7, c = 7 where a = 1`, + `update t set b = 3, c = 3 where c = "4"`, + `delete from t where a = 8`, + `delete from t where b = 2`, + } + beforeResult := testkit.Rows("4 3 3", "6 6 6", "7 7 7", "9 9 9") + afterDML := []string{ + `insert into t values (1,1,1),(5,5,5),(8,8,8)`, + `update t set a = 2, b = 2, c = 2 where a = 1`, + `update t set a = 1, b = 1, c = 1 where c = "6"`, + `update t set a = 6, b = 6 where a = 9`, + `delete from t where a = 5`, + `delete from t where b = 3`, + } + afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 9", "7 7 7", "8 8 8") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") +} + +func TestCoalesceKeyPartitionFailures(t *testing.T) { + create := `create table t (a int unsigned primary key nonclustered global, b int not null, c varchar(255), unique index (b) global, unique index (c)) partition by key(c) partitions 5` + alter := `alter table t coalesce partition 2` + beforeDML := []string{ + `insert into t values 
(1,1,1),(2,2,2),(4,4,4),(8,8,8),(9,9,9),(6,6,6)`, + `update t set a = 7, b = 7, c = 7 where a = 1`, + `update t set b = 3, c = 3 where c = "4"`, + `delete from t where a = 8`, + `delete from t where b = 2`, + } + beforeResult := testkit.Rows("4 3 3", "6 6 6", "7 7 7", "9 9 9") + afterDML := []string{ + `insert into t values (1,1,1),(5,5,5),(8,8,8)`, + `update t set a = 2, b = 2, c = 2 where a = 1`, + `update t set a = 1, b = 1, c = 1 where c = "6"`, + `update t set a = 6, b = 6 where a = 9`, + `delete from t where a = 5`, + `delete from t where b = 3`, + } + afterResult := testkit.Rows("1 1 1", "2 2 2", "6 6 9", "7 7 7", "8 8 8") + testReorganizePartitionFailures(t, create, alter, beforeDML, beforeResult, afterDML, afterResult, "Fail4") +} + +func testReorganizePartitionFailures(t *testing.T, createSQL, alterSQL string, beforeDML []string, beforeResult [][]any, afterDML []string, afterResult [][]any, skipTests ...string) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set tidb_enable_global_index=true") + defer func() { + tk.MustExec("set tidb_enable_global_index=default") + }() + // Fail means we simply inject an error, and set the error count very high to see what happens + // we do expect to do best effort rollback here as well! + // Cancel means we set job.State = JobStateCancelled, as in no need to do more + // Rollback means we do full rollback before returning error. + tests := []struct { + name string + count int + }{ + { + "Cancel", + 1, + }, + { + "Fail", + 5, + }, + { + "Rollback", + 4, + }, + } + oldWaitTimeWhenErrorOccurred := ddl.WaitTimeWhenErrorOccurred + defer func() { + ddl.WaitTimeWhenErrorOccurred = oldWaitTimeWhenErrorOccurred + }() + ddl.WaitTimeWhenErrorOccurred = 0 + for _, test := range tests { + SUBTEST: + for i := 1; i <= test.count; i++ { + suffix := test.name + strconv.Itoa(i) + for _, skip := range skipTests { + if suffix == skip { + continue SUBTEST + } + } + tk.MustExec(createSQL) + for _, sql := range beforeDML { + tk.MustExec(sql + ` /* ` + suffix + ` */`) + } + tk.MustQuery(`select * from t /* ` + suffix + ` */`).Sort().Check(beforeResult) + tOrg := external.GetTableByName(t, tk, "test", "t") + idxID := tOrg.Meta().Indices[0].ID + oldCreate := tk.MustQuery(`show create table t`).Rows() + name := "github.com/pingcap/tidb/pkg/ddl/reorgPart" + suffix + testfailpoint.Enable(t, name, `return(true)`) + err := tk.ExecToErr(alterSQL) + require.Error(t, err, "failpoint reorgPart"+suffix) + require.ErrorContains(t, err, "Injected error by reorgPart"+suffix) + testfailpoint.Disable(t, name) + tk.MustQuery(`show create table t /* ` + suffix + ` */`).Check(oldCreate) + tt := external.GetTableByName(t, tk, "test", "t") + partition := tt.Meta().Partition + require.Equal(t, len(tOrg.Meta().Partition.Definitions), len(partition.Definitions), suffix) + require.Equal(t, 0, len(partition.AddingDefinitions), suffix) + require.Equal(t, 0, len(partition.DroppingDefinitions), suffix) + require.Equal(t, len(tOrg.Meta().Indices), len(tt.Meta().Indices), suffix) + require.Equal(t, idxID, tt.Meta().Indices[0].ID, suffix) + noNewTablesAfter(t, tk, tk.Session(), tOrg, suffix) + tk.MustExec(`admin check table t /* ` + suffix + ` */`) + for _, sql := range afterDML { + tk.MustExec(sql + " /* " + suffix + " */") + } + tk.MustQuery(`select * from t /* ` + suffix + ` */`).Sort().Check(afterResult) + tk.MustExec(`drop table t /* ` + suffix + ` */`) + // TODO: Check TiFlash replicas + // TODO: Check Label rules + // TODO: 
Check bundles + // TODO: Check autoIDs + } + } +} + func TestReorgPartitionConcurrent(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -525,19 +800,18 @@ func TestReorgPartitionRollback(t *testing.T) { // TODO: Check that there are no additional placement rules, // bundles, or ranges with non-completed tableIDs // (partitions used during reorg, but was dropped) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockUpdateVersionAndTableInfoErr", `return(true)`)) + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/mockUpdateVersionAndTableInfoErr", `return(true)`) tk.MustExecToErr("alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))") tk.MustExec(`admin check table t`) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockUpdateVersionAndTableInfoErr")) + testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/mockUpdateVersionAndTableInfoErr") ctx := tk.Session() is := domain.GetDomain(ctx).InfoSchema() tbl, err := is.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) require.NoError(t, err) - noNewTablesAfter(t, tk, ctx, tbl) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/reorgPartitionAfterDataCopy", `return(true)`)) + noNewTablesAfter(t, tk, ctx, tbl, "Reorganize rollback") + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/reorgPartitionAfterDataCopy", `return(true)`) defer func() { - err := failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/reorgPartitionAfterDataCopy") - require.NoError(t, err) + testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/reorgPartitionAfterDataCopy") }() tk.MustExecToErr("alter table t reorganize partition p1 into (partition p1a values less than (15), partition p1b values less than (20))") tk.MustExec(`admin check table t`) @@ -557,7 +831,7 @@ func TestReorgPartitionRollback(t *testing.T) { tbl, err = is.TableByName(context.Background(), pmodel.NewCIStr(schemaName), pmodel.NewCIStr("t")) require.NoError(t, err) - noNewTablesAfter(t, tk, ctx, tbl) + noNewTablesAfter(t, tk, ctx, tbl, "Reorganize rollback") } func TestPartitionByColumnChecks(t *testing.T) { diff --git a/pkg/meta/model/table.go b/pkg/meta/model/table.go index 6e6e417863f9d..970a03e4d5164 100644 --- a/pkg/meta/model/table.go +++ b/pkg/meta/model/table.go @@ -709,7 +709,10 @@ type PartitionInfo struct { // DroppingDefinitions is filled when dropping/truncating partitions that is in the mid state. DroppingDefinitions []PartitionDefinition `json:"dropping_definitions"` // NewPartitionIDs is filled when truncating partitions that is in the mid state. - NewPartitionIDs []int64 + NewPartitionIDs []int64 `json:"new_partition_ids,omitempty"` + // OriginalPartitionIDsOrder is only needed for rollback of Reorganize Partition for + // LIST partitions, since in StateDeleteReorganize we don't know the old order any longer. + OriginalPartitionIDsOrder []int64 `json:"original_partition_ids_order,omitempty"` States []PartitionState `json:"states"` Num uint64 `json:"num"` @@ -854,6 +857,16 @@ func (pi *PartitionInfo) GetPartitionIDByName(partitionDefinitionName string) in return -1 } +// SetOriginalPartitionIDs sets the order of the original partition IDs +// in case it needs to be rolled back. LIST Partitioning would not know otherwise. 
+func (pi *PartitionInfo) SetOriginalPartitionIDs() { + ids := make([]int64, 0, len(pi.Definitions)) + for _, def := range pi.Definitions { + ids = append(ids, def.ID) + } + pi.OriginalPartitionIDsOrder = ids +} + // PartitionState is the state of the partition. type PartitionState struct { ID int64 `json:"id"`
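For reference, a minimal standalone sketch (not part of the patch) of the idea behind OriginalPartitionIDsOrder: the pre-reorg partition ID order is recorded so that, on rollback, the DroppingDefinitions can be merged back into their original positions while newly added partitions are skipped. The sketch below uses a plain map lookup instead of the two-pointer walk in convertReorgPartitionJob2RollbackJob, and the names partDef and rebuildOriginalOrder are hypothetical stand-ins, not TiDB APIs.

package main

import "fmt"

// partDef is a stand-in for model.PartitionDefinition with only the
// fields the reassembly needs.
type partDef struct {
	ID   int64
	Name string
}

// rebuildOriginalOrder merges the still-present original definitions
// (curDefs, which may also contain newly added partitions) with the
// definitions being dropped (dropDefs), following the ID order recorded
// before the reorganization started (originalOrder). Added partitions
// are skipped implicitly because their IDs never appear in originalOrder.
func rebuildOriginalOrder(curDefs, dropDefs []partDef, originalOrder []int64) ([]partDef, error) {
	byID := make(map[int64]partDef, len(curDefs)+len(dropDefs))
	for _, d := range curDefs {
		byID[d.ID] = d
	}
	for _, d := range dropDefs {
		byID[d.ID] = d
	}
	out := make([]partDef, 0, len(originalOrder))
	for _, id := range originalOrder {
		d, ok := byID[id]
		if !ok {
			return nil, fmt.Errorf("original partition id %d not found", id)
		}
		out = append(out, d)
	}
	return out, nil
}

func main() {
	// p1 (id 2) and p2 (id 3) were reorganized into p1a/p1b (ids 10, 11);
	// rolling back must restore the order p0, p1, p2, pMax.
	cur := []partDef{{1, "p0"}, {10, "p1a"}, {11, "p1b"}, {4, "pMax"}}
	dropping := []partDef{{2, "p1"}, {3, "p2"}}
	original := []int64{1, 2, 3, 4}
	defs, err := rebuildOriginalOrder(cur, dropping, original)
	fmt.Println(defs, err) // [{1 p0} {2 p1} {3 p2} {4 pMax}] <nil>
}

RANGE/HASH/KEY reorganizations touch a consecutive run of partitions, so the order could be recomputed there, but recording the full ID order keeps LIST and the other partitioning types on the same rollback path.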