*: Allow Point_Get during DDL with Global Index #56382

Open: wants to merge 11 commits into master
38 changes: 28 additions & 10 deletions pkg/ddl/partition.go
@@ -136,6 +136,8 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in

// move the adding definition into tableInfo.
updateAddingPartitionInfo(partInfo, tblInfo)
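// Record the ongoing DDL state and action in the partition info, so that
// readers (e.g. global index scans) can tell which partition IDs must be ignored.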
tblInfo.Partition.DDLState = model.StateReplicaOnly
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
@@ -222,6 +224,8 @@ func (w *worker) onAddTablePartition(jobCtx *jobContext, job *model.Job) (ver in

preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, addingDefinitions)

tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
@@ -2216,9 +2220,6 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
}

var physicalTableIDs []int64
// In order to skip maintaining the state check in partitionDefinition, TiDB use droppingDefinition instead of state field.
// So here using `job.SchemaState` to judge what the stage of this job is.
originalState := job.SchemaState
switch job.SchemaState {
case model.StatePublic:
// If an error occurs, it returns that it cannot delete all partitions or that the partition doesn't exist.
@@ -2266,13 +2267,16 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
}

job.SchemaState = model.StateDeleteOnly
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
tblInfo.Partition.DDLState = job.SchemaState
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteOnly:
// This state is not a real 'DeleteOnly' state, because TiDB does not maintain the state check in partitionDefinition.
// This state exists to ensure that no server can still see the old partitions while reorg is running,
// so that no new data will be inserted into the old partitions while reorganizing.
job.SchemaState = model.StateDeleteReorganization
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, originalState != job.SchemaState)
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteReorganization:
oldTblInfo := getTableInfoWithDroppingPartitions(tblInfo)
physicalTableIDs = getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions)
@@ -2337,6 +2341,7 @@ func (w *worker) onDropTablePartition(jobCtx *jobContext, job *model.Job) (ver i
return ver, errors.Trace(err)
}
job.SchemaState = model.StateNone
tblInfo.Partition.DDLState = job.SchemaState
job.FinishTableJob(model.JobStateDone, model.StateNone, ver, tblInfo)
dropPartitionEvent := notifier.NewDropPartitionEvent(
tblInfo,
@@ -2465,12 +2470,15 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
pi.NewPartitionIDs = newIDs[:]

job.SchemaState = model.StateDeleteOnly
pi.DDLState = job.SchemaState
pi.DDLAction = job.Type
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteOnly:
// This state is not a real 'DeleteOnly' state, because TiDB does not maintain the state check in partitionDefinition.
// This state exists to ensure that no server can still see the old partitions while reorg is running,
// so that no new data will be inserted into the old partitions while reorganizing.
job.SchemaState = model.StateDeleteReorganization
pi.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateDeleteReorganization:
// Step2: clear global index rows.
@@ -2561,6 +2569,8 @@ func (w *worker) onTruncateTablePartition(jobCtx *jobContext, job *model.Job) (i
// Step4: clear DroppingDefinitions and finish job.
tblInfo.Partition.DroppingDefinitions = nil
tblInfo.Partition.NewPartitionIDs = nil
tblInfo.Partition.DDLAction = model.ActionNone
tblInfo.Partition.DDLState = model.StateNone

preSplitAndScatter(w.sess.Context, jobCtx.store, tblInfo, newPartitions)

@@ -2768,6 +2778,8 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, job *model.Job) (v
// into the table using the schema version
// before the exchange is made.
job.SchemaState = model.StateWriteOnly
pt.Partition.DDLState = job.SchemaState
pt.Partition.DDLAction = job.Type
return updateVersionAndTableInfoWithCheck(jobCtx, job, nt, true, ptInfo...)
}
// From now on, nt (the non-partitioned table) has
@@ -2842,6 +2854,8 @@ func (w *worker) onExchangeTablePartition(jobCtx *jobContext, job *model.Job) (v
originalPartitionDef := partDef.Clone()
originalNt := nt.Clone()
partDef.ID, nt.ID = nt.ID, partDef.ID
pt.Partition.DDLState = model.StateNone
pt.Partition.DDLAction = model.ActionNone

err = metaMut.UpdateTable(ptSchemaID, pt)
if err != nil {
@@ -3276,6 +3290,8 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// Assume we cannot have more than MaxUint64 rows, set the progress to 1/10 of that.
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.1 / float64(math.MaxUint64))
job.SchemaState = model.StateDeleteOnly
tblInfo.Partition.DDLState = job.SchemaState
tblInfo.Partition.DDLAction = job.Type
ver, err = updateVersionAndTableInfoWithCheck(jobCtx, job, tblInfo, true)
if err != nil {
@@ -3345,8 +3361,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
failpoint.Return(rollbackReorganizePartitionWithErr(jobCtx, job, err))
}
})
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
job.SchemaState = model.StateWriteOnly
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
case model.StateWriteOnly:
// This state exists to ensure that all servers can see the new partitions while reorg is running,
// so that new data will be written to both old and new partitions while reorganizing.
@@ -3356,10 +3373,10 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
tblInfo.Indices[i].State = model.StateWriteReorganization
}
}
tblInfo.Partition.DDLState = model.StateWriteReorganization
job.SchemaState = model.StateWriteReorganization
tblInfo.Partition.DDLState = job.SchemaState
metrics.GetBackfillProgressByLabel(metrics.LblReorgPartition, job.SchemaName, tblInfo.Name.String()).Set(0.3 / float64(math.MaxUint64))
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
job.SchemaState = model.StateWriteReorganization
case model.StateWriteReorganization:
physicalTableIDs := getPartitionIDsFromDefinitions(tblInfo.Partition.DroppingDefinitions)
tbl, err2 := getTable(jobCtx.getAutoIDRequirement(), job.SchemaID, tblInfo)
@@ -3444,9 +3461,9 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
// Now all the data copying is done, but we cannot simply remove the droppingDefinitions
// since they are still part of the normal Definitions used by other nodes
// on the current schema version. So we need to double write for one more schema version
tblInfo.Partition.DDLState = model.StateDeleteReorganization
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)
job.SchemaState = model.StateDeleteReorganization
tblInfo.Partition.DDLState = job.SchemaState
ver, err = updateVersionAndTableInfo(jobCtx, job, tblInfo, true)

case model.StateDeleteReorganization:
// Drop the droppingDefinitions and finish the DDL
@@ -3468,6 +3485,7 @@ func (w *worker) onReorganizePartition(jobCtx *jobContext, job *model.Job) (ver
tblInfo.Partition.DroppingDefinitions = nil
tblInfo.Partition.AddingDefinitions = nil
tblInfo.Partition.DDLState = model.StateNone
tblInfo.Partition.DDLAction = model.ActionNone
tblInfo.Partition.OriginalPartitionIDsOrder = nil

var dropIndices []*model.IndexInfo
6 changes: 2 additions & 4 deletions pkg/ddl/tests/partition/db_partition_test.go
@@ -1433,10 +1433,8 @@ func TestTruncatePartitionWithGlobalIndex(t *testing.T) {
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec(`begin`)
tk3.MustExec(`use test`)
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("IndexRangeScan")
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("Selection")
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("IndexRangeScan")
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("Selection")
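// Point_Get is now allowed here: with the DDLState/DDLAction bookkeeping,
// the executor itself skips entries from partitions in IDsInDDLToIgnore().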
tk3.MustQuery(`explain format='brief' select b from test_global use index(idx_b) where b = 15`).CheckContain("Point_Get")
tk3.MustQuery(`explain format='brief' select c from test_global use index(idx_c) where c = 15`).CheckContain("Point_Get")
tk3.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows())
tk3.MustQuery(`select c from test_global use index(idx_c) where c = 15`).Check(testkit.Rows())
// Here it will fail with
8 changes: 7 additions & 1 deletion pkg/executor/point_get.go
@@ -371,9 +371,15 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
return err
}
tblID = pid
if !matchPartitionNames(tblID, e.partitionNames, e.tblInfo.GetPartitionInfo()) {
pi := e.tblInfo.GetPartitionInfo()
if !matchPartitionNames(tblID, e.partitionNames, pi) {
return nil
}
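// Skip rows whose global-index entry points at a partition that must not
// be visible at this stage of the ongoing partitioning DDL.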
for _, id := range pi.IDsInDDLToIgnore() {
if id == pid {
return nil
}
}
Comment on lines 373 to +382

Contributor: It seems better to converge all partition logic together into the PartitionProcessor.

Contributor Author: Part of this is done in #56082, I created #56449 to make sure it is all handled.

}
}
}
70 changes: 69 additions & 1 deletion pkg/meta/model/table.go
@@ -750,6 +750,9 @@ type PartitionInfo struct {
DDLColumns []model.CIStr `json:"ddl_columns"`
// For ActionAlterTablePartitioning, UPDATE INDEXES
DDLUpdateIndexes []UpdateIndexInfo `json:"ddl_update_indexes"`
// DDLAction records which DDL is ongoing. Together with DDLState it
// determines how to handle entries that are visible in a Global Index.
DDLAction ActionType `json:"ddl_action"`
}

// Clone clones itself.
@@ -852,6 +855,8 @@ func (pi *PartitionInfo) ClearReorgIntermediateInfo() {
pi.DDLExpr = ""
pi.DDLColumns = nil
pi.NewTableID = 0
pi.DDLState = StateNone
pi.DDLAction = ActionNone
}

// FindPartitionDefinitionByName finds PartitionDefinition by name.
@@ -887,6 +892,69 @@ func (pi *PartitionInfo) SetOriginalPartitionIDs() {
pi.OriginalPartitionIDsOrder = ids
}

// IDsInDDLToIgnore returns the list of partition IDs that the current
// session should not see (duplicate-key errors on insert/update may still occur, though),
// for example during TRUNCATE or DROP PARTITION.
func (pi *PartitionInfo) IDsInDDLToIgnore() []int64 {
// TODO:
Contributor Author: There are multiple PRs/enhancements that will build upon this, especially for DROP and TRUNCATE partition.

// Truncate partition:
// write only => should not see NewPartitionIDs
// delete only / delete reorganization => should not see DroppingDefinitions
// Drop partition:
// TODO: Make similar changes as in Truncate Partition:
// Add a state blocking read and write in the partitions to be dropped,
// to avoid situations like https://github.com/pingcap/tidb/issues/55888
// Add partition:
// TODO: Add tests!
// Exchange Partition:
// Currently blocked for GlobalIndex
// Reorganize Partition:
// Nothing, since it will create a new copy of the global index.
// This is because the global index would need to hold entries from two partitions for the same index key
// TODO: Should we extend the GlobalIndex to have multiple partitions?
// Maybe from PK/_tidb_rowid + Partition ID
// to PK/_tidb_rowid + Partition ID + Valid from Schema Version,
// with support for two entries?
// Then we could avoid having two copies of the same Global Index
// just for handling a single SchemaState.
// If so, could we then replace this?
switch pi.DDLAction {
case ActionTruncateTablePartition:
switch pi.DDLState {
case StateWriteOnly:
return pi.NewPartitionIDs
case StateDeleteOnly, StateDeleteReorganization:
if len(pi.DroppingDefinitions) == 0 {
return nil
}
ids := make([]int64, 0, len(pi.DroppingDefinitions))
for _, definition := range pi.DroppingDefinitions {
ids = append(ids, definition.ID)
}
return ids
}
case ActionDropTablePartition:
if len(pi.DroppingDefinitions) > 0 && pi.DDLState == StateDeleteOnly {
ids := make([]int64, 0, len(pi.DroppingDefinitions))
for _, def := range pi.DroppingDefinitions {
ids = append(ids, def.ID)
}
return ids
}
case ActionAddTablePartition:
// TODO: Add tests for ADD PARTITION multi-domain with Global Index!
if len(pi.AddingDefinitions) > 0 {
ids := make([]int64, 0, len(pi.AddingDefinitions))
for _, def := range pi.AddingDefinitions {
ids = append(ids, def.ID)
}
return ids
}
// Not supporting Global Indexes: case ActionExchangeTablePartition
}
return nil
}
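
A minimal usage sketch (the helper name shouldIgnoreRow and its signature are invented for illustration and are not part of this diff; pkg/executor/point_get.go in this PR does essentially the same inline, and model here is pkg/meta/model):

```go
// shouldIgnoreRow reports whether a global-index entry pointing at
// partition pid must be hidden from the current session.
func shouldIgnoreRow(pi *model.PartitionInfo, pid int64) bool {
	for _, id := range pi.IDsInDDLToIgnore() {
		if id == pid {
			// The entry references a partition that is being dropped,
			// truncated, or added and is not yet public.
			return true
		}
	}
	return false
}
```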

// PartitionState is the state of the partition.
type PartitionState struct {
ID int64 `json:"id"`
@@ -903,7 +971,7 @@ type PartitionDefinition struct {
Comment string `json:"comment,omitempty"`
}

// Clone clones ConstraintInfo.
// Clone clones PartitionDefinition.
func (ci *PartitionDefinition) Clone() PartitionDefinition {
nci := *ci
nci.LessThan = make([]string, len(ci.LessThan))
44 changes: 22 additions & 22 deletions pkg/planner/core/find_best_task.go
@@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/collate"
h "github.com/pingcap/tidb/pkg/util/hint"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/ranger"
"github.com/pingcap/tidb/pkg/util/tracing"
@@ -1465,16 +1466,6 @@ func findBestTask4LogicalDataSource(lp base.LogicalPlan, prop *property.Physical
if canConvertPointGet && path.IsIntHandlePath && !ds.Table.Meta().PKIsHandle && len(ds.PartitionNames) != 1 {
canConvertPointGet = false
}
if canConvertPointGet {
if path != nil && path.Index != nil && path.Index.Global {
// Don't convert to point get during ddl
// TODO: Revisit truncate partition and global index
Contributor: Can you explain why point get didn't work before?

Contributor Author: It may have read partitions that should not have been visible. Because a global index is unique, it qualifies for Point_Get, but during a DDL it can contain old entries pointing to dropped partitions, or new entries pointing to partitions that are being added or are replacing old ones and are not yet visible to every session. A full table scan and a global index read could therefore return different data.
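
A sketch of the hazard in testkit terms (modeled on TestTruncatePartitionWithGlobalIndex from this PR, which pauses the TRUNCATE in an intermediate schema state via a hook; the table and index names are reused from that test):

```go
// While `alter table test_global truncate partition ...` is paused in an
// intermediate state, a concurrent transaction starts:
tk3.MustExec(`begin`)
tk3.MustExec(`use test`)

// The unique global index still holds entries for the truncated partition.
// A Point_Get through idx_b must not surface them, while a full table scan
// never would; without filtering, the two reads could disagree:
tk3.MustQuery(`select b from test_global use index(idx_b) where b = 15`).Check(testkit.Rows())
```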

if len(ds.TableInfo.GetPartitionInfo().DroppingDefinitions) > 0 ||
len(ds.TableInfo.GetPartitionInfo().AddingDefinitions) > 0 {
canConvertPointGet = false
}
}
}
}
if canConvertPointGet {
allRangeIsPoint := true
@@ -2263,26 +2254,35 @@ func (is *PhysicalIndexScan) addSelectionConditionForGlobalIndex(p *logicalop.Da
needNot := false
pInfo := p.TableInfo.GetPartitionInfo()
if len(idxArr) == 1 && idxArr[0] == FullRange {
// Only filter adding and dropping partitions.
if len(pInfo.AddingDefinitions) == 0 && len(pInfo.DroppingDefinitions) == 0 {
return conditions, nil
}
// Filter away partitions that may exist in the Global Index
// but should not be seen.
needNot = true
for _, p := range pInfo.AddingDefinitions {
args = append(args, expression.NewInt64Const(p.ID))
}
for _, p := range pInfo.DroppingDefinitions {
args = append(args, expression.NewInt64Const(p.ID))
for _, id := range pInfo.IDsInDDLToIgnore() {
args = append(args, expression.NewInt64Const(id))
}
} else if len(idxArr) == 0 {
// add an invalid pid as param for `IN` function
// TODO: Can we change to Table Dual somehow?
// Add an invalid pid as param for `IN` function
args = append(args, expression.NewInt64Const(-1))
} else {
// `PartitionPruning`` func does not return adding and dropping partitions
// TODO: When PartitionPruning is guaranteed to not
// return old/blocked partition ids then ignoreMap can be removed.
Contributor: Will it happen? If the answer is yes, please add a related test case for it.

Contributor Author: I'll add a test. I think there may be possible cases until DROP/TRUNCATE/ADD PARTITION are fully updated for Global Index, like #55831 and #56082.

Contributor: I think idxArr will not contain any partition in AddingDefinitions and DroppingDefinitions.

Contributor Author: You are probably right, but I prefer to have all partitioning DDLs checked and tested for Global Index consistency and proper visibility for each state combination. So until then I prefer to have this check here. Or do you prefer to add an intest-assert in PartitionPruning instead?

Contributor: Adding an intest-assert is better for me.

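// Build a set of partition IDs whose global-index entries must be
// filtered out while the DDL is ongoing.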
ignoreMap := make(map[int64]struct{})
for _, id := range pInfo.IDsInDDLToIgnore() {
ignoreMap[id] = struct{}{}
}
for _, idx := range idxArr {
args = append(args, expression.NewInt64Const(pInfo.Definitions[idx].ID))
id := pInfo.Definitions[idx].ID
if _, ok := ignoreMap[id]; !ok {
args = append(args, expression.NewInt64Const(id))
} else if intest.InTest {
panic("PartitionPruning returns partitions which should be ignored!")
}
}
}
if len(args) == 1 {
Contributor: I think it only happens in the first if-branch, maybe we could move it into that branch.

Contributor Author: If one truncates all partitions, or partition pruning doesn't find any matching partition, it can also happen in the last if-branch.

return conditions, nil
}
condition, err := expression.NewFunction(p.SCtx().GetExprCtx(), ast.In, types.NewFieldType(mysql.TypeLonglong), args...)
if err != nil {
return nil, err
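
For concreteness, a sketch of the filter this function ends up producing, assuming (as the code outside this hunk suggests) that args[0] is the extra physical-table-ID column of the global index scan and that the condition is negated when needNot is set; the partition IDs below are invented:

```go
// Full-range scan while TRUNCATE PARTITION is in StateDeleteOnly and the
// dropping partition has ID 101 (needNot == true):
//     not(in(<pid column>, 101))
// After pruning to partitions 102 and 103, none of them ignored:
//     in(<pid column>, 102, 103)
// If args still holds only the column (nothing to compare against), the
// original conditions are returned unchanged instead of building an IN.
```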