Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Allow Point_Get during DDL with Global Index #56382

Merged
merged 18 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions pkg/ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in

// move the adding definition into tableInfo.
updateAddingPartitionInfo(partInfo, tblInfo)
tblInfo.Partition.DDLState = model.StateReplicaOnly
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -223,6 +225,8 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in

preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, addingDefinitions)

tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
Expand Down Expand Up @@ -2251,9 +2255,6 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
}

var physicalTableIDs []int64
// In order to skip maintaining the state check in partitionDefinition, TiDB use droppingDefinition instead of state field.
// So here using `job.SchemaState` to judge what the stage of this job is.
originalState := job.SchemaState
switch job.SchemaState {
case model.StatePublic:
// Here we mark the partitions to be dropped, so they are not read or written
Expand All @@ -2267,11 +2268,11 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
originalDefs := tblInfo.Partition.Definitions
physicalTableIDs = updateDroppingPartitionInfo(tblInfo, partNames)
tblInfo.Partition.Definitions = originalDefs
tblInfo.Partition.DDLState = model.StateWriteOnly
tblInfo.Partition.DDLAction = model.ActionDropTablePartition

job.SchemaState = model.StateWriteOnly
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
tblInfo.Partition.DDLState = job.SchemaState
tblInfo.Partition.DDLAction = job.Type

ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
// Since the previous state does not use the dropping partitions,
// we can now actually remove them, allowing to write into the overlapping range
Expand Down Expand Up @@ -2315,16 +2316,16 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
return ver, err
}

tblInfo.Partition.DDLState = model.StateDeleteOnly
job.SchemaState = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteOnly:
// This state is not a real 'DeleteOnly' state, because tidb does not maintain the state check in partitionDefinition.
// Insert this state to confirm all servers can not see the old partitions when reorg is running,
// so that no new data will be inserted into old partitions when reorganizing.
tblInfo.Partition.DDLState = model.StateDeleteReorganization
job.SchemaState = model.StateDeleteReorganization
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteReorganization:
oldTblInfo := getTableInfoWithDroppingPartitions(tblInfo)
physicalTableIDs = getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions)
Expand Down Expand Up @@ -2382,7 +2383,8 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
}
droppedDefs := tblInfo.Partition.DroppingDefinitions
tblInfo.Partition.DroppingDefinitions = nil
tblInfo.Partition.DDLState = model.StateNone
job.SchemaState = model.StateNone
tblInfo.Partition.DDLState = job.SchemaState
tblInfo.Partition.DDLAction = model.ActionNone
// used by ApplyDiff in updateSchemaVersion
args.OldPhysicalTblIDs = physicalTableIDs
Expand Down Expand Up @@ -2525,14 +2527,16 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
pi.DroppingDefinitions = truncatingDefinitions
pi.NewPartitionIDs = newIDs[:]

tblInfo.Partition.DDLAction = model.ActionTruncateTablePartition
job.SchemaState = model.StateDeleteOnly
pi.DDLState = job.SchemaState
pi.DDLAction = job.Type
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteOnly:
// This state is not a real 'DeleteOnly' state, because tidb does not maintain the state check in partitionDefinition.
// Insert this state to confirm all servers can not see the old partitions when reorg is running,
// so that no new data will be inserted into old partitions when reorganizing.
job.SchemaState = model.StateDeleteReorganization
pi.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteReorganization:
// Step2: clear global index rows.
Expand Down Expand Up @@ -2623,6 +2627,8 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
// Step4: clear DroppingDefinitions and finish job.
tblInfo.Partition.DroppingDefinitions = nil
tblInfo.Partition.NewPartitionIDs = nil
tblInfo.Partition.DDLAction = model.ActionNone
tblInfo.Partition.DDLState = model.StateNone

preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newPartitions)

Expand Down Expand Up @@ -2834,6 +2840,8 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, job *model.Job) (v
// into the table using the schema version
// before the exchange is made.
job.SchemaState = model.StateWriteOnly
pt.Partition.DDLState = job.SchemaState
pt.Partition.DDLAction = job.Type
return updateVersionAndTableInfoWithCheck(jobCtx, job, nt, true, ptInfo...)
}
// From now on, nt (the non-partitioned table) has
Expand Down Expand Up @@ -2916,6 +2924,8 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, job *model.Job) (v
originalPartitionDef := partDef.Clone()
originalNt := nt.Clone()
partDef.ID, nt.ID = nt.ID, partDef.ID
pt.Partition.DDLState = model.StateNone
pt.Partition.DDLAction = model.ActionNone

err = metaMut.UpdateTable(ptSchemaID, pt)
if err != nil {
Expand Down Expand Up @@ -3354,7 +3364,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// Assume we cannot have more than MaxUint64 rows, set the progress to 1/10 of that.
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
job.SchemaState = model.StateDeleteOnly
tblInfo.Partition.DDLState = model.StateDeleteOnly
tblInfo.Partition.DDLState = job.SchemaState
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
Expand Down Expand Up @@ -3424,8 +3434,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
failpoint.Return(rollbackReorganizePartitionWithErr(jobCtx, job, err))
}
})
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
job.SchemaState = model.StateWriteOnly
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
// Insert this state to confirm all servers can see the new partitions when reorg is running,
// so that new data will be updated in both old and new partitions when reorganizing.
Expand All @@ -3435,10 +3446,10 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
tblInfo.Indices[i].State = model.StateWriteReorganization
}
}
tblInfo.Partition.DDLState = model.StateWriteReorganization
job.SchemaState = model.StateWriteReorganization
tblInfo.Partition.DDLState = job.SchemaState
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.3 / float64(math.MaxUint64))
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
job.SchemaState = model.StateWriteReorganization
Defined2014 marked this conversation as resolved.
Show resolved Hide resolved
case model.StateWriteReorganization:
physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions)
tbl, err2 := getTable(jobCtx.getAutoIDRequirement(), job.SchemaID, tblInfo)
Expand Down Expand Up @@ -3523,9 +3534,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// Now all the data copying is done, but we cannot simply remove the droppingDefinitions
// since they are a part of the normal Definitions that other nodes with
// the current schema version still use. So we need to double write for one more schema version.
tblInfo.Partition.DDLState = model.StateDeleteReorganization
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
job.SchemaState = model.StateDeleteReorganization
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)

case model.StateDeleteReorganization:
// Drop the droppingDefinitions and finish the DDL
Expand All @@ -3547,6 +3558,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
tblInfo.Partition.DroppingDefinitions = nil
tblInfo.Partition.AddingDefinitions = nil
tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
tblInfo.Partition.OriginalPartitionIDsOrder = nil

var dropIndices []*model.IndexInfo
Expand Down
6 changes: 2 additions & 4 deletions pkg/ddl/tests/partition/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1433,10 +1433,8 @@ func TestTruncatePartitionWithGlobalIndex(t *testing.T) {
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec(`begin`)
tk3.MustExec(`use test`)
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("IndexRangeScan")
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("Selection")
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("IndexRangeScan")
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("Selection")
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("Point_Get")
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("Point_Get")
tk3.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows())
tk3.MustQuery(`select c from test_global use index(idx_c) where c = 15`).Check(testkit.Rows())
// Here it will fail with
Expand Down
22 changes: 22 additions & 0 deletions pkg/ddl/tests/partition/multi_domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,28 @@ import (
"go.uber.org/zap"
)

// TestMultiSchemaReorganizePartitionIssue56819 runs REORGANIZE PARTITION on a
// table with a global unique index on column b and, while the latest DDL job
// reports the delete-only schema state, inserts a row through one session and
// verifies that both sessions can read it back by filtering on b.
// NOTE(review): tkO / tkNO are presumably the DDL-owner and non-owner
// sessions handed in by runMultiSchemaTest — confirm against that helper.
func TestMultiSchemaReorganizePartitionIssue56819(t *testing.T) {
	const (
		createSQL = `create table t (a int primary key, b varchar(255), unique index idx_b_global (b) global) partition by range (a) (partition p1 values less than (200), partition pMax values less than (maxvalue))`
		alterSQL  = `alter table t reorganize partition p1 into (partition p0 values less than (100), partition p1 values less than (200))`
	)

	// Seed two rows before the ALTER starts.
	seedRows := func(tkO *testkit.TestKit) {
		tkO.MustExec(`insert into t values (1,1),(2,2)`)
	}

	// Invoked by runMultiSchemaTest during the DDL; only the delete-only
	// state is exercised here, every other state is a no-op.
	checkDuringDDL := func(tkO, tkNO *testkit.TestKit) {
		jobRows := tkO.MustQuery(`select schema_state from information_schema.DDL_JOBS where table_name = 't' order by job_id desc limit 1`).Rows()
		currentState := jobRows[0][0].(string)
		if currentState != model.StateDeleteOnly.String() {
			return
		}
		tkNO.MustExec(`insert into t values (4,4)`)
		// The freshly inserted row must be visible from both sessions
		// when looked up through column b.
		tkNO.MustQuery(`select * from t where b = "4"`).Sort().Check(testkit.Rows("4 4"))
		tkO.MustQuery(`select * from t where b = "4"`).Sort().Check(testkit.Rows("4 4"))
	}

	// No post-DDL verification is needed for this regression test.
	noopPost := func(tkO *testkit.TestKit) {}

	runMultiSchemaTest(t, createSQL, alterSQL, seedRows, noopPost, checkDuringDDL)
}

func TestMultiSchemaDropRangePartition(t *testing.T) {
createSQL := `create table t (a int primary key, b varchar(255)) partition by range (a) (partition p0 values less than (100), partition p1 values less than (200))`
initFn := func(tkO *testkit.TestKit) {
Expand Down
8 changes: 7 additions & 1 deletion pkg/executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,9 +371,15 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
tblID = pid
if !matchPartitionNames(tblID, e.partitionNames, e.tblInfo.GetPartitionInfo()) {
pi := e.tblInfo.GetPartitionInfo()
if !matchPartitionNames(tblID, e.partitionNames, pi) {
return nil
}
for _, id := range pi.IDsInDDLToIgnore() {
if id == pid {
return nil
}
}
mjonss marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down
65 changes: 64 additions & 1 deletion pkg/meta/model/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,6 +1007,69 @@ func (pi *PartitionInfo) SetOriginalPartitionIDs() {
pi.OriginalPartitionIDsOrder = ids
}

// IDsInDDLToIgnore returns a list of IDs that the current
// session should not see (may be duplicate errors on insert/update though)
// For example during truncate or drop partition.
func (pi *PartitionInfo) IDsInDDLToIgnore() []int64 {
// TODO:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are multiple PRs/enhancements that will build upon this, especially for DROP and TRUNCATE partition.

// Truncate partition:
// write only => should not see NewPartitionIDs
// delete only => should not see DroppingPartitions
// Drop partition:
// TODO: Make similar changes as in Truncate Partition:
// Add a state blocking read and write in the partitions to be dropped,
// to avoid situations like https://github.com/pingcap/tidb/issues/55888
// Add partition:
// TODO: Add tests!
// Exchange Partition:
// Currently blocked for GlobalIndex
// Reorganize Partition:
// Nothing, since it will create a new copy of the global index.
// This is due to the global index needs to have two partitions for the same index key
// TODO: Should we extend the GlobalIndex to have multiple partitions?
// Maybe from PK/_tidb_rowid + Partition ID
// to PK/_tidb_rowid + Partition ID + Valid from Schema Version,
// with support for two entries?
// Then we could avoid having two copies of the same Global Index
// just for handling a single SchemaState.
// If so, could we then replace this?
switch pi.DDLAction {
case ActionTruncateTablePartition:
switch pi.DDLState {
case StateWriteOnly:
return pi.NewPartitionIDs
case StateDeleteOnly, StateDeleteReorganization:
if len(pi.DroppingDefinitions) == 0 {
return nil
}
ids := make([]int64, 0, len(pi.DroppingDefinitions))
for _, definition := range pi.DroppingDefinitions {
ids = append(ids, definition.ID)
}
return ids
}
case ActionDropTablePartition:
if len(pi.DroppingDefinitions) > 0 && pi.DDLState == StateDeleteOnly {
ids := make([]int64, 0, len(pi.DroppingDefinitions))
for _, def := range pi.DroppingDefinitions {
ids = append(ids, def.ID)
}
return ids
}
case ActionAddTablePartition:
// TODO: Add tests for ADD PARTITION multi-domain with Global Index!
if len(pi.AddingDefinitions) > 0 {
ids := make([]int64, 0, len(pi.DroppingDefinitions))
for _, def := range pi.AddingDefinitions {
ids = append(ids, def.ID)
}
return ids
}
// Not supporting Global Indexes: case ActionExchangeTablePartition
}
return nil
}

// PartitionState is the state of the partition.
type PartitionState struct {
ID int64 `json:"id"`
Expand All @@ -1023,7 +1086,7 @@ type PartitionDefinition struct {
Comment string `json:"comment,omitempty"`
}

// Clone clones ConstraintInfo.
// Clone clones PartitionDefinition.
func (ci *PartitionDefinition) Clone() PartitionDefinition {
nci := *ci
nci.LessThan = make([]string, len(ci.LessThan))
Expand Down
43 changes: 21 additions & 22 deletions pkg/planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1480,16 +1480,6 @@ func findBestTask4LogicalDataSource(lp base.LogicalPlan, prop *property.Physical
if canConvertPointGet && path.IsIntHandlePath && !ds.Table.Meta().PKIsHandle && len(ds.PartitionNames) != 1 {
canConvertPointGet = false
}
if canConvertPointGet {
if path != nil && path.Index != nil && path.Index.Global {
// Don't convert to point get during ddl
// TODO: Revisit truncate partition and global index
Defined2014 marked this conversation as resolved.
Show resolved Hide resolved
if len(ds.TableInfo.GetPartitionInfo().DroppingDefinitions) > 0 ||
len(ds.TableInfo.GetPartitionInfo().AddingDefinitions) > 0 {
canConvertPointGet = false
}
}
}
}
if canConvertPointGet {
allRangeIsPoint := true
Expand Down Expand Up @@ -2278,26 +2268,35 @@ func (is *PhysicalIndexScan) addSelectionConditionForGlobalIndex(p *logicalop.Da
needNot := false
pInfo := p.TableInfo.GetPartitionInfo()
if len(idxArr) == 1 && idxArr[0] == FullRange {
// Only filter adding and dropping partitions.
if len(pInfo.AddingDefinitions) == 0 && len(pInfo.DroppingDefinitions) == 0 {
return conditions, nil
}
// Filter away partitions that may exist in Global Index,
// but should not be seen.
needNot = true
for _, p := range pInfo.AddingDefinitions {
args = append(args, expression.NewInt64Const(p.ID))
}
for _, p := range pInfo.DroppingDefinitions {
args = append(args, expression.NewInt64Const(p.ID))
for _, id := range pInfo.IDsInDDLToIgnore() {
args = append(args, expression.NewInt64Const(id))
}
} else if len(idxArr) == 0 {
// add an invalid pid as param for `IN` function
// TODO: Can we change to Table Dual somehow?
// Add an invalid pid as param for `IN` function
args = append(args, expression.NewInt64Const(-1))
} else {
// `PartitionPruning`` func does not return adding and dropping partitions
// TODO: When PartitionPruning is guaranteed to not
// return old/blocked partition ids then ignoreMap can be removed.
mjonss marked this conversation as resolved.
Show resolved Hide resolved
ignoreMap := make(map[int64]struct{})
for _, id := range pInfo.IDsInDDLToIgnore() {
ignoreMap[id] = struct{}{}
}
for _, idx := range idxArr {
args = append(args, expression.NewInt64Const(pInfo.Definitions[idx].ID))
id := pInfo.Definitions[idx].ID
_, ok := ignoreMap[id]
if !ok {
args = append(args, expression.NewInt64Const(id))
}
intest.Assert(!ok, "PartitionPruning returns partitions which should be ignored!")
}
}
if len(args) == 1 {
mjonss marked this conversation as resolved.
Show resolved Hide resolved
return conditions, nil
}
condition, err := expression.NewFunction(p.SCtx().GetExprCtx(), ast.In, types.NewFieldType(mysql.TypeLonglong), args...)
if err != nil {
return nil, err
Expand Down