Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
.*: cherry-pick #2139 and update change log for v2.0.7 (#2161)
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored Sep 26, 2021
1 parent 11861f7 commit ea66a07
Show file tree
Hide file tree
Showing 12 changed files with 169 additions and 35 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
# DM Changelog

All notable changes to this project will be documented in this file.

## [2.0.7] 2021-09-23

### Bug fixes

- Fix the error that binlog event is purged when switching `enable-gtid` in source configuration from `false` to `true` [#2094](https://github.com/pingcap/dm/pull/2094)
- Fix the memory leak problem of schema-tracker [#2133](https://github.com/pingcap/dm/pull/2133)

### Improvements

- Disable background statistic job in schema tracker to reduce CPU consumption [#2065](https://github.com/pingcap/dm/pull/2065)
- Support regular expressions rules for online DDL shadow and trash tables [#2139](https://github.com/pingcap/dm/pull/2139)

### Known issues

[GitHub issues](https://github.com/pingcap/dm/issues?q=is%3Aissue+label%3Aaffected-v2.0.7)

## [2.0.6] 2021-08-13

### Bug fixes
Expand Down
1 change: 1 addition & 0 deletions _utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ ErrConfigGenBAList,[code=20044:class=config:scope=internal:level=high], "Message
ErrConfigGenTableRouter,[code=20045:class=config:scope=internal:level=high], "Message: generate table router error, Workaround: Please check the `routes` config in task configuration file."
ErrConfigGenColumnMapping,[code=20046:class=config:scope=internal:level=high], "Message: generate column mapping error, Workaround: Please check the `column-mappings` config in task configuration file."
ErrConfigInvalidChunkFileSize,[code=20047:class=config:scope=internal:level=high], "Message: invalid `chunk-filesize` %v, Workaround: Please check the `chunk-filesize` config in task configuration file."
ErrConfigOnlineDDLInvalidRegex,[code=20048:class=config:scope=internal:level=high], "Message: config '%s' regex pattern '%s' invalid, reason: %s, Workaround: Please check if params is correctly in the configuration file."
ErrBinlogExtractPosition,[code=22001:class=binlog-op:scope=internal:level=high]
ErrBinlogInvalidFilename,[code=22002:class=binlog-op:scope=internal:level=high], "Message: invalid binlog filename"
ErrBinlogParsePosFromStr,[code=22003:class=binlog-op:scope=internal:level=high]
Expand Down
52 changes: 52 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"flag"
"fmt"
"regexp"
"strings"

"github.com/BurntSushi/toml"
Expand All @@ -38,6 +39,9 @@ const (
ModeAll = "all"
ModeFull = "full"
ModeIncrement = "incremental"

DefaultShadowTableRules = "^_(.+)_(?:new|gho)$"
DefaultTrashTableRules = "^_(.+)_(?:ghc|del|old)$"
)

var defaultMaxIdleConns = 2
Expand Down Expand Up @@ -134,6 +138,11 @@ type SubTaskConfig struct {
IsSharding bool `toml:"is-sharding" json:"is-sharding"`
ShardMode string `toml:"shard-mode" json:"shard-mode"`
OnlineDDL bool `toml:"online-ddl" json:"online-ddl"`

// pt/gh-ost name rule, support regex
ShadowTableRules []string `yaml:"shadow-table-rules" toml:"shadow-table-rules" json:"shadow-table-rules"`
TrashTableRules []string `yaml:"trash-table-rules" toml:"trash-table-rules" json:"trash-table-rules"`

// deprecated
OnlineDDLScheme string `toml:"online-ddl-scheme" json:"online-ddl-scheme"`

Expand Down Expand Up @@ -251,6 +260,29 @@ func (c *SubTaskConfig) Decode(data string, verifyDecryptPassword bool) error {
return c.Adjust(verifyDecryptPassword)
}

func adjustOnlineTableRules(ruleType string, rules []string) ([]string, error) {
adjustedRules := make([]string, 0, len(rules))
for _, r := range rules {
if !strings.HasPrefix(r, "^") {
r = "^" + r
}

if !strings.HasSuffix(r, "$") {
r += "$"
}

p, err := regexp.Compile(r)
if err != nil {
return rules, terror.ErrConfigOnlineDDLInvalidRegex.Generate(ruleType, r, "fail to compile: "+err.Error())
}
if p.NumSubexp() != 1 {
return rules, terror.ErrConfigOnlineDDLInvalidRegex.Generate(ruleType, r, "rule isn't contains exactly one submatch")
}
adjustedRules = append(adjustedRules, r)
}
return adjustedRules, nil
}

// Adjust adjusts and verifies configs.
func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
if c.Name == "" {
Expand All @@ -274,6 +306,26 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
return terror.ErrConfigOnlineSchemeNotSupport.Generate(c.OnlineDDLScheme)
}

if len(c.ShadowTableRules) == 0 {
c.ShadowTableRules = []string{DefaultShadowTableRules}
} else {
shadowTableRule, err := adjustOnlineTableRules("shadow-table-rules", c.ShadowTableRules)
if err != nil {
return err
}
c.ShadowTableRules = shadowTableRule
}

if len(c.TrashTableRules) == 0 {
c.TrashTableRules = []string{DefaultTrashTableRules}
} else {
trashTableRule, err := adjustOnlineTableRules("trash-table-rules", c.TrashTableRules)
if err != nil {
return err
}
c.TrashTableRules = trashTableRule
}

if c.MetaSchema == "" {
c.MetaSchema = defaultMetaSchema
}
Expand Down
22 changes: 19 additions & 3 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,10 @@ type TaskConfig struct {
MySQLInstances []*MySQLInstance `yaml:"mysql-instances" toml:"mysql-instances" json:"mysql-instances"`

OnlineDDL bool `yaml:"online-ddl" toml:"online-ddl" json:"online-ddl"`
// pt/gh-ost name rule,support regex
ShadowTableRules []string `yaml:"shadow-table-rules" toml:"shadow-table-rules" json:"shadow-table-rules"`
TrashTableRules []string `yaml:"trash-table-rules" toml:"trash-table-rules" json:"trash-table-rules"`

// deprecated
OnlineDDLScheme string `yaml:"online-ddl-scheme" toml:"online-ddl-scheme" json:"online-ddl-scheme"`

Expand Down Expand Up @@ -696,6 +700,8 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
cfg.IsSharding = c.IsSharding
cfg.ShardMode = c.ShardMode
cfg.OnlineDDL = c.OnlineDDL
cfg.TrashTableRules = c.TrashTableRules
cfg.ShadowTableRules = c.ShadowTableRules
cfg.IgnoreCheckingItems = c.IgnoreCheckingItems
cfg.Name = c.Name
cfg.Mode = c.TaskMode
Expand Down Expand Up @@ -1033,9 +1039,11 @@ type TaskConfigForDowngrade struct {
EnableANSIQuotes bool `yaml:"ansi-quotes"`
RemoveMeta bool `yaml:"remove-meta"`
// new config item
MySQLInstances []*MySQLInstanceForDowngrade `yaml:"mysql-instances"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter,omitempty"`
OnlineDDL bool `yaml:"online-ddl,omitempty"`
MySQLInstances []*MySQLInstanceForDowngrade `yaml:"mysql-instances"`
ExprFilter map[string]*ExpressionFilter `yaml:"expression-filter,omitempty"`
OnlineDDL bool `yaml:"online-ddl,omitempty"`
ShadowTableRules []string `yaml:"shadow-table-rules,omitempty"`
TrashTableRules []string `yaml:"trash-table-rules,omitempty"`
}

// NewTaskConfigForDowngrade create new TaskConfigForDowngrade.
Expand Down Expand Up @@ -1068,6 +1076,8 @@ func NewTaskConfigForDowngrade(taskConfig *TaskConfig) *TaskConfigForDowngrade {
MySQLInstances: NewMySQLInstancesForDowngrade(taskConfig.MySQLInstances),
ExprFilter: taskConfig.ExprFilter,
OnlineDDL: taskConfig.OnlineDDL,
ShadowTableRules: taskConfig.ShadowTableRules,
TrashTableRules: taskConfig.TrashTableRules,
}
}

Expand All @@ -1080,6 +1090,12 @@ func (c *TaskConfigForDowngrade) omitDefaultVals() {
delete(c.TargetDB.Session, "time_zone")
}
}
if len(c.ShadowTableRules) == 1 && c.ShadowTableRules[0] == DefaultShadowTableRules {
c.ShadowTableRules = nil
}
if len(c.TrashTableRules) == 1 && c.TrashTableRules[0] == DefaultTrashTableRules {
c.TrashTableRules = nil
}
}

// Yaml returns YAML format representation of config.
Expand Down
2 changes: 2 additions & 0 deletions dm/config/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,8 @@ func (t *testConfig) TestGenAndFromSubTaskConfigs(c *C) {
IsSharding: true,
ShardMode: shardMode,
OnlineDDL: onlineDDL,
ShadowTableRules: []string{DefaultShadowTableRules},
TrashTableRules: []string{DefaultTrashTableRules},
CaseSensitive: true,
Name: name,
Mode: taskMode,
Expand Down
6 changes: 6 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1066,6 +1066,12 @@ description = ""
workaround = "Please check the `chunk-filesize` config in task configuration file."
tags = ["internal", "high"]

[error.DM-config-20048]
message = "config '%s' regex pattern '%s' invalid, reason: %s"
description = ""
workaround = "Please check if params is correctly in the configuration file."
tags = ["internal", "high"]

[error.DM-binlog-op-22001]
message = ""
description = ""
Expand Down
3 changes: 3 additions & 0 deletions pkg/terror/error_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ const (
codeConfigGenTableRouter
codeConfigGenColumnMapping
codeConfigInvalidChunkFileSize
codeConfigOnlineDDLInvalidRegex
)

// Binlog operation error code list.
Expand Down Expand Up @@ -862,6 +863,8 @@ var (
ErrConfigGenTableRouter = New(codeConfigGenTableRouter, ClassConfig, ScopeInternal, LevelHigh, "generate table router error", "Please check the `routes` config in task configuration file.")
ErrConfigGenColumnMapping = New(codeConfigGenColumnMapping, ClassConfig, ScopeInternal, LevelHigh, "generate column mapping error", "Please check the `column-mappings` config in task configuration file.")
ErrConfigInvalidChunkFileSize = New(codeConfigInvalidChunkFileSize, ClassConfig, ScopeInternal, LevelHigh, "invalid `chunk-filesize` %v", "Please check the `chunk-filesize` config in task configuration file.")
ErrConfigOnlineDDLInvalidRegex = New(codeConfigOnlineDDLInvalidRegex, ClassConfig, ScopeInternal, LevelHigh,
"config '%s' regex pattern '%s' invalid, reason: %s", "Please check if params is correctly in the configuration file.")

// Binlog operation error.
ErrBinlogExtractPosition = New(codeBinlogExtractPosition, ClassBinlogOp, ScopeInternal, LevelHigh, "", "")
Expand Down
49 changes: 40 additions & 9 deletions syncer/online-ddl-tools/online_ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ package onlineddl
import (
"encoding/json"
"fmt"
"strings"
"regexp"
"sync"

"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -379,13 +379,33 @@ func (s *Storage) CheckAndUpdate(
// (_*).*_old ghost trash table
// we don't support `--new-table-name` flag.
type RealOnlinePlugin struct {
storage *Storage
storage *Storage
shadowRegs []*regexp.Regexp
trashRegs []*regexp.Regexp
}

// NewRealOnlinePlugin returns real online plugin.
func NewRealOnlinePlugin(tctx *tcontext.Context, cfg *config.SubTaskConfig) (OnlinePlugin, error) {
shadowRegs := make([]*regexp.Regexp, 0, len(cfg.ShadowTableRules))
trashRegs := make([]*regexp.Regexp, 0, len(cfg.TrashTableRules))
for _, sg := range cfg.ShadowTableRules {
shadowReg, err := regexp.Compile(sg)
if err != nil {
return nil, terror.ErrConfigOnlineDDLInvalidRegex.Generate("shadow-table-rules", sg, "fail to compile: "+err.Error())
}
shadowRegs = append(shadowRegs, shadowReg)
}
for _, tg := range cfg.TrashTableRules {
trashReg, err := regexp.Compile(tg)
if err != nil {
return nil, terror.ErrConfigOnlineDDLInvalidRegex.Generate("trash-table-rules", tg, "fail to compile: "+err.Error())
}
trashRegs = append(trashRegs, trashReg)
}
r := &RealOnlinePlugin{
storage: NewOnlineDDLStorage(tcontext.Background().WithLogger(tctx.L().WithFields(zap.String("online ddl", ""))), cfg), // create a context for logger
storage: NewOnlineDDLStorage(tcontext.Background().WithLogger(tctx.L().WithFields(zap.String("online ddl", ""))), cfg), // create a context for logger
shadowRegs: shadowRegs,
trashRegs: trashRegs,
}

return r, r.storage.Init(tctx)
Expand Down Expand Up @@ -487,12 +507,14 @@ func (r *RealOnlinePlugin) Finish(tctx *tcontext.Context, schema, table string)
// TableType implements interface.
func (r *RealOnlinePlugin) TableType(table string) TableType {
// 5 is _ _gho/ghc/del or _ _old/new
if len(table) > 5 && strings.HasPrefix(table, "_") {
if strings.HasSuffix(table, "_gho") || strings.HasSuffix(table, "_new") {
for _, shadowReg := range r.shadowRegs {
if shadowReg.MatchString(table) {
return GhostTable
}
}

if strings.HasSuffix(table, "_ghc") || strings.HasSuffix(table, "_del") || strings.HasSuffix(table, "_old") {
for _, trashReg := range r.trashRegs {
if trashReg.MatchString(table) {
return TrashTable
}
}
Expand All @@ -501,9 +523,18 @@ func (r *RealOnlinePlugin) TableType(table string) TableType {

// RealName implements interface.
func (r *RealOnlinePlugin) RealName(table string) string {
tp := r.TableType(table)
if tp == GhostTable || tp == TrashTable {
table = table[1 : len(table)-4]
for _, shadowReg := range r.shadowRegs {
shadowRes := shadowReg.FindStringSubmatch(table)
if len(shadowRes) > 1 {
return shadowRes[1]
}
}

for _, trashReg := range r.trashRegs {
trashRes := trashReg.FindStringSubmatch(table)
if len(trashRes) > 1 {
return trashRes[1]
}
}
return table
}
Expand Down
18 changes: 10 additions & 8 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,16 @@ func (s *testSyncerSuite) SetUpSuite(c *C) {
Dir: loaderDir,
}
s.cfg = &config.SubTaskConfig{
From: getDBConfigFromEnv(),
To: getDBConfigFromEnv(),
ServerID: 101,
MetaSchema: "test",
Name: "syncer_ut",
Mode: config.ModeIncrement,
Flavor: "mysql",
LoaderConfig: loaderCfg,
From: getDBConfigFromEnv(),
To: getDBConfigFromEnv(),
ServerID: 101,
MetaSchema: "test",
Name: "syncer_ut",
ShadowTableRules: []string{config.DefaultShadowTableRules},
TrashTableRules: []string{config.DefaultTrashTableRules},
Mode: config.ModeIncrement,
Flavor: "mysql",
LoaderConfig: loaderCfg,
}
s.cfg.From.Adjust()
s.cfg.To.Adjust()
Expand Down
2 changes: 2 additions & 0 deletions tests/dmctl_basic/conf/get_task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ mysql-instances:
syncer: null
syncer-thread: 0
online-ddl: false
shadow-table-rules: []
trash-table-rules: []
online-ddl-scheme: ""
routes:
route-01:
Expand Down
30 changes: 15 additions & 15 deletions tests/import_goroutine_leak/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,21 @@ function run() {
check_port_offline $WORKER1_PORT 20
sleep 2

for ((k=0;k<10;k++)); do
# dm-worker1 panics
err_cnt=$(grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l)
# there may be more panic
if [ $err_cnt -lt 2 ]; then
echo "dm-worker1 doesn't panic again, panic count ${err_cnt}"
if [ $k -eq 9 ]; then
echo "panic check failed 10 times, will exit now"
exit 2
fi
else
break
fi
sleep 2
done
for ((k = 0; k < 10; k++)); do
# dm-worker1 panics
err_cnt=$(grep "panic" $WORK_DIR/worker1/log/stdout.log | wc -l)
# there may be more panic
if [ $err_cnt -lt 2 ]; then
echo "dm-worker1 doesn't panic again, panic count ${err_cnt}"
if [ $k -eq 9 ]; then
echo "panic check failed 10 times, will exit now"
exit 2
fi
else
break
fi
sleep 2
done

echo "restart dm-workers with errros to pause"
# paused with injected error
Expand Down
2 changes: 2 additions & 0 deletions tests/import_v10x/conf/task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ mysql-instances:
syncer: null
syncer-thread: 0
online-ddl: false
shadow-table-rules: []
trash-table-rules: []
online-ddl-scheme: ""
routes: {}
filters: {}
Expand Down

0 comments on commit ea66a07

Please sign in to comment.