*: Allow Point_Get during DDL with Global Index (#56382) (#56849)
ref #45133, ref #55819, close #56819
ti-chi-bot authored Oct 27, 2024
1 parent 27e78ae commit 2511e92
Showing 6 changed files with 147 additions and 47 deletions.
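What this means for queries (editor's illustration, not part of the diff): previously the planner refused to convert a global-index lookup into Point_Get while partition DDL was in flight, so plans degraded to IndexRangeScan plus a Selection on partition IDs. This commit removes that planner guard (see pkg/planner/core/find_best_task.go below) and instead teaches PointGetExecutor to skip index entries that point into partitions hidden by the ongoing DDL (see pkg/executor/point_get.go and the new PartitionInfo.IDsInDDLToIgnore). A minimal testkit-style sketch of the new expectation, borrowing test_global and idx_b from TestTruncatePartitionWithGlobalIndex below:

// During TRUNCATE PARTITION on test_global (which has global index idx_b):
tk.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).
	CheckContain("Point_Get") // was: IndexRangeScan + Selection while the DDL ran
tk.MustQuery(`select b from test_global use index(idx_b) where b = 15`).
	Check(testkit.Rows()) // rows of the truncated partition stay invisible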
50 changes: 31 additions & 19 deletions pkg/ddl/partition.go
@@ -136,6 +136,8 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in

// move the adding definition into tableInfo.
updateAddingPartitionInfo(partInfo, tblInfo)
tblInfo.Partition.DDLState = model.StateReplicaOnly
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
@@ -222,6 +224,8 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in

preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, addingDefinitions)

tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
@@ -2244,9 +2248,6 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
}

var physicalTableIDs []int64
// In order to skip maintaining the state check in partitionDefinition, TiDB uses droppingDefinition instead of a state field.
// So here we use `job.SchemaState` to judge what stage this job is at.
originalState := job.SchemaState
switch job.SchemaState {
case model.StatePublic:
// Here we mark the partitions to be dropped, so they are not read or written
@@ -2260,11 +2261,11 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
originalDefs := tblInfo.Partition.Definitions
physicalTableIDs = updateDroppingPartitionInfo(tblInfo, partNames)
tblInfo.Partition.Definitions = originalDefs
tblInfo.Partition.DDLState = model.StateWriteOnly
tblInfo.Partition.DDLAction = model.ActionDropTablePartition

job.SchemaState = model.StateWriteOnly
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
tblInfo.Partition.DDLState = job.SchemaState
tblInfo.Partition.DDLAction = job.Type

ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
// Since the previous state does not use the dropping partitions,
// we can now actually remove them, allowing writes into the overlapping range
@@ -2308,16 +2309,16 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
return ver, err
}

tblInfo.Partition.DDLState = model.StateDeleteOnly
job.SchemaState = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteOnly:
// This state is not a real 'DeleteOnly' state, because TiDB does not maintain the state check in partitionDefinition.
// Insert this state to confirm all servers cannot see the old partitions when reorg is running,
// so that no new data will be inserted into old partitions when reorganizing.
tblInfo.Partition.DDLState = model.StateDeleteReorganization
job.SchemaState = model.StateDeleteReorganization
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteReorganization:
oldTblInfo := getTableInfoWithDroppingPartitions(tblInfo)
physicalTableIDs = getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions)
@@ -2375,7 +2376,8 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
}
droppedDefs := tblInfo.Partition.DroppingDefinitions
tblInfo.Partition.DroppingDefinitions = nil
tblInfo.Partition.DDLState = model.StateNone
job.SchemaState = model.StateNone
tblInfo.Partition.DDLState = job.SchemaState
tblInfo.Partition.DDLAction = model.ActionNone
// used by ApplyDiff in updateSchemaVersion
job.CtxVars = []any{physicalTableIDs} // TODO remove it.
@@ -2511,14 +2513,16 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
pi.DroppingDefinitions = truncatingDefinitions
pi.NewPartitionIDs = newIDs[:]

tblInfo.Partition.DDLAction = model.ActionTruncateTablePartition
job.SchemaState = model.StateDeleteOnly
pi.DDLState = job.SchemaState
pi.DDLAction = job.Type
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteOnly:
// This state is not a real 'DeleteOnly' state, because TiDB does not maintain the state check in partitionDefinition.
// Insert this state to confirm all servers cannot see the old partitions when reorg is running,
// so that no new data will be inserted into old partitions when reorganizing.
job.SchemaState = model.StateDeleteReorganization
pi.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteReorganization:
// Step2: clear global index rows.
@@ -2609,6 +2613,8 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
// Step4: clear DroppingDefinitions and finish job.
tblInfo.Partition.DroppingDefinitions = nil
tblInfo.Partition.NewPartitionIDs = nil
tblInfo.Partition.DDLAction = model.ActionNone
tblInfo.Partition.DDLState = model.StateNone

preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newPartitions)

@@ -2816,6 +2822,8 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, job *model.Job) (v
// into the table using the schema version
// before the exchange is made.
job.SchemaState = model.StateWriteOnly
pt.Partition.DDLState = job.SchemaState
pt.Partition.DDLAction = job.Type
return updateVersionAndTableInfoWithCheck(jobCtx, job, nt, true, ptInfo...)
}
// From now on, nt (the non-partitioned table) has
@@ -2890,6 +2898,8 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, job *model.Job) (v
originalPartitionDef := partDef.Clone()
originalNt := nt.Clone()
partDef.ID, nt.ID = nt.ID, partDef.ID
pt.Partition.DDLState = model.StateNone
pt.Partition.DDLAction = model.ActionNone

err = metaMut.UpdateTable(ptSchemaID, pt)
if err != nil {
@@ -3324,7 +3334,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// Assume we cannot have more than MaxUint64 rows, set the progress to 1/10 of that.
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
job.SchemaState = model.StateDeleteOnly
tblInfo.Partition.DDLState = model.StateDeleteOnly
tblInfo.Partition.DDLState = job.SchemaState
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
@@ -3394,8 +3404,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
failpoint.Return(rollbackReorganizePartitionWithErr(jobCtx, job, err))
}
})
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
job.SchemaState = model.StateWriteOnly
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
// Insert this state to confirm all servers can see the new partitions when reorg is running,
// so that new data will be updated in both old and new partitions when reorganizing.
@@ -3405,10 +3416,10 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
tblInfo.Indices[i].State = model.StateWriteReorganization
}
}
tblInfo.Partition.DDLState = model.StateWriteReorganization
job.SchemaState = model.StateWriteReorganization
tblInfo.Partition.DDLState = job.SchemaState
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.3 / float64(math.MaxUint64))
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
job.SchemaState = model.StateWriteReorganization
case model.StateWriteReorganization:
physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions)
tbl, err2 := getTable(jobCtx.getAutoIDRequirement(), job.SchemaID, tblInfo)
@@ -3493,9 +3504,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// Now all the data copying is done, but we cannot simply remove the droppingDefinitions
// since they are a part of the normal Definitions that other nodes with
// the current schema version still use. So we need to double write for one more schema version
tblInfo.Partition.DDLState = model.StateDeleteReorganization
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
job.SchemaState = model.StateDeleteReorganization
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)

case model.StateDeleteReorganization:
// Drop the droppingDefinitions and finish the DDL
@@ -3517,6 +3528,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
tblInfo.Partition.DroppingDefinitions = nil
tblInfo.Partition.AddingDefinitions = nil
tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
tblInfo.Partition.OriginalPartitionIDsOrder = nil

var dropIndices []*model.IndexInfo
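The common thread in the hunks above: whenever the worker advances job.SchemaState, it now mirrors that state and the job type onto tblInfo.Partition, and resets both when the job finishes. A condensed sketch of the pattern (one transition, surrounding worker context elided):

// Advance one DDL state and publish it on the partition info (sketch).
job.SchemaState = model.StateDeleteOnly
tblInfo.Partition.DDLState = job.SchemaState // readers can tell a DDL is in flight
tblInfo.Partition.DDLAction = job.Type       // ...and which DDL it is
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)

// On the final state, both fields are cleared again:
tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone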
6 changes: 2 additions & 4 deletions pkg/ddl/tests/partition/db_partition_test.go
@@ -1433,10 +1433,8 @@ func TestTruncatePartitionWithGlobalIndex(t *testing.T) {
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec(`begin`)
tk3.MustExec(`use test`)
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("IndexRangeScan")
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("Selection")
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("IndexRangeScan")
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("Selection")
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("Point_Get")
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("Point_Get")
tk3.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows())
tk3.MustQuery(`select c from test_global use index(idx_c) where c = 15`).Check(testkit.Rows())
// Here it will fail with
22 changes: 22 additions & 0 deletions pkg/ddl/tests/partition/multi_domain_test.go
@@ -27,6 +27,28 @@ import (
"go.uber.org/zap"
)

func TestMultiSchemaReorganizePartitionIssue56819(t *testing.T) {
createSQL := `create table t (a int primary key, b varchar(255), unique index idx_b_global (b) global) partition by range (a) (partition p1 values less than (200), partition pMax values less than (maxvalue))`
initFn := func(tkO *testkit.TestKit) {
tkO.MustExec(`insert into t values (1,1),(2,2)`)
}
alterSQL := `alter table t reorganize partition p1 into (partition p0 values less than (100), partition p1 values less than (200))`
loopFn := func(tkO, tkNO *testkit.TestKit) {
res := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`)
schemaState := res.Rows()[0][0].(string)
switch schemaState {
case model.StateDeleteOnly.String():
tkNO.MustExec(`insert into t values (4,4)`)
tkNO.MustQuery(`select * from t where b = "4"`).Sort().Check(testkit.Rows("4 4"))
tkO.MustQuery(`select * from t where b = "4"`).Sort().Check(testkit.Rows("4 4"))
}
}
postFn := func(tkO *testkit.TestKit) {
// nothing
}
runMultiSchemaTest(t, createSQL, alterSQL, initFn, postFn, loopFn)
}

func TestMultiSchemaDropRangePartition(t *testing.T) {
createSQL := `create table t (a int primary key, b varchar(255)) partition by range (a) (partition p0 values less than (100), partition p1 values less than (200))`
initFn := func(tkO *testkit.TestKit) {
8 changes: 7 additions & 1 deletion pkg/executor/point_get.go
@@ -371,9 +371,15 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
tblID = pid
if !matchPartitionNames(tblID, e.partitionNames, e.tblInfo.GetPartitionInfo()) {
pi := e.tblInfo.GetPartitionInfo()
if !matchPartitionNames(tblID, e.partitionNames, pi) {
return nil
}
for _, id := range pi.IDsInDDLToIgnore() {
if id == pid {
return nil
}
}
}
}
}
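Read together, the executor-side guard now looks like this (condensed from the hunk above; pid is the partition ID decoded from the global-index entry):

pi := e.tblInfo.GetPartitionInfo()
if !matchPartitionNames(tblID, e.partitionNames, pi) {
	return nil // an explicit PARTITION (...) clause excludes this partition
}
for _, id := range pi.IDsInDDLToIgnore() {
	if id == pid {
		return nil // the entry points into a partition hidden by in-flight DDL
	}
}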
65 changes: 64 additions & 1 deletion pkg/meta/model/table.go
@@ -1007,6 +1007,69 @@ func (pi *PartitionInfo) SetOriginalPartitionIDs() {
pi.OriginalPartitionIDsOrder = ids
}

// IDsInDDLToIgnore returns a list of IDs that the current
// session should not see (though duplicate-key errors may still surface on insert/update),
// for example during truncate or drop partition.
func (pi *PartitionInfo) IDsInDDLToIgnore() []int64 {
// TODO:
// Truncate partition:
// write only => should not see NewPartitionIDs
// delete only => should not see DroppingPartitions
// Drop partition:
// TODO: Make similar changes as in Truncate Partition:
// Add a state blocking read and write in the partitions to be dropped,
// to avoid situations like https://github.com/pingcap/tidb/issues/55888
// Add partition:
// TODO: Add tests!
// Exchange Partition:
// Currently blocked for GlobalIndex
// Reorganize Partition:
// Nothing, since it will create a new copy of the global index.
// This is because the global index needs to hold entries from two partitions for the same index key
// TODO: Should we extend the GlobalIndex to have multiple partitions?
// Maybe from PK/_tidb_rowid + Partition ID
// to PK/_tidb_rowid + Partition ID + Valid from Schema Version,
// with support for two entries?
// Then we could avoid having two copies of the same Global Index
// just for handling a single SchemaState.
// If so, could we then replace this?
switch pi.DDLAction {
case ActionTruncateTablePartition:
switch pi.DDLState {
case StateWriteOnly:
return pi.NewPartitionIDs
case StateDeleteOnly, StateDeleteReorganization:
if len(pi.DroppingDefinitions) == 0 {
return nil
}
ids := make([]int64, 0, len(pi.DroppingDefinitions))
for _, definition := range pi.DroppingDefinitions {
ids = append(ids, definition.ID)
}
return ids
}
case ActionDropTablePartition:
if len(pi.DroppingDefinitions) > 0 && pi.DDLState == StateDeleteOnly {
ids := make([]int64, 0, len(pi.DroppingDefinitions))
for _, def := range pi.DroppingDefinitions {
ids = append(ids, def.ID)
}
return ids
}
case ActionAddTablePartition:
// TODO: Add tests for ADD PARTITION multi-domain with Global Index!
if len(pi.AddingDefinitions) > 0 {
ids := make([]int64, 0, len(pi.AddingDefinitions))
for _, def := range pi.AddingDefinitions {
ids = append(ids, def.ID)
}
return ids
}
// Not supporting Global Indexes: case ActionExchangeTablePartition
}
return nil
}

// PartitionState is the state of the partition.
type PartitionState struct {
ID int64 `json:"id"`
@@ -1023,7 +1086,7 @@ type PartitionDefinition struct {
Comment string `json:"comment,omitempty"`
}

// Clone clones ConstraintInfo.
// Clone clones PartitionDefinition.
func (ci *PartitionDefinition) Clone() PartitionDefinition {
nci := *ci
nci.LessThan = make([]string, len(ci.LessThan))
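A unit-test-style sketch of the IDsInDDLToIgnore contract (hypothetical, not part of this commit; assumes testify's require): during TRUNCATE PARTITION the not-yet-public replacement partitions are hidden while in StateWriteOnly, and the outgoing ones from StateDeleteOnly onward:

pi := &model.PartitionInfo{
	DDLAction:       model.ActionTruncateTablePartition,
	DDLState:        model.StateWriteOnly,
	NewPartitionIDs: []int64{101, 102},
}
require.Equal(t, []int64{101, 102}, pi.IDsInDDLToIgnore())

pi.DDLState = model.StateDeleteOnly
pi.DroppingDefinitions = []model.PartitionDefinition{{ID: 90}, {ID: 91}}
require.Equal(t, []int64{90, 91}, pi.IDsInDDLToIgnore())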
43 changes: 21 additions & 22 deletions pkg/planner/core/find_best_task.go
@@ -1480,16 +1480,6 @@ func findBestTask4LogicalDataSource(lp base.LogicalPlan, prop *property.Physical
if canConvertPointGet && path.IsIntHandlePath && !ds.Table.Meta().PKIsHandle && len(ds.PartitionNames) != 1 {
canConvertPointGet = false
}
if canConvertPointGet {
if path != nil && path.Index != nil && path.Index.Global {
// Don't convert to point get during ddl
// TODO: Revisit truncate partition and global index
if len(ds.TableInfo.GetPartitionInfo().DroppingDefinitions) > 0 ||
len(ds.TableInfo.GetPartitionInfo().AddingDefinitions) > 0 {
canConvertPointGet = false
}
}
}
}
if canConvertPointGet {
allRangeIsPoint := true
@@ -2278,26 +2268,35 @@ func (is *PhysicalIndexScan) addSelectionConditionForGlobalIndex(p *logicalop.Da
needNot := false
pInfo := p.TableInfo.GetPartitionInfo()
if len(idxArr) == 1 && idxArr[0] == FullRange {
// Only filter adding and dropping partitions.
if len(pInfo.AddingDefinitions) == 0 && len(pInfo.DroppingDefinitions) == 0 {
return conditions, nil
}
// Filter away partitions that may exist in the Global Index,
// but should not be seen.
needNot = true
for _, p := range pInfo.AddingDefinitions {
args = append(args, expression.NewInt64Const(p.ID))
}
for _, p := range pInfo.DroppingDefinitions {
args = append(args, expression.NewInt64Const(p.ID))
for _, id := range pInfo.IDsInDDLToIgnore() {
args = append(args, expression.NewInt64Const(id))
}
} else if len(idxArr) == 0 {
// add an invalid pid as param for `IN` function
// TODO: Can we change to Table Dual somehow?
// Add an invalid pid as param for `IN` function
args = append(args, expression.NewInt64Const(-1))
} else {
// `PartitionPruning` func does not return adding and dropping partitions
// TODO: When PartitionPruning is guaranteed to not
// return old/blocked partition ids then ignoreMap can be removed.
ignoreMap := make(map[int64]struct{})
for _, id := range pInfo.IDsInDDLToIgnore() {
ignoreMap[id] = struct{}{}
}
for _, idx := range idxArr {
args = append(args, expression.NewInt64Const(pInfo.Definitions[idx].ID))
id := pInfo.Definitions[idx].ID
_, ok := ignoreMap[id]
if !ok {
args = append(args, expression.NewInt64Const(id))
}
intest.Assert(!ok, "PartitionPruning returns partitions which should be ignored!")
}
}
if len(args) == 1 {
return conditions, nil
}
condition, err := expression.NewFunction(p.SCtx().GetExprCtx(), ast.In, types.NewFieldType(mysql.TypeLonglong), args...)
if err != nil {
return nil, err
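The Selection assembled here reduces to a membership test on each index row's partition ID: a full-range scan must exclude only the IDs under DDL (needNot), while a pruned scan keeps only the pruned-in, non-ignored IDs. A hypothetical helper, for illustration only (the real code builds an in(...) expression via expression.NewFunction with ast.In; slices is the Go 1.21+ standard-library package):

// visible reports whether a global-index row survives the added Selection.
func visible(rowPID int64, ids []int64, needNot bool) bool {
	in := slices.Contains(ids, rowPID)
	if needNot {
		return !in // full range: hide only partitions under DDL
	}
	return in // pruned: keep only the listed partition IDs
}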
