Skip to content

Commit

Permalink
Online DDL: better support for range partitioning (#15698)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored May 1, 2024
1 parent c154771 commit 1c4cd97
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 187 deletions.
4 changes: 2 additions & 2 deletions go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ func testRevert(t *testing.T) {
checkPartitionedTableCountRows(t, 6)
})
t.Run("partitions: drop first partition", func(t *testing.T) {
uuid := testOnlineDDLStatementForTable(t, "alter table part_test drop partition `p1`", ddlStrategy+" --fast-range-rotation", "vtgate", "")
uuid := testOnlineDDLStatementForTable(t, "alter table part_test drop partition `p1`", ddlStrategy, "vtgate", "")
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
checkTable(t, partitionedTableName, true)
Expand All @@ -1157,7 +1157,7 @@ func testRevert(t *testing.T) {
checkPartitionedTableCountRows(t, 5)
})
t.Run("partitions: add new partition", func(t *testing.T) {
uuid := testOnlineDDLStatementForTable(t, "alter table part_test add partition (PARTITION p7 VALUES LESS THAN (70))", ddlStrategy+" --fast-range-rotation", "vtgate", "")
uuid := testOnlineDDLStatementForTable(t, "alter table part_test add partition (PARTITION p7 VALUES LESS THAN (70))", ddlStrategy, "vtgate", "")
uuids = append(uuids, uuid)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
checkTable(t, partitionedTableName, true)
Expand Down
9 changes: 2 additions & 7 deletions go/vt/schema/ddl_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,6 @@ func (setting *DDLStrategySetting) IsPreferInstantDDL() bool {
return setting.hasFlag(preferInstantDDL)
}

// IsFastRangeRotationFlag checks if strategy options include --fast-range-rotation
func (setting *DDLStrategySetting) IsFastRangeRotationFlag() bool {
return setting.hasFlag(fastRangeRotationFlag)
}

// isCutOverThresholdFlag returns true when given option denotes a `--cut-over-threshold=[...]` flag
func isCutOverThresholdFlag(opt string) (string, bool) {
submatch := cutOverThresholdFlagRegexp.FindStringSubmatch(opt)
Expand Down Expand Up @@ -324,7 +319,7 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string {
}
switch {
case isFlag(opt, declarativeFlag):
case isFlag(opt, skipTopoFlag):
case isFlag(opt, skipTopoFlag): // deprecated flag, parsed for backwards compatibility
case isFlag(opt, singletonFlag):
case isFlag(opt, singletonContextFlag):
case isFlag(opt, allowZeroInDateFlag):
Expand All @@ -333,7 +328,7 @@ func (setting *DDLStrategySetting) RuntimeOptions() []string {
case isFlag(opt, inOrderCompletionFlag):
case isFlag(opt, allowConcurrentFlag):
case isFlag(opt, preferInstantDDL):
case isFlag(opt, fastRangeRotationFlag):
case isFlag(opt, fastRangeRotationFlag): // deprecated flag, parsed for backwards compatibility
case isFlag(opt, vreplicationTestSuite):
case isFlag(opt, allowForeignKeysFlag):
case isFlag(opt, analyzeTableFlag):
Expand Down
1 change: 0 additions & 1 deletion go/vt/schema/ddl_strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,6 @@ func TestParseDDLStrategy(t *testing.T) {
assert.Equal(t, ts.isPostponeLaunch, setting.IsPostponeLaunch())
assert.Equal(t, ts.isAllowConcurrent, setting.IsAllowConcurrent())
assert.Equal(t, ts.fastOverRevertible, setting.IsPreferInstantDDL())
assert.Equal(t, ts.fastRangeRotation, setting.IsFastRangeRotationFlag())
assert.Equal(t, ts.allowForeignKeys, setting.IsAllowForeignKeysFlag())
assert.Equal(t, ts.analyzeTable, setting.IsAnalyzeTableFlag())
cutOverThreshold, err := setting.CutOverThreshold()
Expand Down
68 changes: 68 additions & 0 deletions go/vt/schemadiff/analysis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schemadiff

import (
"vitess.io/vitess/go/vt/sqlparser"
)

// AlterTableRotatesRangePartition answers `true` when the given ALTER TABLE statement performs any sort
// of range partition rotation, that is applicable immediately and without moving data.
// Such would be:
// - Dropping any partition(s)
// - Adding a new partition (empty, at the end of the list)
func AlterTableRotatesRangePartition(createTable *sqlparser.CreateTable, alterTable *sqlparser.AlterTable) (bool, error) {
// Validate original table is partitioned by RANGE
if createTable.TableSpec.PartitionOption == nil {
return false, nil
}
if createTable.TableSpec.PartitionOption.Type != sqlparser.RangeType {
return false, nil
}

spec := alterTable.PartitionSpec
if spec == nil {
return false, nil
}
errorResult := func(conflictingNode sqlparser.SQLNode) error {
return &PartitionSpecNonExclusiveError{
Table: alterTable.Table.Name.String(),
PartitionSpec: spec,
ConflictingStatement: sqlparser.CanonicalString(conflictingNode),
}
}
if len(alterTable.AlterOptions) > 0 {
// This should never happen, unless someone programmatically tampered with the AlterTable AST.
return false, errorResult(alterTable.AlterOptions[0])
}
if alterTable.PartitionOption != nil {
// This should never happen, unless someone programmatically tampered with the AlterTable AST.
return false, errorResult(alterTable.PartitionOption)
}
switch spec.Action {
case sqlparser.AddAction:
if len(spec.Definitions) > 1 {
// This should never happen, unless someone programmatically tampered with the AlterTable AST.
return false, errorResult(spec.Definitions[1])
}
return true, nil
case sqlparser.DropAction:
return true, nil
default:
return false, nil
}
}
83 changes: 83 additions & 0 deletions go/vt/schemadiff/analysis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package schemadiff

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/sqlparser"
)

// AnalyzePartitionRotation analyzes a given AlterTable statement to see whether it has partition rotation
// commands, and if so, is the ALTER TABLE statement valid in MySQL. In MySQL, a single ALTER TABLE statement
// cannot apply multiple rotation commands, nor can it mix rotation commands with other types of changes.
func TestAlterTableRotatesRangePartition(t *testing.T) {
tcases := []struct {
create string
alter string
expect bool
}{
{
alter: "ALTER TABLE t ADD PARTITION (PARTITION p1 VALUES LESS THAN (10))",
expect: true,
},
{
alter: "ALTER TABLE t DROP PARTITION p1",
expect: true,
},
{
alter: "ALTER TABLE t DROP PARTITION p1, p2",
expect: true,
},
{
alter: "ALTER TABLE t TRUNCATE PARTITION p3",
},
{
alter: "ALTER TABLE t COALESCE PARTITION 3",
},
{
alter: "ALTER TABLE t partition by range (id) (partition p1 values less than (10), partition p2 values less than (20), partition p3 values less than (30))",
},
{
alter: "ALTER TABLE t ADD COLUMN c1 INT, DROP COLUMN c2",
},
}

for _, tcase := range tcases {
t.Run(tcase.alter, func(t *testing.T) {
if tcase.create == "" {
tcase.create = "CREATE TABLE t (id int PRIMARY KEY) PARTITION BY RANGE (id) (PARTITION p0 VALUES LESS THAN (10))"
}
stmt, err := sqlparser.NewTestParser().ParseStrictDDL(tcase.create)
require.NoError(t, err)
createTable, ok := stmt.(*sqlparser.CreateTable)
require.True(t, ok)

stmt, err = sqlparser.NewTestParser().ParseStrictDDL(tcase.alter)
require.NoError(t, err)
alterTable, ok := stmt.(*sqlparser.AlterTable)
require.True(t, ok)

result, err := AlterTableRotatesRangePartition(createTable, alterTable)
require.NoError(t, err)
assert.Equal(t, tcase.expect, result)
})
}
}
19 changes: 19 additions & 0 deletions go/vt/schemadiff/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"

"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/vt/sqlparser"
)

var (
Expand Down Expand Up @@ -468,3 +469,21 @@ func (e *SubsequentDiffRejectedError) Error() string {
}
return b.String()
}

// PartitionSpecNonExclusiveError is returned when a partition spec change is found alongside other changes.
// for example, in MySQL it is invalid to both DROP PARTITION (a partition spec change) and ADD COLUMN
// in the same ALTER TABLE statement. In fact, even two partition spec changes in the same ALTER TABLE
// statement are not allowed.
// This error should never be encountered in normal circumstances, because:
// - `sqlparser` should not allow such statements to be parsed.
// - schemadiff's `Diff()` function will never generate a single `ALTER TABLE` statement with such multiple changes.
// The error is used for integrity checks only, and should be considered a bug if encountered.
type PartitionSpecNonExclusiveError struct {
Table string
PartitionSpec *sqlparser.PartitionSpec
ConflictingStatement string
}

func (e *PartitionSpecNonExclusiveError) Error() string {
return fmt.Sprintf("ALTER TABLE on %s, may only have a single partition spec change, and other changes are not allowed. Found spec: %s; and change: %s", sqlescape.EscapeID(e.Table), sqlparser.CanonicalString(e.PartitionSpec), e.ConflictingStatement)
}
59 changes: 36 additions & 23 deletions go/vt/schemadiff/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package schemadiff
import (
"fmt"
"math"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -1203,52 +1204,64 @@ func (c *CreateTableEntity) isRangePartitionsRotation(
if t1Partitions.Type != sqlparser.RangeType {
return false, nil, nil
}
definitions1 := t1Partitions.Definitions
definitions1 := slices.Clone(t1Partitions.Definitions)
definitions2 := t2Partitions.Definitions
// there has to be a non-empty shared list, therefore both definitions must be non-empty:
if len(definitions1) == 0 {
return false, nil, nil
}
if len(definitions2) == 0 {
return false, nil, nil
}
definitions2map := make(map[string]*sqlparser.PartitionDefinition, len(definitions2))
for _, definition := range definitions2 {
definitions2map[sqlparser.CanonicalString(definition)] = definition
}
// Find dropped partitions:
var droppedPartitions1 []*sqlparser.PartitionDefinition
// It's OK for prefix of t1 partitions to be nonexistent in t2 (as they may have been rotated away in t2)
for len(definitions1) > 0 && !sqlparser.Equals.RefOfPartitionDefinition(definitions1[0], definitions2[0]) {
droppedPartitions1 = append(droppedPartitions1, definitions1[0])
definitions1 = definitions1[1:]
for i := len(definitions1) - 1; i >= 0; i-- {
definition := definitions1[i]
if _, ok := definitions2map[sqlparser.CanonicalString(definition)]; !ok {
// In range partitioning, it's allowed to drop any partition, whether it's the first, somewhere in the middle, or last.
droppedPartitions1 = append(droppedPartitions1, definition)
// We remove the definition from the list, so that we can then compare the remaining definitions
definitions1 = append(definitions1[:i], definitions1[i+1:]...)
}
}
slices.Reverse(droppedPartitions1)
if len(definitions1) == 0 {
// We've exhausted definition1 trying to find a shared partition with definitions2. Nothing found.
// so there is no shared sequence between the two tables.
// Nothing shared between the two partition lists.
return false, nil, nil
}
// In range partitioning, it's only allowed to ADD one partition at the end of the range.
// We allow multiple here, and the diff mechanism will later split them to subsequent diffs.

// Let's now validate that any added partitions in t2Partitions are strictly a suffix of t1Partitions
if len(definitions1) > len(definitions2) {
return false, nil, nil
}
// To save computation, and because we've already shown that sqlparser.EqualsRefOfPartitionDefinition(definitions1[0], definitions2[0]), nil,
// we can skip one element
definitions1 = definitions1[1:]
definitions2 = definitions2[1:]
// Now let's ensure that whatever is remaining in definitions1 is an exact match for a prefix of definitions2
// It's ok if we end up with leftover elements in definition2
for len(definitions1) > 0 {
if !sqlparser.Equals.RefOfPartitionDefinition(definitions1[0], definitions2[0]) {
for i := range definitions1 {
if !sqlparser.Equals.RefOfPartitionDefinition(definitions1[i], definitions2[i]) {
// Not a suffix
return false, nil, nil
}
definitions1 = definitions1[1:]
definitions2 = definitions2[1:]
}
addedPartitions2 := definitions2
partitionSpecs := make([]*sqlparser.PartitionSpec, 0, len(droppedPartitions1)+len(addedPartitions2))
for _, p := range droppedPartitions1 {
// And the suffix is any remaining definitions
addedPartitions2 := definitions2[len(definitions1):]

var partitionSpecs []*sqlparser.PartitionSpec
// Dropped partitions:
if len(droppedPartitions1) > 0 {
// A single DROP PARTITION clause can specify multiple partition names
partitionSpec := &sqlparser.PartitionSpec{
Action: sqlparser.DropAction,
Names: []sqlparser.IdentifierCI{p.Name},
}
for _, p := range droppedPartitions1 {
partitionSpec.Names = append(partitionSpec.Names, p.Name)
annotations.MarkRemoved(sqlparser.CanonicalString(p))
}
partitionSpecs = append(partitionSpecs, partitionSpec)
annotations.MarkRemoved(sqlparser.CanonicalString(p))
}
// Added partitions:
for _, p := range addedPartitions2 {
partitionSpec := &sqlparser.PartitionSpec{
Action: sqlparser.AddAction,
Expand Down
Loading

0 comments on commit 1c4cd97

Please sign in to comment.