Skip to content

Commit

Permalink
ddl: Reorganize partition supporting global index (#53277)
Browse files Browse the repository at this point in the history
ref #45133
  • Loading branch information
mjonss authored Jun 4, 2024
1 parent 8277203 commit be86a25
Show file tree
Hide file tree
Showing 11 changed files with 776 additions and 96 deletions.
26 changes: 19 additions & 7 deletions pkg/ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -4649,18 +4649,27 @@ func (d *ddl) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Ident, sp
return err
}
if !ck {
if ctx.GetSessionVars().EnableGlobalIndex {
return dbterror.ErrCancelledDDLJob.GenWithStack("global index is not supported yet for alter table partitioning")
indexTp := ""
if !ctx.GetSessionVars().EnableGlobalIndex {
if index.Primary {
indexTp = "PRIMARY KEY"
} else {
indexTp = "UNIQUE INDEX"
}
} else if t.Meta().IsCommonHandle {
indexTp = "CLUSTERED INDEX"
}
indexTp := "UNIQUE INDEX"
if index.Primary {
indexTp = "PRIMARY"
if indexTp != "" {
return dbterror.ErrUniqueKeyNeedAllFieldsInPf.GenWithStackByArgs(indexTp)
}
return dbterror.ErrUniqueKeyNeedAllFieldsInPf.GenWithStackByArgs(indexTp)
// Also mark the unique index as global index
index.Global = true
}
}
}
if newMeta.PKIsHandle {
// This case is covers when the Handle is the PK (only ints), since it would not
// have an entry in the tblInfo.Indices
indexCols := []*model.IndexColumn{{
Name: newMeta.GetPkName(),
Length: types.UnspecifiedLength,
Expand All @@ -4670,7 +4679,10 @@ func (d *ddl) AlterTablePartitioning(ctx sessionctx.Context, ident ast.Ident, sp
return err
}
if !ck {
return dbterror.ErrUniqueKeyNeedAllFieldsInPf.GenWithStackByArgs("PRIMARY")
if !ctx.GetSessionVars().EnableGlobalIndex {
return dbterror.ErrUniqueKeyNeedAllFieldsInPf.GenWithStackByArgs("PRIMARY KEY")
}
return dbterror.ErrUniqueKeyNeedAllFieldsInPf.GenWithStackByArgs("CLUSTERED INDEX")
}
}

Expand Down
8 changes: 2 additions & 6 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,10 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
model.ActionReorganizePartition, model.ActionRemovePartitioning,
model.ActionAlterTablePartitioning:
var physicalTableIDs []int64
// partInfo is not used, but is set in ReorgPartition.
// Better to have an additional argument in job.DecodeArgs since it is ignored,
// instead of having one to few, which will remove the data from the job arguments...
var partInfo model.PartitionInfo
if err := job.DecodeArgs(&physicalTableIDs, &partInfo); err != nil {
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
return errors.Trace(err)
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, physicalTableIDs, ea, "drop partition: physical table ID(s)"))
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, physicalTableIDs, ea, "reorganize/drop partition: physical table ID(s)"))
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
allIndexIDs := make([]int64, 1)
Expand Down
55 changes: 47 additions & 8 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1903,6 +1903,8 @@ func (w *addIndexTxnWorker) BackfillData(handleRange reorgBackfillTask) (taskCtx

// We need to add this lock to make sure pessimistic transaction can realize this operation.
// For the normal pessimistic transaction, it's ok. But if async commit is used, it may lead to inconsistent data and index.
// TODO: For global index, lock the correct key?! Currently it locks the partition (phyTblID) and the handle or actual key?
// but should really lock the table's ID + key col(s)
err := txn.LockKeys(context.Background(), new(kv.LockCtx), idxRecord.key)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -2292,16 +2294,35 @@ func getNextPartitionInfo(reorg *reorgInfo, t table.PartitionedTable, currPhysic
return 0, nil, nil, nil
}

// During data copying, copy data from partitions to be dropped
nextPartitionDefs := pi.DroppingDefinitions
// This will be used in multiple different scenarios/ALTER TABLE:
// ADD INDEX - no change in partitions, just use pi.Definitions (1)
// REORGANIZE PARTITION - copy data from partitions to be dropped (2)
// REORGANIZE PARTITION - (re)create indexes on partitions to be added (3)
// REORGANIZE PARTITION - Update new Global indexes with data from non-touched partitions (4)
// (i.e. pi.Definitions - pi.DroppingDefinitions)
var pid int64
var err error
if bytes.Equal(reorg.currElement.TypeKey, meta.IndexElementKey) {
// During index re-creation, process data from partitions to be added
nextPartitionDefs = pi.AddingDefinitions
}
if len(nextPartitionDefs) == 0 {
nextPartitionDefs = pi.Definitions
// case 1, 3 or 4
if len(pi.AddingDefinitions) == 0 {
// case 1
// Simply AddIndex, without any partitions added or dropped!
pid, err = findNextPartitionID(currPhysicalTableID, pi.Definitions)
} else {
// case 3 (or if not found AddingDefinitions; 4)
// check if recreating Global Index (during Reorg Partition)
pid, err = findNextPartitionID(currPhysicalTableID, pi.AddingDefinitions)
if err != nil {
// case 4
// Not a partition in the AddingDefinitions, so it must be an existing
// non-touched partition, i.e. recreating Global Index for the non-touched partitions
pid, err = findNextNonTouchedPartitionID(currPhysicalTableID, pi)
}
}
} else {
// case 2
pid, err = findNextPartitionID(currPhysicalTableID, pi.DroppingDefinitions)
}
pid, err := findNextPartitionID(currPhysicalTableID, nextPartitionDefs)
if err != nil {
// Fatal error, should not run here.
logutil.DDLLogger().Error("find next partition ID failed", zap.Reflect("table", t), zap.Error(err))
Expand Down Expand Up @@ -2382,6 +2403,24 @@ func findNextPartitionID(currentPartition int64, defs []model.PartitionDefinitio
return 0, errors.Errorf("partition id not found %d", currentPartition)
}

func findNextNonTouchedPartitionID(currPartitionID int64, pi *model.PartitionInfo) (int64, error) {
pid, err := findNextPartitionID(currPartitionID, pi.Definitions)
if err != nil {
return 0, err
}
if pid == 0 {
return 0, nil
}
for _, notFoundErr := findNextPartitionID(pid, pi.DroppingDefinitions); notFoundErr == nil; {
// This can be optimized, but it is not frequently called, so keeping as-is
pid, err = findNextPartitionID(pid, pi.Definitions)
if pid == 0 {
break
}
}
return pid, err
}

// AllocateIndexID allocates an index ID from TableInfo.
func AllocateIndexID(tblInfo *model.TableInfo) int64 {
tblInfo.MaxIndexID++
Expand Down
Loading

0 comments on commit be86a25

Please sign in to comment.