Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
Hotfix again (#1641)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Apr 29, 2021
1 parent be5b54f commit 6c108ab
Show file tree
Hide file tree
Showing 26 changed files with 54 additions and 51 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ relay_log/*
vendor
*/*.DS_Store
tidb-slow.log
.idea/
3 changes: 3 additions & 0 deletions dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ type SubTaskConfig struct {

ConfigFile string `toml:"-" json:"config-file"`

UpperSchema []string `toml:"upper-schema" json:"upper-schema"`
UpperTable []string `toml:"upper-table" json:"upper-table"`

// still needed by Syncer / Loader bin
printVersion bool
}
Expand Down
6 changes: 6 additions & 0 deletions dm/config/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,9 @@ type TaskConfig struct {
Mydumpers map[string]*MydumperConfig `yaml:"mydumpers"`
Loaders map[string]*LoaderConfig `yaml:"loaders"`
Syncers map[string]*SyncerConfig `yaml:"syncers"`

UpperSchema []string `yaml:"upper-schema"`
UpperTable []string `yaml:"upper-table"`
}

// NewTaskConfig creates a TaskConfig
Expand Down Expand Up @@ -520,6 +523,9 @@ func (c *TaskConfig) SubTaskConfigs(sources map[string]DBConfig) ([]*SubTaskConf
cfg.LoaderConfig = *inst.Loader
cfg.SyncerConfig = *inst.Syncer

cfg.UpperSchema = c.UpperSchema
cfg.UpperTable = c.UpperTable

err := cfg.Adjust(true)
if err != nil {
return nil, terror.Annotatef(err, "source %s", inst.SourceID)
Expand Down
2 changes: 1 addition & 1 deletion syncer/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func (s *Syncer) dropSchemaInSharding(tctx *tcontext.Context, sourceSchema strin
targetSchema, targetTable := UnpackTableID(name)
sourceIDs := make([]string, 0, len(tables))
for _, table := range tables {
sourceID, _ := GenTableID(table[0], table[1])
sourceID, _ := GenTableID(table[0], table[1], s.cfg.UpperSchema, s.cfg.UpperTable)
sourceIDs = append(sourceIDs, sourceID)
}
err := s.sgk.LeaveGroup(targetSchema, targetTable, sourceIDs)
Expand Down
40 changes: 33 additions & 7 deletions syncer/sharding_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ import (
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/pkg/conn"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
shardmeta "github.com/pingcap/dm/syncer/sharding-meta"

Expand Down Expand Up @@ -379,7 +380,32 @@ func (sg *ShardingGroup) FlushData(targetTableID string) ([]string, [][]interfac
}

// GenTableID generates table ID
func GenTableID(schema, table string) (ID string, isSchemaOnly bool) {
func GenTableID(schema, table string, upperSchema, upperTable []string) (ID string, isSchemaOnly bool) {
for _, s := range upperSchema {
if s == schema {
oldSchema := schema
schema = strings.ToLower(schema)
if oldSchema != schema {
log.L().Warn("hotfix, changing schema to lowercase",
zap.String("old schema", oldSchema),
zap.String("schema", schema))
}
break
}
}
for _, t := range upperTable {
if t == table {
oldTable := table
table = strings.ToLower(table)
if oldTable != table {
log.L().Warn("hotfix, changing table to lowercase",
zap.String("old table", oldTable),
zap.String("table", table))
}
break
}
}

if len(table) == 0 {
return fmt.Sprintf("`%s`", schema), true
}
Expand Down Expand Up @@ -425,8 +451,8 @@ func NewShardingGroupKeeper(tctx *tcontext.Context, cfg *config.SubTaskConfig) *
func (k *ShardingGroupKeeper) AddGroup(targetSchema, targetTable string, sourceIDs []string, meta *shardmeta.ShardingMeta, merge bool) (needShardingHandle bool, group *ShardingGroup, synced bool, remain int, err error) {
// if need to support target table-level sharding DDL
// we also need to support target schema-level sharding DDL
schemaID, _ := GenTableID(targetSchema, "")
targetTableID, _ := GenTableID(targetSchema, targetTable)
schemaID, _ := GenTableID(targetSchema, "", k.cfg.UpperSchema, k.cfg.UpperTable)
targetTableID, _ := GenTableID(targetSchema, targetTable, k.cfg.UpperSchema, k.cfg.UpperTable)

k.Lock()
defer k.Unlock()
Expand Down Expand Up @@ -486,8 +512,8 @@ func (k *ShardingGroupKeeper) ResetGroups() {
// LeaveGroup leaves group according to target schema, table and source IDs
// LeaveGroup doesn't affect in syncing process
func (k *ShardingGroupKeeper) LeaveGroup(targetSchema, targetTable string, sources []string) error {
schemaID, _ := GenTableID(targetSchema, "")
targetTableID, _ := GenTableID(targetSchema, targetTable)
schemaID, _ := GenTableID(targetSchema, "", k.cfg.UpperSchema, k.cfg.UpperTable)
targetTableID, _ := GenTableID(targetSchema, targetTable, k.cfg.UpperSchema, k.cfg.UpperTable)
k.Lock()
defer k.Unlock()
if group, ok := k.groups[targetTableID]; ok {
Expand All @@ -514,7 +540,7 @@ func (k *ShardingGroupKeeper) TrySync(
targetSchema, targetTable, source string, pos, endPos mysql.Position, ddls []string) (
needShardingHandle bool, group *ShardingGroup, synced, active bool, remain int, err error) {

targetTableID, schemaOnly := GenTableID(targetSchema, targetTable)
targetTableID, schemaOnly := GenTableID(targetSchema, targetTable, k.cfg.UpperSchema, k.cfg.UpperTable)
if schemaOnly {
// NOTE: now we don't support syncing for schema only sharding DDL
return false, nil, true, false, 0, nil
Expand Down Expand Up @@ -563,7 +589,7 @@ func (k *ShardingGroupKeeper) UnresolvedTables() (map[string]bool, [][]string) {

// Group returns target table's group, nil if not exist
func (k *ShardingGroupKeeper) Group(targetSchema, targetTable string) *ShardingGroup {
targetTableID, _ := GenTableID(targetSchema, targetTable)
targetTableID, _ := GenTableID(targetSchema, targetTable, k.cfg.UpperSchema, k.cfg.UpperTable)
k.RLock()
defer k.RUnlock()
return k.groups[targetTableID]
Expand Down
13 changes: 7 additions & 6 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ func (s *Syncer) initShardingGroups() error {
if !ok {
mSchema[targetTable] = make([]string, 0, len(tables))
}
ID, _ := GenTableID(schema, table)
ID, _ := GenTableID(schema, table, s.cfg.UpperSchema, s.cfg.UpperTable)
mSchema[targetTable] = append(mSchema[targetTable], ID)
}
}
Expand All @@ -453,7 +453,7 @@ func (s *Syncer) initShardingGroups() error {
// add sharding group
for targetSchema, mSchema := range mapper {
for targetTable, sourceIDs := range mSchema {
tableID, _ := GenTableID(targetSchema, targetTable)
tableID, _ := GenTableID(targetSchema, targetTable, s.cfg.UpperSchema, s.cfg.UpperTable)
_, _, _, _, err := s.sgk.AddGroup(targetSchema, targetTable, sourceIDs, loadMeta[tableID], false)
if err != nil {
return err
Expand Down Expand Up @@ -1447,7 +1447,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
}

if s.cfg.IsSharding {
source, _ := GenTableID(originSchema, originTable)
source, _ := GenTableID(originSchema, originTable, s.cfg.UpperSchema, s.cfg.UpperTable)
if s.sgk.InSyncing(schemaName, tableName, source, *ec.currentPos) {
// if in unsync stage and not before active DDL, ignore it
// if in sharding re-sync stage and not before active DDL (the next DDL to be synced), ignore it
Expand Down Expand Up @@ -1666,7 +1666,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
}
continue
case *ast.DropTableStmt:
sourceID, _ := GenTableID(tableNames[0][0].Schema, tableNames[0][0].Name)
sourceID, _ := GenTableID(tableNames[0][0].Schema, tableNames[0][0].Name, s.cfg.UpperSchema, s.cfg.UpperTable)
err = s.sgk.LeaveGroup(tableNames[1][0].Schema, tableNames[1][0].Name, []string{sourceID})
if err != nil {
return err
Expand Down Expand Up @@ -1772,7 +1772,8 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
Name: ec.currentPos.Name,
Pos: ec.currentPos.Pos - ec.header.EventSize,
}
source, _ = GenTableID(ddlInfo.tableNames[0][0].Schema, ddlInfo.tableNames[0][0].Name)

source, _ = GenTableID(ddlInfo.tableNames[0][0].Schema, ddlInfo.tableNames[0][0].Name, s.cfg.UpperSchema, s.cfg.UpperTable)

var annotate string
switch ddlInfo.stmt.(type) {
Expand Down Expand Up @@ -1801,7 +1802,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e
s.tctx.L().Info(annotate, zap.String("event", "query"), zap.String("source", source), zap.Strings("ddls", needHandleDDLs), zap.ByteString("raw statement", ev.Query), zap.Bool("in-sharding", needShardingHandle), zap.Stringer("start position", startPos), zap.Bool("is-synced", synced), zap.Int("unsynced", remain))

if needShardingHandle {
target, _ := GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name)
target, _ := GenTableID(ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name, s.cfg.UpperSchema, s.cfg.UpperTable)
unsyncedTableGauge.WithLabelValues(s.cfg.Name, target).Set(float64(remain))
err = ec.safeMode.IncrForTable(s.tctx, ddlInfo.tableNames[1][0].Schema, ddlInfo.tableNames[1][0].Name) // try enable safe-mode when starting syncing for sharding group
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions tests/all_mode/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/compatibility/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/dmctl_basic/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/dmctl_command/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/full_mode/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/http_apis/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/incremental_mode/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/initial_unit/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/load_interrupt/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/online_ddl/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/print_status/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

use-checkpoint = false
Expand Down
2 changes: 0 additions & 2 deletions tests/relay_interrupt/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/retry_cancel/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/safe_mode/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/sequence_safe_mode/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/sequence_sharding/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
1 change: 1 addition & 0 deletions tests/sequence_sharding/conf/dm-task.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ meta-schema: "dm_meta"
remove-meta: false
enable-heartbeat: true
timezone: "Asia/Shanghai"
upper-table: ["T1"]

target-database:
host: "127.0.0.1"
Expand Down
3 changes: 2 additions & 1 deletion tests/sequence_sharding/data/db1.increment.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ insert into t1 (uid,name) values (100003,'NR');
update t1 set name = 'uxoKehvqWg' where `uid` = 100001;
update t1 set name = 'bapYymrtfT' where name = 'igvApUx';
insert into t2 (uid,name) values (200004,'CXDvoltoliUINgo'),(200005,'188689130');
alter table t1 add column c int;
alter table T1 add column c int;
insert into t1 (uid,name,c) values (123123,'test',123);
alter table t1 add index c(c);
update t1 set c = 100;
alter table t1 add column d int;
Expand Down
2 changes: 0 additions & 2 deletions tests/sharding/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down
2 changes: 0 additions & 2 deletions tests/start_task/conf/diff_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"
Expand Down

0 comments on commit 6c108ab

Please sign in to comment.