Skip to content

Commit

Permalink
Revert "This is an automated cherry-pick of pingcap#57114"
Browse files Browse the repository at this point in the history
This reverts commit 8e8da6d.
  • Loading branch information
mjonss committed Dec 12, 2024
1 parent 8e8da6d commit 5c31699
Show file tree
Hide file tree
Showing 10 changed files with 67 additions and 4,681 deletions.
30 changes: 0 additions & 30 deletions pkg/ddl/delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,36 +332,6 @@ func insertJobIntoDeleteRangeTable(ctx context.Context, sctx sessionctx.Context,
return errors.Trace(err)
}
}
<<<<<<< HEAD
=======
// always delete the table range, even when it's a partitioned table where
// it may contain global index regions.
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, []int64{tableID}, ea, "truncate table: table ID"))
case model.ActionDropTablePartition:
args, err := model.GetFinishedTablePartitionArgs(job)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "drop partition: physical table ID(s)"))
case model.ActionReorganizePartition, model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
// Delete dropped partitions, as well as replaced global indexes.
args, err := model.GetFinishedTablePartitionArgs(job)
if err != nil {
return errors.Trace(err)
}
for _, idx := range args.OldGlobalIndexes {
if err := doBatchDeleteIndiceRange(ctx, wrapper, job.ID, idx.TableID, []int64{idx.IndexID}, ea, "reorganize partition, replaced global indexes"); err != nil {
return errors.Trace(err)
}
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPhysicalTblIDs, ea, "reorganize partition: physical table ID(s)"))
case model.ActionTruncateTablePartition:
args, err := model.GetTruncateTableArgs(job)
if err != nil {
return errors.Trace(err)
}
return errors.Trace(doBatchDeleteTablesRange(ctx, wrapper, job.ID, args.OldPartitionIDs, ea, "truncate partition: physical table ID(s)"))
>>>>>>> b6025b97877 (*: Reorg partition fix delete ranges and handling non-clustered tables with concurrent DML (#57114))
// ActionAddIndex, ActionAddPrimaryKey needs do it, because it needs to be rolled back when it's canceled.
case model.ActionAddIndex, model.ActionAddPrimaryKey:
allIndexIDs := make([]int64, 1)
Expand Down
386 changes: 42 additions & 344 deletions pkg/ddl/partition.go

Large diffs are not rendered by default.

16 changes: 1 addition & 15 deletions pkg/ddl/sanity_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (d *ddl) checkDeleteRangeCnt(job *model.Job) {
panic(err)
}
if actualCnt != expectedCnt {
panic(fmt.Sprintf("expect delete range count %d, actual count %d for job type '%s'", expectedCnt, actualCnt, job.Type.String()))
panic(fmt.Sprintf("expect delete range count %d, actual count %d", expectedCnt, actualCnt))
}
}

Expand Down Expand Up @@ -106,21 +106,7 @@ func expectedDeleteRangeCnt(ctx delRangeCntCtx, job *model.Job) (int, error) {
if err := job.DecodeArgs(&physicalTableIDs); err != nil {
return 0, errors.Trace(err)
}
<<<<<<< HEAD
return len(physicalTableIDs), nil
=======
if job.Type == model.ActionTruncateTable {
return len(args.OldPartitionIDs) + 1, nil
}
return len(args.OldPartitionIDs), nil
case model.ActionDropTablePartition, model.ActionReorganizePartition,
model.ActionRemovePartitioning, model.ActionAlterTablePartitioning:
args, err := model.GetFinishedTablePartitionArgs(job)
if err != nil {
return 0, errors.Trace(err)
}
return len(args.OldPhysicalTblIDs) + len(args.OldGlobalIndexes), nil
>>>>>>> b6025b97877 (*: Reorg partition fix delete ranges and handling non-clustered tables with concurrent DML (#57114))
case model.ActionAddIndex, model.ActionAddPrimaryKey:
indexID := make([]int64, 1)
ifExists := make([]bool, 1)
Expand Down
1 change: 0 additions & 1 deletion pkg/ddl/tests/partition/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ go_test(
"//pkg/sessionctx",
"//pkg/sessionctx/variable",
"//pkg/sessiontxn",
"//pkg/store/gcworker",
"//pkg/store/mockstore",
"//pkg/table",
"//pkg/table/tables",
Expand Down
112 changes: 21 additions & 91 deletions pkg/ddl/tests/partition/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1401,41 +1401,16 @@ func TestGlobalIndexInsertInDropPartition(t *testing.T) {
partition p2 values less than (20),
partition p3 values less than (30)
);`)
<<<<<<< HEAD
tk.MustExec("alter table test_global add unique index idx_b (b);")
tk.MustExec("insert into test_global values (1, 1, 1), (8, 8, 8), (11, 11, 11), (12, 12, 12);")

hook := &callback.TestDDLCallback{Do: dom}
hook.OnJobRunBeforeExported = func(job *model.Job) {
=======
tk.MustExec("alter table test_global add unique index idx_b (b) global")
tk.MustExec("insert into test_global values (1, 1, 1), (2, 2, 2), (11, 11, 11), (12, 12, 12)")

doneMap := make(map[model.SchemaState]struct{})
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/onJobRunBefore", func(job *model.Job) {
>>>>>>> b6025b97877 (*: Reorg partition fix delete ranges and handling non-clustered tables with concurrent DML (#57114))
assert.Equal(t, model.ActionDropTablePartition, job.Type)
if _, ok := doneMap[job.SchemaState]; ok {
return
}
doneMap[job.SchemaState] = struct{}{}
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
switch job.SchemaState {
case model.StatePublic:
tk2.MustExec("insert into test_global values (3, 3, 3)")
tk2.MustExec("insert into test_global values (13, 13, 13)")
case model.StateWriteOnly:
tk2.MustContainErrMsg("insert into test_global values (4, 4, 4)", "[table:1526]Table has no partition for value matching a partition being dropped, 'p1'")
tk2.MustExec("insert into test_global values (14, 14, 14)")
case model.StateDeleteOnly:
tk2.MustExec("insert into test_global values (5, 5, 5)")
tk2.MustExec("insert into test_global values (15, 15, 15)")
case model.StateDeleteReorganization:
tk2.MustExec("insert into test_global values (6, 6, 6)")
tk2.MustExec("insert into test_global values (16, 16, 16)")
default:
require.Fail(t, "invalid schema state '%s'", job.SchemaState.String())
if job.SchemaState == model.StateDeleteOnly {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("insert into test_global values (9, 9, 9)")
}
}
dom.DDL().SetHook(hook)
Expand All @@ -1445,7 +1420,7 @@ func TestGlobalIndexInsertInDropPartition(t *testing.T) {
tk1.MustExec("alter table test_global drop partition p1")

tk.MustExec("analyze table test_global")
tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("5 5 5", "6 6 6", "11 11 11", "12 12 12", "13 13 13", "14 14 14", "15 15 15", "16 16 16"))
tk.MustQuery("select * from test_global use index(idx_b) order by a").Check(testkit.Rows("9 9 9", "11 11 11", "12 12 12"))
}

func TestUpdateGlobalIndex(t *testing.T) {
Expand Down Expand Up @@ -3633,10 +3608,10 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
tk3.MustExec(`COMMIT`)
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "29 28 9"))
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "30 29 9"))
tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "23 22 6", "27 26 8", "31 30 10"))
"19 18 4", "23 22 6", "27 26 8", "32 31 10"))

waitFor(4, "t", "write reorganization")
tk3.MustExec(`BEGIN`)
Expand All @@ -3646,73 +3621,28 @@ func TestRemovePartitioningAutoIDs(t *testing.T) {
tk3.MustExec(`insert into t values (null, 23)`)
tk2.MustExec(`COMMIT`)

<<<<<<< HEAD
/*
waitFor(4, "t", "delete reorganization")
tk2.MustExec(`BEGIN`)
tk2.MustExec(`insert into t values (null, 24)`)
=======
waitFor(4, "t", "delete reorganization")
tk2.MustExec(`BEGIN`)
tk2.MustExec(`insert into t values (null, 24)`)
>>>>>>> b6025b97877 (*: Reorg partition fix delete ranges and handling non-clustered tables with concurrent DML (#57114))
tk3.MustExec(`insert into t values (null, 25)`)
tk2.MustExec(`insert into t values (null, 26)`)
tk3.MustExec(`insert into t values (null, 25)`)
tk2.MustExec(`insert into t values (null, 26)`)
*/
tk3.MustExec(`COMMIT`)
tk2.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"31 30 10",
"35 34 22",
"39 38 24",
"43 42 26"))
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"31 30 10",
"33 32 21",
"35 34 22",
"37 36 23",
"41 40 25"))

waitFor(4, "t", "public")
tk2.MustExec(`commit`)
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
"32 31 10", "35 34 21", "38 37 22", "41 40 23"))

//waitFor(4, "t", "public")
//tk2.MustExec(`commit`)
// TODO: Investigate and fix, but it is also related to https://github.com/pingcap/tidb/issues/46904
require.ErrorContains(t, <-alterChan, "[kv:1062]Duplicate entry '31' for key 't.PRIMARY'")
tk3.MustQuery(`select _tidb_rowid, a, b from t`).Sort().Check(testkit.Rows(
"27 26 8",
"30012 12 12",
"30013 18 4",
"30014 24 7",
"30015 16 18",
"30016 22 6",
"30017 28 9",
"30018 11 11",
"30019 2 2",
"30020 20 5",
"31 30 10",
"33 32 21",
"35 34 22",
"37 36 23",
"39 38 24",
"41 40 25",
"43 42 26"))
require.NoError(t, <-alterChan)
"13 11 11", "14 2 2", "15 12 12", "17 16 18",
"19 18 4", "21 20 5", "23 22 6", "25 24 7", "27 26 8", "30 29 9",
"32 31 10", "35 34 21", "38 37 22", "41 40 23"))
}

func TestAlterLastIntervalPartition(t *testing.T) {
Expand Down
Loading

0 comments on commit 5c31699

Please sign in to comment.