Skip to content

Commit

Permalink
*: Reorg partition fix delete ranges and handling non-clustered table…
Browse files Browse the repository at this point in the history
…s with concurrent DML (#57114)

ref #45133, close #56822, close #57510
  • Loading branch information
mjonss authored Nov 29, 2024
1 parent cc59ab0 commit b6025b9
Show file tree
Hide file tree
Showing 10 changed files with 654 additions and 252 deletions.
17 changes: 14 additions & 3 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,24 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, wrapper DelRangeExecWrap
// always delete the table range, even when it's a partitioned table where
// it may contain global index regions.
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "truncate table: table ID"))
case model.ActionDropTablePartition, model.ActionReorganizePartition,
model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
case model.ActionDropTablePartition:
args, err := model.GetFinishedTablePartitionArgs(job)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "reorganize/drop partition: physical table ID(s)"))
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "drop partition: physical table ID(s)"))
case model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
// Delete dropped partitions, as well as replaced global indexes.
args, err := model.GetFinishedTablePartitionArgs(job)
if err != nil {
return errors.Trace(err)
}
for _, idx := range args.OldGlobalIndexes {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, idx.TableID, []int64{idx.IndexID}, ea, "reorganize partition, replaced global indexes"); err != nil {
return errors.Trace(err)
}
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "reorganize partition: physical table ID(s)"))
case model.ActionTruncateTablePartition:
args, err := model.GetTruncateTableArgs(job)
if err != nil {
Expand Down
253 changes: 174 additions & 79 deletions pkg/ddl/partition.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pkg/ddl/sanity_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (e *executor) checkDeleteRangeCnt(job *model.Job) {
panic(err)
}
if actualCnt != expectedCnt {
panic(fmt.Sprintf("expect delete range count %d, actual count %d", expectedCnt, actualCnt))
panic(fmt.Sprintf("expect delete range count %d, actual count %d for job type '%s'", expectedCnt, actualCnt, job.Type.String()))
}
}

Expand Down Expand Up @@ -110,7 +110,7 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) {
if err != nil {
return 0, errors.Trace(err)
}
return len(args.OldPhysicalTblIDs), nil
return len(args.OldPhysicalTblIDs) + len(args.OldGlobalIndexes), nil
case model.ActionAddIndex, model.ActionAddPrimaryKey:
args, err := model.GetFinishedModifyIndexArgs(job)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/ddl/tests/partition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_test(
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/sessiontxn",
"//pkg/store/gcworker",
"//pkg/store/mockstore",
"//pkg/table",
"//pkg/table/tables",
Expand Down
110 changes: 82 additions & 28 deletions pkg/ddl/tests/partition/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1336,14 +1336,32 @@ func TestGlobalIndexInsertInDropPartition(t *testing.T) {
partition p3 values less than (30)
);`)
tk.MustExec("alter table test_global add unique index idx_b (b) global")
tk.MustExec("insert into test_global values (1, 1, 1), (8, 8, 8), (11, 11, 11), (12, 12, 12);")
tk.MustExec("insert into test_global values (1, 1, 1), (2, 2, 2), (11, 11, 11), (12, 12, 12)")

doneMap := make(map[model.SchemaState]struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobRunBefore", func(job *model.Job) {
assert.Equal(t, model.ActionDropTablePartition, job.Type)
if job.SchemaState == model.StateDeleteOnly {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("insert into test_global values (9, 9, 9)")
if _, ok := doneMap[job.SchemaState]; ok {
return
}
doneMap[job.SchemaState] = struct{}{}
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
switch job.SchemaState {
case model.StatePublic:
tk2.MustExec("insert into test_global values (3, 3, 3)")
tk2.MustExec("insert into test_global values (13, 13, 13)")
case model.StateWriteOnly:
tk2.MustContainErrMsg("insert into test_global values (4, 4, 4)", "[table:1526]Table has no partition for value matching a partition being dropped, 'p1'")
tk2.MustExec("insert into test_global values (14, 14, 14)")
case model.StateDeleteOnly:
tk2.MustExec("insert into test_global values (5, 5, 5)")
tk2.MustExec("insert into test_global values (15, 15, 15)")
case model.StateDeleteReorganization:
tk2.MustExec("insert into test_global values (6, 6, 6)")
tk2.MustExec("insert into test_global values (16, 16, 16)")
default:
require.Fail(t, "invalid schema state '%s'", job.SchemaState.String())
}
})

Expand All @@ -1352,7 +1370,7 @@ func TestGlobalIndexInsertInDropPartition(t *testing.T) {
tk1.MustExec("alter table test_global drop partition p1")

tk.MustExec("analyze table test_global")
tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("9 9 9", "11 11 11", "12 12 12"))
tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("5 5 5", "6 6 6", "11 11 11", "12 12 12", "13 13 13", "14 14 14", "15 15 15", "16 16 16"))
}

func TestGlobalIndexUpdateInDropPartition(t *testing.T) {
Expand Down Expand Up @@ -3168,10 +3186,10 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
tk3.MustExec(`COMMIT`)
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "30 29 9"))
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "29 28 9"))
tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "23 22 6", "27 26 8", "32 31 10"))
"19 18 4", "23 22 6", "27 26 8", "31 30 10"))

waitFor(4, "t", "write reorganization")
tk3.MustExec(`BEGIN`)
Expand All @@ -3181,30 +3199,66 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
tk3.MustExec(`insert into t values (null, 23)`)
tk2.MustExec(`COMMIT`)

/*
// Currently there is an duplicate entry issue, so it will rollback in WriteReorganization
// instead of continuing.
waitFor(4, "t", "delete reorganization")
tk2.MustExec(`BEGIN`)
tk2.MustExec(`insert into t values (null, 24)`)
waitFor(4, "t", "delete reorganization")
tk2.MustExec(`BEGIN`)
tk2.MustExec(`insert into t values (null, 24)`)

tk3.MustExec(`insert into t values (null, 25)`)
tk2.MustExec(`insert into t values (null, 26)`)
*/
tk3.MustExec(`insert into t values (null, 25)`)
tk2.MustExec(`insert into t values (null, 26)`)
tk3.MustExec(`COMMIT`)
tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"31 30 10",
"35 34 22",
"39 38 24",
"43 42 26"))
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
"32 31 10", "35 34 21", "38 37 22", "41 40 23"))

//waitFor(4, "t", "public")
//tk2.MustExec(`commit`)
// TODO: Investigate and fix, but it is also related to https://github.com/pingcap/tidb/issues/46904
require.ErrorContains(t, <-alterChan, "[kv:1062]Duplicate entry '31' for key 't.PRIMARY'")
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"31 30 10",
"33 32 21",
"35 34 22",
"37 36 23",
"41 40 25"))

waitFor(4, "t", "public")
tk2.MustExec(`commit`)
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
"32 31 10", "35 34 21", "38 37 22", "41 40 23"))
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"31 30 10",
"33 32 21",
"35 34 22",
"37 36 23",
"39 38 24",
"41 40 25",
"43 42 26"))
require.NoError(t, <-alterChan)
}

func TestAlterLastIntervalPartition(t *testing.T) {
Expand Down
Loading

0 comments on commit b6025b9

Please sign in to comment.