diff --git a/chaos/cases/conf/task-optimistic.yaml b/chaos/cases/conf/task-optimistic.yaml
index 84113731be..2f80886dc4 100644
--- a/chaos/cases/conf/task-optimistic.yaml
+++ b/chaos/cases/conf/task-optimistic.yaml
@@ -20,20 +20,24 @@ mysql-instances:
     black-white-list: "instance"
     mydumper-thread: 4
     loader-thread: 16
-    syncer-thread: 16
+    syncer-config-name: "global"

  - source-id: "replica-02"
    black-white-list: "instance"
    mydumper-thread: 4
    loader-thread: 16
-    syncer-thread: 16
+    syncer-config-name: "global"

  - source-id: "replica-03"
    black-white-list: "instance"
    mydumper-thread: 4
    loader-thread: 16
-    syncer-thread: 16
+    syncer-config-name: "global"

 black-white-list:
   instance:
     do-dbs: ["db_optimistic"]
+
+syncers:
+  global:
+    compact: true
diff --git a/chaos/cases/conf/task-pessimistic.yaml b/chaos/cases/conf/task-pessimistic.yaml
index 0af0c2b7a2..fb7b5f3278 100644
--- a/chaos/cases/conf/task-pessimistic.yaml
+++ b/chaos/cases/conf/task-pessimistic.yaml
@@ -20,20 +20,24 @@ mysql-instances:
     black-white-list: "instance"
     mydumper-thread: 4
     loader-thread: 16
-    syncer-thread: 16
+    syncer-config-name: "global"

  - source-id: "replica-02"
    black-white-list: "instance"
    mydumper-thread: 4
    loader-thread: 16
-    syncer-thread: 16
+    syncer-config-name: "global"

  - source-id: "replica-03"
    black-white-list: "instance"
    mydumper-thread: 4
    loader-thread: 16
-    syncer-thread: 16
+    syncer-config-name: "global"

 black-white-list:
   instance:
     do-dbs: ["db_pessimistic"]
+
+syncers:
+  global:
+    compact: true
diff --git a/chaos/cases/conf/task-single.yaml b/chaos/cases/conf/task-single.yaml
index e8b725b7d5..63cd49670e 100644
--- a/chaos/cases/conf/task-single.yaml
+++ b/chaos/cases/conf/task-single.yaml
@@ -14,8 +14,12 @@ mysql-instances:
     black-white-list: "instance"
     mydumper-thread: 4
     loader-thread: 16
-    syncer-thread: 16
+    syncer-config-name: "global"

 black-white-list:
   instance:
     do-dbs: ["db_single"]
+
+syncers:
+  global:
+    compact: true
diff --git a/chaos/cases/task.go b/chaos/cases/task.go
index e2df7a9070..525fa867ea 100644
--- a/chaos/cases/task.go
+++ b/chaos/cases/task.go
@@ -79,7 +79,12 @@ func newTask(ctx context.Context, cli pb.MasterClient, taskFile string, schema s
         sourceConns = make([]*dbConn, 0, len(taskCfg.MySQLInstances))
         res         = make(results, 0, len(taskCfg.MySQLInstances))
     )
-    for i := range taskCfg.MySQLInstances { // only use necessary part of sources.
+    for i, m := range taskCfg.MySQLInstances { // only use necessary part of sources.
+        // reset Syncer, otherwise it will report ERROR 20017
+        if len(m.SyncerConfigName) > 0 && m.Syncer != nil {
+            m.Syncer = nil
+        }
+
         cfg := sourcesCfg[i]
         db, err2 := conn.DefaultDBProvider.Apply(cfg)
         if err2 != nil {
@@ -270,7 +275,7 @@ func (t *task) genFullData() error {

 // createTask does `start-task` operation.
 func (t *task) createTask() error {
-    t.logger.Info("starting the task")
+    t.logger.Info("starting the task", zap.String("task cfg", t.taskCfg.String()))
     resp, err := t.cli.StartTask(t.ctx, &pb.StartTaskRequest{
         Task: t.taskCfg.String(),
     })
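Reviewer note (not part of the patch): the three chaos task files above stop tuning a per-instance `syncer-thread` count and instead point each instance at a named entry in a shared `syncers` section, where compaction is switched on. A minimal sketch of the resulting wiring, using only keys that appear in this patch:

    mysql-instances:
      - source-id: "replica-01"
        syncer-config-name: "global"   # refers to the named entry under `syncers`

    syncers:
      global:
        compact: true                  # enable DML compaction for this task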
diff --git a/dm/config/task.go b/dm/config/task.go
index 435398ae6d..f0bf591c24 100644
--- a/dm/config/task.go
+++ b/dm/config/task.go
@@ -237,7 +237,8 @@ type SyncerConfig struct {
     Batch       int `yaml:"batch" toml:"batch" json:"batch"`
     QueueSize   int `yaml:"queue-size" toml:"queue-size" json:"queue-size"`
     // checkpoint flush interval in seconds.
-    CheckpointFlushInterval int `yaml:"checkpoint-flush-interval" toml:"checkpoint-flush-interval" json:"checkpoint-flush-interval"`
+    CheckpointFlushInterval int  `yaml:"checkpoint-flush-interval" toml:"checkpoint-flush-interval" json:"checkpoint-flush-interval"`
+    Compact                 bool `yaml:"compact" toml:"compact" json:"compact"`

     // deprecated
     MaxRetry int `yaml:"max-retry" toml:"max-retry" json:"max-retry"`
@@ -857,34 +858,76 @@ func NewMySQLInstancesForDowngrade(mysqlInstances []*MySQLInstance) []*MySQLInst
     return mysqlInstancesForDowngrade
 }

+// SyncerConfigForDowngrade is the base configuration for syncer in v2.0.
+// This config is used for downgrade(config export) from a higher dmctl version.
+// When we add any new config item into SyncerConfig, we should update it also.
+type SyncerConfigForDowngrade struct {
+    MetaFile                string `yaml:"meta-file"`
+    WorkerCount             int    `yaml:"worker-count"`
+    Batch                   int    `yaml:"batch"`
+    QueueSize               int    `yaml:"queue-size"`
+    CheckpointFlushInterval int    `yaml:"checkpoint-flush-interval"`
+    MaxRetry                int    `yaml:"max-retry"`
+    AutoFixGTID             bool   `yaml:"auto-fix-gtid"`
+    EnableGTID              bool   `yaml:"enable-gtid"`
+    DisableCausality        bool   `yaml:"disable-detect"`
+    SafeMode                bool   `yaml:"safe-mode"`
+    EnableANSIQuotes        bool   `yaml:"enable-ansi-quotes"`
+
+    Compact bool `yaml:"compact,omitempty"`
+}
+
+// NewSyncerConfigsForDowngrade converts SyncerConfig to SyncerConfigForDowngrade.
+func NewSyncerConfigsForDowngrade(syncerConfigs map[string]*SyncerConfig) map[string]*SyncerConfigForDowngrade {
+    syncerConfigsForDowngrade := make(map[string]*SyncerConfigForDowngrade, len(syncerConfigs))
+    for configName, syncerConfig := range syncerConfigs {
+        newSyncerConfig := &SyncerConfigForDowngrade{
+            MetaFile:                syncerConfig.MetaFile,
+            WorkerCount:             syncerConfig.WorkerCount,
+            Batch:                   syncerConfig.Batch,
+            QueueSize:               syncerConfig.QueueSize,
+            CheckpointFlushInterval: syncerConfig.CheckpointFlushInterval,
+            MaxRetry:                syncerConfig.MaxRetry,
+            AutoFixGTID:             syncerConfig.AutoFixGTID,
+            EnableGTID:              syncerConfig.EnableGTID,
+            DisableCausality:        syncerConfig.DisableCausality,
+            SafeMode:                syncerConfig.SafeMode,
+            EnableANSIQuotes:        syncerConfig.EnableANSIQuotes,
+            Compact:                 syncerConfig.Compact,
+        }
+        syncerConfigsForDowngrade[configName] = newSyncerConfig
+    }
+    return syncerConfigsForDowngrade
+}
+
 // TaskConfigForDowngrade is the base configuration for task in v2.0.
 // This config is used for downgrade(config export) from a higher dmctl version.
 // When we add any new config item into SourceConfig, we should update it also.
 type TaskConfigForDowngrade struct {
-    Name                    string                         `yaml:"name"`
-    TaskMode                string                         `yaml:"task-mode"`
-    IsSharding              bool                           `yaml:"is-sharding"`
-    ShardMode               string                         `yaml:"shard-mode"`
-    IgnoreCheckingItems     []string                       `yaml:"ignore-checking-items"`
-    MetaSchema              string                         `yaml:"meta-schema"`
-    EnableHeartbeat         bool                           `yaml:"enable-heartbeat"`
-    HeartbeatUpdateInterval int                            `yaml:"heartbeat-update-interval"`
-    HeartbeatReportInterval int                            `yaml:"heartbeat-report-interval"`
-    Timezone                string                         `yaml:"timezone"`
-    CaseSensitive           bool                           `yaml:"case-sensitive"`
-    TargetDB                *DBConfig                      `yaml:"target-database"`
-    OnlineDDLScheme         string                         `yaml:"online-ddl-scheme"`
-    Routes                  map[string]*router.TableRule   `yaml:"routes"`
-    Filters                 map[string]*bf.BinlogEventRule `yaml:"filters"`
-    ColumnMappings          map[string]*column.Rule        `yaml:"column-mappings"`
-    BWList                  map[string]*filter.Rules       `yaml:"black-white-list"`
-    BAList                  map[string]*filter.Rules       `yaml:"block-allow-list"`
-    Mydumpers               map[string]*MydumperConfig     `yaml:"mydumpers"`
-    Loaders                 map[string]*LoaderConfig       `yaml:"loaders"`
-    Syncers                 map[string]*SyncerConfig       `yaml:"syncers"`
-    CleanDumpFile           bool                           `yaml:"clean-dump-file"`
-    EnableANSIQuotes        bool                           `yaml:"ansi-quotes"`
-    RemoveMeta              bool                           `yaml:"remove-meta"`
+    Name                    string                               `yaml:"name"`
+    TaskMode                string                               `yaml:"task-mode"`
+    IsSharding              bool                                 `yaml:"is-sharding"`
+    ShardMode               string                               `yaml:"shard-mode"`
+    IgnoreCheckingItems     []string                             `yaml:"ignore-checking-items"`
+    MetaSchema              string                               `yaml:"meta-schema"`
+    EnableHeartbeat         bool                                 `yaml:"enable-heartbeat"`
+    HeartbeatUpdateInterval int                                  `yaml:"heartbeat-update-interval"`
+    HeartbeatReportInterval int                                  `yaml:"heartbeat-report-interval"`
+    Timezone                string                               `yaml:"timezone"`
+    CaseSensitive           bool                                 `yaml:"case-sensitive"`
+    TargetDB                *DBConfig                            `yaml:"target-database"`
+    OnlineDDLScheme         string                               `yaml:"online-ddl-scheme"`
+    Routes                  map[string]*router.TableRule         `yaml:"routes"`
+    Filters                 map[string]*bf.BinlogEventRule       `yaml:"filters"`
+    ColumnMappings          map[string]*column.Rule              `yaml:"column-mappings"`
+    BWList                  map[string]*filter.Rules             `yaml:"black-white-list"`
+    BAList                  map[string]*filter.Rules             `yaml:"block-allow-list"`
+    Mydumpers               map[string]*MydumperConfig           `yaml:"mydumpers"`
+    Loaders                 map[string]*LoaderConfig             `yaml:"loaders"`
+    Syncers                 map[string]*SyncerConfigForDowngrade `yaml:"syncers"`
+    CleanDumpFile           bool                                 `yaml:"clean-dump-file"`
+    EnableANSIQuotes        bool                                 `yaml:"ansi-quotes"`
+    RemoveMeta              bool                                 `yaml:"remove-meta"`
     // new config item
     MySQLInstances []*MySQLInstanceForDowngrade  `yaml:"mysql-instances"`
     ExprFilter     map[string]*ExpressionFilter  `yaml:"expression-filter,omitempty"`
@@ -916,7 +959,7 @@ func NewTaskConfigForDowngrade(taskConfig *TaskConfig) *TaskConfigForDowngrade {
         BAList:                  taskConfig.BAList,
         Mydumpers:               taskConfig.Mydumpers,
         Loaders:                 taskConfig.Loaders,
-        Syncers:                 taskConfig.Syncers,
+        Syncers:                 NewSyncerConfigsForDowngrade(taskConfig.Syncers),
         CleanDumpFile:           taskConfig.CleanDumpFile,
         EnableANSIQuotes:        taskConfig.EnableANSIQuotes,
         RemoveMeta:              taskConfig.RemoveMeta,
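Reviewer note (not part of the patch): `Compact` is the only field in SyncerConfigForDowngrade tagged with `omitempty`, so a task that never enables compaction still exports the exact v2.0-shaped YAML. A self-contained sketch of that behaviour with a reduced stand-in struct (gopkg.in/yaml.v2 assumed; not the DM types):

    package main

    import (
        "fmt"

        "gopkg.in/yaml.v2"
    )

    // reduced stand-in for SyncerConfigForDowngrade, only to show the omitempty behaviour.
    type syncerForDowngrade struct {
        WorkerCount int  `yaml:"worker-count"`
        Compact     bool `yaml:"compact,omitempty"`
    }

    func main() {
        oldStyle, _ := yaml.Marshal(syncerForDowngrade{WorkerCount: 16})                // compact left unset
        newStyle, _ := yaml.Marshal(syncerForDowngrade{WorkerCount: 16, Compact: true}) // compact enabled
        fmt.Print(string(oldStyle)) // worker-count: 16
        fmt.Print(string(newStyle)) // worker-count: 16 \n compact: true
    }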
diff --git a/dm/config/task_test.go b/dm/config/task_test.go
index fbf67b5fdf..37a64f317f 100644
--- a/dm/config/task_test.go
+++ b/dm/config/task_test.go
@@ -1050,6 +1050,21 @@ func cloneValues(dest, src reflect.Value) {
         srcType = srcType.Elem()
     }

+    if destType.Kind() == reflect.Map {
+        destMap := reflect.MakeMap(destType)
+        for _, k := range src.MapKeys() {
+            if src.MapIndex(k).Type().Kind() == reflect.Ptr {
+                newVal := reflect.New(destType.Elem().Elem())
+                cloneValues(newVal, src.MapIndex(k))
+                destMap.SetMapIndex(k, newVal)
+            } else {
+                cloneValues(destMap.MapIndex(k).Addr(), src.MapIndex(k).Addr())
+            }
+        }
+        dest.Set(destMap)
+        return
+    }
+
     if destType.Kind() == reflect.Slice {
         slice := reflect.MakeSlice(destType, src.Len(), src.Cap())
         for i := 0; i < src.Len(); i++ {
diff --git a/syncer/compactor.go b/syncer/compactor.go
new file mode 100644
index 0000000000..8e268a9f98
--- /dev/null
+++ b/syncer/compactor.go
@@ -0,0 +1,182 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package syncer
+
+import (
+    "github.com/pingcap/failpoint"
+    "go.uber.org/zap"
+
+    "github.com/pingcap/dm/pkg/log"
+    "github.com/pingcap/dm/syncer/metrics"
+)
+
+// compactor compacts multiple statements into one statement.
+type compactor struct {
+    inCh       chan *job
+    outCh      chan *job
+    bufferSize int
+    logger     log.Logger
+    safeMode   bool
+
+    keyMap map[string]map[string]int // table -> key(pk or (uk + not null)) -> index in buffer
+    buffer []*job
+
+    // for metrics
+    task   string
+    source string
+}
+
+// compactorWrap creates and runs a compactor instance.
+func compactorWrap(inCh chan *job, syncer *Syncer) chan *job {
+    bufferSize := syncer.cfg.QueueSize * syncer.cfg.WorkerCount / 4
+    compactor := &compactor{
+        inCh:       inCh,
+        outCh:      make(chan *job, bufferSize),
+        bufferSize: bufferSize,
+        logger:     syncer.tctx.Logger.WithFields(zap.String("component", "compactor")),
+        keyMap:     make(map[string]map[string]int),
+        buffer:     make([]*job, 0, bufferSize),
+        task:       syncer.cfg.Name,
+        source:     syncer.cfg.SourceID,
+    }
+    go func() {
+        compactor.run()
+        compactor.close()
+    }()
+    return compactor.outCh
+}
+
+// run runs a compactor instance.
+func (c *compactor) run() {
+    for j := range c.inCh {
+        metrics.QueueSizeGauge.WithLabelValues(c.task, "compactor_input", c.source).Set(float64(len(c.inCh)))
+
+        if j.tp == flush {
+            c.flushBuffer()
+            c.outCh <- j
+            continue
+        }
+
+        // set safeMode when receiving the first job
+        if len(c.buffer) == 0 {
+            c.safeMode = j.dml.safeMode
+        }
+        // if the dml has no PK/NOT NULL UK, do not compact it.
+        if j.dml.identifyColumns() == nil {
+            c.buffer = append(c.buffer, j)
+            continue
+        }
+
+        // if an update job updates its identify keys, turn it into delete + insert
+        if j.dml.op == update && j.dml.updateIdentify() {
+            delDML, insertDML := updateToDelAndInsert(j.dml)
+            delJob := j.clone()
+            delJob.tp = del
+            delJob.dml = delDML
+
+            insertJob := j.clone()
+            insertJob.tp = insert
+            insertJob.dml = insertDML
+
+            c.compactJob(delJob)
+            c.compactJob(insertJob)
+        } else {
+            c.compactJob(j)
+        }
+
+        failpoint.Inject("SkipFlushCompactor", func() {
+            failpoint.Continue()
+        })
+
+        // if there are no pending input jobs, the buffer is full or the output channel is empty, flush the buffer
+        if len(c.inCh) == 0 || len(c.buffer) >= c.bufferSize || len(c.outCh) == 0 {
+            c.flushBuffer()
+        }
+    }
+}
+
+// close closes outer channels.
+func (c *compactor) close() {
+    close(c.outCh)
+}
+
+// flushBuffer flushes the buffer and resets the compactor.
+func (c *compactor) flushBuffer() {
+    for _, j := range c.buffer {
+        if j != nil {
+            // set safemode for all jobs according to the first job in the buffer.
+            if c.safeMode {
+                j.dml.safeMode = true
+            }
+            c.outCh <- j
+        }
+    }
+    c.keyMap = make(map[string]map[string]int)
+    c.buffer = c.buffer[0:0]
+}
+
+// compactJob compacts jobs.
+// INSERT + INSERT => X            ‾|
+// UPDATE + INSERT => X             |=> anything + INSERT => INSERT
+// DELETE + INSERT => INSERT       _|
+// INSERT + DELETE => DELETE       ‾|
+// UPDATE + DELETE => DELETE        |=> anything + DELETE => DELETE
+// DELETE + DELETE => X            _|
+// INSERT + UPDATE => INSERT       ‾|
+// UPDATE + UPDATE => UPDATE        |=> INSERT + UPDATE => INSERT, UPDATE + UPDATE => UPDATE
+// DELETE + UPDATE => X            _|
+// .
+func (c *compactor) compactJob(j *job) {
+    tableName := j.dml.targetTableID
+    tableKeyMap, ok := c.keyMap[tableName]
+    if !ok {
+        c.keyMap[tableName] = make(map[string]int, c.bufferSize)
+        tableKeyMap = c.keyMap[tableName]
+    }
+
+    key := j.dml.identifyKey()
+    prevPos, ok := tableKeyMap[key]
+    // if there is no such key in the buffer, add it
+    if !ok || prevPos >= len(c.buffer) {
+        // should not happen, avoid panic
+        if ok {
+            c.logger.Error("cannot find previous job by key", zap.String("key", key), zap.Int("pos", prevPos))
+        }
+        tableKeyMap[key] = len(c.buffer)
+        c.buffer = append(c.buffer, j)
+        return
+    }
+
+    prevJob := c.buffer[prevPos]
+    c.logger.Debug("start to compact", zap.Stringer("previous dml", prevJob.dml), zap.Stringer("current dml", j.dml))
+
+    if j.tp == update {
+        if prevJob.tp == insert {
+            // INSERT + UPDATE => INSERT
+            j.tp = insert
+            j.dml.oldValues = nil
+            j.dml.originOldValues = nil
+            j.dml.op = insert
+        } else if prevJob.tp == update {
+            // UPDATE + UPDATE => UPDATE
+            j.dml.oldValues = prevJob.dml.oldValues
+            j.dml.originOldValues = prevJob.dml.originOldValues
+        }
+    }
+
+    // mark the previous job as compacted(nil), add the new job
+    c.buffer[prevPos] = nil
+    tableKeyMap[key] = len(c.buffer)
+    c.buffer = append(c.buffer, j)
+    c.logger.Debug("finish to compact", zap.Stringer("dml", j.dml))
+}
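Reviewer note (not part of the patch): a toy model of compactJob's per-key folding, ignoring value handling, safe mode and the nil-marking in the buffer. It only illustrates which operation survives for a given identify key under the legal transitions from the table above:

    package main

    import "fmt"

    type op int

    const (
        insert op = iota
        update
        del
    )

    // fold returns the operation kept after a previous and a new op on the same key.
    func fold(prev, next op) op {
        if next == update && prev == insert {
            return insert // INSERT + UPDATE => INSERT (keep only the latest row image)
        }
        return next // DELETE + INSERT => INSERT, anything + DELETE => DELETE, UPDATE + UPDATE => UPDATE
    }

    func main() {
        fmt.Println(fold(insert, update) == insert) // true
        fmt.Println(fold(del, insert) == insert)    // true
        fmt.Println(fold(update, del) == del)       // true
    }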
diff --git a/syncer/compactor_test.go b/syncer/compactor_test.go
new file mode 100644
index 0000000000..cac51b0fa4
--- /dev/null
+++ b/syncer/compactor_test.go
@@ -0,0 +1,256 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package syncer
+
+import (
+    "context"
+    "math/rand"
+    "time"
+
+    . "github.com/pingcap/check"
+    "github.com/pingcap/failpoint"
+    filter "github.com/pingcap/tidb-tools/pkg/table-filter"
+    "github.com/pingcap/tidb/parser"
+    "github.com/pingcap/tidb/util/mock"
+
+    "github.com/pingcap/dm/dm/config"
+    "github.com/pingcap/dm/pkg/binlog"
+    tcontext "github.com/pingcap/dm/pkg/context"
+    "github.com/pingcap/dm/pkg/log"
+    "github.com/pingcap/dm/pkg/utils"
+)
+
+// mockExecute mocks a kv store.
+func mockExecute(kv map[interface{}][]interface{}, dmls []*DML) map[interface{}][]interface{} {
+    for _, dml := range dmls {
+        switch dml.op {
+        case insert:
+            kv[dml.values[0]] = dml.values
+        case update:
+            delete(kv, dml.oldValues[0])
+            kv[dml.values[0]] = dml.values
+        case del:
+            delete(kv, dml.values[0])
+        }
+    }
+
+    return kv
+}
+
+func randString(n int) string {
+    letter := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
+    b := make([]rune, n)
+    for i := range b {
+        b[i] = letter[rand.Intn(len(letter))]
+    }
+    return string(b)
+}
+
+func (s *testSyncerSuite) TestCompactJob(c *C) {
+    compactor := &compactor{
+        bufferSize: 10000,
+        logger:     log.L(),
+        keyMap:     make(map[string]map[string]int),
+        buffer:     make([]*job, 0, 10000),
+    }
+
+    location := binlog.NewLocation("")
+    ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location}
+    p := parser.New()
+    se := mock.NewContext()
+    targetTableID := "`test`.`tb`"
+    sourceTable := &filter.Table{Schema: "test", Name: "tb1"}
+    targetTable := &filter.Table{Schema: "test", Name: "tb"}
+    schema := "create table test.tb(id int primary key, col1 int, name varchar(24))"
+    ti, err := createTableInfo(p, se, 0, schema)
+    c.Assert(err, IsNil)
+
+    var dml *DML
+    var dmls []*DML
+    dmlNum := 1000000
+    maxID := 1000
+    batch := 1000
+    updateIdentifyProbability := 0.1
+
+    // generate DMLs
+    kv := make(map[interface{}][]interface{})
+    for i := 0; i < dmlNum; i++ {
+        newID := rand.Intn(maxID)
+        newCol1 := rand.Intn(maxID * 10)
+        newName := randString(rand.Intn(20))
+        values := []interface{}{newID, newCol1, newName}
+        oldValues, ok := kv[newID]
+        if !ok {
+            // insert
+            dml = newDML(insert, false, targetTableID, sourceTable, nil, values, nil, values, ti.Columns, ti)
+        } else {
+            if rand.Int()%2 > 0 {
+                // update
+                // check whether to update the identify column (ID)
+                if rand.Float64() < updateIdentifyProbability {
+                    for try := 0; try < 10; try++ {
+                        newID := rand.Intn(maxID)
+                        if _, ok := kv[newID]; !ok {
+                            values[0] = newID
+                            break
+                        }
+                    }
+                }
+                dml = newDML(update, false, targetTableID, sourceTable, oldValues, values, oldValues, values, ti.Columns, ti)
+            } else {
+                // delete
+                dml = newDML(del, false, targetTableID, sourceTable, nil, oldValues, nil, oldValues, ti.Columns, ti)
+            }
+        }
+
+        kv = mockExecute(kv, []*DML{dml})
+        dmls = append(dmls, dml)
+    }
+
+    kv = make(map[interface{}][]interface{})
+    compactKV := make(map[interface{}][]interface{})
+
+    // mock compactJob
+    for i := 0; i < len(dmls); i += batch {
+        end := i + batch
+        if end > len(dmls) {
+            end = len(dmls)
+        }
+        kv = mockExecute(kv, dmls[i:end])
+
+        for _, dml := range dmls[i:end] {
+            j := newDMLJob(dml.op, sourceTable, targetTable, dml, ec)
+            if j.dml.op == update && j.dml.updateIdentify() {
+                delDML, insertDML := updateToDelAndInsert(j.dml)
+                delJob := j.clone()
+                delJob.tp = del
+                delJob.dml = delDML
+
+                insertJob := j.clone()
+                insertJob.tp = insert
+                insertJob.dml = insertDML
+
+                compactor.compactJob(delJob)
+                compactor.compactJob(insertJob)
+            } else {
+                compactor.compactJob(j)
+            }
+        }
+
+        noCompactNumber := end - i
+        compactNumber := 0
+        for _, dml := range dmls[i:end] {
+            c.Logf("before compact, dml: %s", dml.String())
+        }
+        for _, j := range compactor.buffer {
+            if j != nil {
+                compactKV = mockExecute(compactKV, []*DML{j.dml})
+                compactNumber++
+                c.Logf("after compact, dml: %s", j.dml.String())
+            }
+        }
+        c.Logf("before compact: %d, after compact: %d", noCompactNumber, compactNumber)
+        c.Assert(compactKV, DeepEquals, kv)
+        compactor.keyMap = make(map[string]map[string]int)
+        compactor.buffer = compactor.buffer[0:0]
+    }
+}
+
+func (s *testSyncerSuite) TestCompactorSafeMode(c *C) {
+    location := binlog.NewLocation("")
+    ec := &eventContext{startLocation: &location, currentLocation: &location, lastLocation: &location}
+    p := parser.New()
+    se := mock.NewContext()
+    targetTableID := "`test`.`tb`"
+    sourceTable := &filter.Table{Schema: "test", Name: "tb1"}
+    targetTable := &filter.Table{Schema: "test", Name: "tb"}
+    schema := "create table test.tb(id int primary key, col1 int, name varchar(24))"
+    ti, err := createTableInfo(p, se, 0, schema)
+    c.Assert(err, IsNil)
+
+    testCases := []struct {
+        input  []*DML
+        output []*DML
+    }{
+        // nolint:dupl
+        {
+            input: []*DML{
+                newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti),
+                newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti),
+                newDML(update, true, targetTableID, sourceTable, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, ti.Columns, ti),
+                newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti),
+                newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti),
+            },
+            output: []*DML{
+                newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti.Columns, ti),
+                newDML(del, true, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti),
+                newDML(insert, true, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti),
+            },
+        },
+        // nolint:dupl
+        {
+            input: []*DML{
+                newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti),
+                newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti),
+                newDML(update, false, targetTableID, sourceTable, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, []interface{}{1, 1, "a"}, []interface{}{3, 3, "c"}, ti.Columns, ti),
+                newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti),
+                newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti),
+            },
+            output: []*DML{
+                newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{3, 3, "c"}, nil, []interface{}{3, 3, "c"}, ti.Columns, ti),
+                newDML(del, false, targetTableID, sourceTable, nil, []interface{}{2, 2, "b"}, nil, []interface{}{2, 2, "b"}, ti.Columns, ti),
+                newDML(insert, false, targetTableID, sourceTable, nil, []interface{}{1, 1, "a"}, nil, []interface{}{1, 1, "a"}, ti.Columns, ti),
+            },
+        },
+    }
+
+    inCh := make(chan *job, 100)
+    syncer := &Syncer{
+        tctx: tcontext.NewContext(context.Background(), log.L()),
+        cfg: &config.SubTaskConfig{
+            Name:     "task",
+            SourceID: "source",
+            SyncerConfig: config.SyncerConfig{
+                QueueSize:   100,
+                WorkerCount: 100,
+            },
+        },
+    }
+
+    c.Assert(failpoint.Enable("github.com/pingcap/dm/syncer/SkipFlushCompactor", `return()`), IsNil)
+    //nolint:errcheck
+    defer failpoint.Disable("github.com/pingcap/dm/syncer/SkipFlushCompactor")
+
+    outCh := compactorWrap(inCh, syncer)
+
+    for _, tc := range testCases {
+        for _, dml := range tc.input {
+            j := newDMLJob(dml.op, sourceTable, targetTable, dml, ec)
+            inCh <- j
+        }
+        inCh <- newFlushJob()
+        c.Assert(
+            utils.WaitSomething(10, time.Millisecond, func() bool {
+                return len(outCh) == len(tc.output)+1
+            }), Equals, true)
+        for i := 0; i <= len(tc.output); i++ {
+            j := <-outCh
+            if i < len(tc.output) {
+                c.Assert(j.dml, DeepEquals, tc.output[i])
+            } else {
+                c.Assert(j.tp, Equals, flush)
+            }
+        }
+    }
+}
diff --git a/syncer/dml.go b/syncer/dml.go
index 9af48a14b7..332712e2f4 100644
--- a/syncer/dml.go
+++ b/syncer/dml.go
@@ -442,6 +442,95 @@ func (dml *DML) String() string {
     return fmt.Sprintf("[safemode: %t, targetTableID: %s, op: %s, columns: %v, oldValues: %v, values: %v]", dml.safeMode, dml.targetTableID, dml.op.String(), dml.columnNames(), dml.originOldValues, dml.originValues)
 }

+// updateToDelAndInsert turns an update DML into a delete DML and an insert DML.
+func updateToDelAndInsert(updateDML *DML) (*DML, *DML) {
+    delDML := &DML{}
+    *delDML = *updateDML
+    delDML.op = del
+    // use oldValues of the update as values of the delete and reset oldValues
+    delDML.values = updateDML.oldValues
+    delDML.originValues = updateDML.originOldValues
+    delDML.oldValues = nil
+    delDML.originOldValues = nil
+
+    insertDML := &DML{}
+    *insertDML = *updateDML
+    insertDML.op = insert
+    // reset oldValues
+    insertDML.oldValues = nil
+    insertDML.originOldValues = nil
+
+    return delDML, insertDML
+}
+
+// identifyColumns gets the columns of the unique not null index.
+// This is used for compact.
+func (dml *DML) identifyColumns() []string {
+    if defaultIndexColumns := findFitIndex(dml.sourceTableInfo); defaultIndexColumns != nil {
+        columns := make([]string, 0, len(defaultIndexColumns.Columns))
+        for _, column := range defaultIndexColumns.Columns {
+            columns = append(columns, column.Name.O)
+        }
+        return columns
+    }
+    return nil
+}
+
+// identifyValues gets the values of the unique not null index.
+// This is used for compact.
+func (dml *DML) identifyValues() []interface{} {
+    if defaultIndexColumns := findFitIndex(dml.sourceTableInfo); defaultIndexColumns != nil {
+        values := make([]interface{}, 0, len(defaultIndexColumns.Columns))
+        for _, column := range defaultIndexColumns.Columns {
+            values = append(values, dml.values[column.Offset])
+        }
+        return values
+    }
+    return nil
+}
+
+// oldIdentifyValues gets the old values of the unique not null index.
+// only for update SQL.
+func (dml *DML) oldIdentifyValues() []interface{} {
+    if defaultIndexColumns := findFitIndex(dml.sourceTableInfo); defaultIndexColumns != nil {
+        values := make([]interface{}, 0, len(defaultIndexColumns.Columns))
+        for _, column := range defaultIndexColumns.Columns {
+            values = append(values, dml.oldValues[column.Offset])
+        }
+        return values
+    }
+    return nil
+}
+
+// identifyKey uses identifyValues to generate the key.
+// This is used for compact.
+// PK or (UK + NOT NULL).
+func (dml *DML) identifyKey() string {
+    return genKey(dml.identifyValues())
+}
+
+// updateIdentify checks whether an update SQL updates its identify values.
+func (dml *DML) updateIdentify() bool {
+    if len(dml.oldValues) == 0 {
+        return false
+    }
+
+    values := dml.identifyValues()
+    oldValues := dml.oldIdentifyValues()
+
+    if len(values) != len(oldValues) {
+        return true
+    }
+
+    for i := 0; i < len(values); i++ {
+        if values[i] != oldValues[i] {
+            return true
+        }
+    }
+
+    return false
+}
+
 // identifyKeys gens keys by unique not null value.
 // This is used for causality.
 // PK or (UK + NOT NULL) or (UK + NULL + NOT NULL VALUE).
@@ -492,6 +581,20 @@ func (dml *DML) whereColumnsAndValues() ([]string, []interface{}) {
     return columnNames, values
 }

+// genKey generates a key from values, e.g. "a.1.b".
+// This is used for compact.
+func genKey(values []interface{}) string {
+    builder := new(strings.Builder)
+    for i, v := range values {
+        if i != 0 {
+            builder.WriteString(".")
+        }
+        fmt.Fprintf(builder, "%v", v)
+    }
+
+    return builder.String()
+}
+
 // genKeyList format keys.
 func genKeyList(table string, columns []*model.ColumnInfo, dataSeq []interface{}) string {
     var buf strings.Builder
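Reviewer note (not part of the patch): the compactor's per-row key is simply the identify (PK, or UK + NOT NULL) values joined with dots, so a single-column PK gives keys like "1" and a composite key gives "2021.user-a". A runnable extract of the helper above, only to show the key shape:

    package main

    import (
        "fmt"
        "strings"
    )

    // genKey joins identify values with "." (same idea as genKey in this patch).
    func genKey(values []interface{}) string {
        builder := new(strings.Builder)
        for i, v := range values {
            if i != 0 {
                builder.WriteString(".")
            }
            fmt.Fprintf(builder, "%v", v)
        }
        return builder.String()
    }

    func main() {
        fmt.Println(genKey([]interface{}{1}))               // "1"
        fmt.Println(genKey([]interface{}{2021, "user-a"}))  // "2021.user-a"
    }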
diff --git a/syncer/dml_worker.go b/syncer/dml_worker.go
index c9c7c09c45..09beeb4727 100644
--- a/syncer/dml_worker.go
+++ b/syncer/dml_worker.go
@@ -59,10 +59,14 @@ type DMLWorker struct {

 // dmlWorkerWrap creates and runs a dmlWorker instance and returns flush job channel.
 func dmlWorkerWrap(inCh chan *job, syncer *Syncer) chan *job {
+    chanSize := syncer.cfg.QueueSize / 2
+    if syncer.cfg.Compact {
+        chanSize /= 2
+    }
     dmlWorker := &DMLWorker{
         batch:       syncer.cfg.Batch,
         workerCount: syncer.cfg.WorkerCount,
-        chanSize:    syncer.cfg.QueueSize,
+        chanSize:    chanSize,
         task:        syncer.cfg.Name,
         source:      syncer.cfg.SourceID,
         worker:      syncer.cfg.WorkerName,
diff --git a/syncer/job.go b/syncer/job.go
index ffa89350b5..4d45805691 100644
--- a/syncer/job.go
+++ b/syncer/job.go
@@ -82,6 +82,12 @@ type job struct {
     jobAddTime time.Time // job commit time
 }

+func (j *job) clone() *job {
+    newJob := &job{}
+    *newJob = *j
+    return newJob
+}
+
 func (j *job) String() string {
     // only output some important information, maybe useful in execution.
     var dmlStr string
diff --git a/syncer/syncer.go b/syncer/syncer.go
index 2def35bb0e..e80ca18b2e 100644
--- a/syncer/syncer.go
+++ b/syncer/syncer.go
@@ -268,7 +268,11 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client, notifier

 func (s *Syncer) newJobChans() {
     s.closeJobChans()
-    s.dmlJobCh = make(chan *job, s.cfg.QueueSize)
+    chanSize := s.cfg.QueueSize * s.cfg.WorkerCount / 2
+    if s.cfg.Compact {
+        chanSize /= 2
+    }
+    s.dmlJobCh = make(chan *job, chanSize)
     s.ddlJobCh = make(chan *job, s.cfg.QueueSize)
     s.jobsClosed.Store(false)
 }
@@ -1267,8 +1271,11 @@ func (s *Syncer) fatalFunc(job *job, err error) {
 func (s *Syncer) syncDML() {
     defer s.wg.Done()

-    // TODO: add compactor
-    causalityCh := causalityWrap(s.dmlJobCh, s)
+    dmlJobCh := s.dmlJobCh
+    if s.cfg.Compact {
+        dmlJobCh = compactorWrap(dmlJobCh, s)
+    }
+    causalityCh := causalityWrap(dmlJobCh, s)
     flushCh := dmlWorkerWrap(causalityCh, s)

     for range flushCh {
diff --git a/tests/dmctl_basic/conf/dm-task.yaml b/tests/dmctl_basic/conf/dm-task.yaml
index 7d0bcea48a..02856ca7ff 100644
--- a/tests/dmctl_basic/conf/dm-task.yaml
+++ b/tests/dmctl_basic/conf/dm-task.yaml
@@ -85,6 +85,7 @@ syncers:
     worker-count: 16
     batch: 100
     checkpoint-flush-interval: 1
+    compact: true

 filters:
   user-filter-1:
diff --git a/tests/dmctl_basic/conf/get_task.yaml b/tests/dmctl_basic/conf/get_task.yaml
index 4fb074c4af..16ec181c97 100644
--- a/tests/dmctl_basic/conf/get_task.yaml
+++ b/tests/dmctl_basic/conf/get_task.yaml
@@ -149,6 +149,7 @@ syncers:
     batch: 100
     queue-size: 1024
     checkpoint-flush-interval: 1
+    compact: true
     max-retry: 0
     auto-fix-gtid: false
     enable-gtid: false
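Reviewer note (not part of the patch): the channel-size changes in syncer.go and dml_worker.go trade part of the existing queue space for the compactor's own buffer. Working the arithmetic through with the worker-count/queue-size that appear in the test configs above (16 and 1024):

    package main

    import "fmt"

    func main() {
        queueSize, workerCount, compact := 1024, 16, true // values from the test configs, compact enabled

        dmlJobCh := queueSize * workerCount / 2 // Syncer.newJobChans
        if compact {
            dmlJobCh /= 2
        }
        compactorBuf := queueSize * workerCount / 4 // compactorWrap buffer and output channel
        workerCh := queueSize / 2                   // dmlWorkerWrap per-worker channel size
        if compact {
            workerCh /= 2
        }
        fmt.Println(dmlJobCh, compactorBuf, workerCh) // 4096 4096 256
    }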
diff --git a/tests/import_v10x/conf/task.yaml b/tests/import_v10x/conf/task.yaml
index 033b472209..3b84093bf8 100644
--- a/tests/import_v10x/conf/task.yaml
+++ b/tests/import_v10x/conf/task.yaml
@@ -93,6 +93,7 @@ syncers:
     batch: 100
     queue-size: 1024
     checkpoint-flush-interval: 30
+    compact: false
     max-retry: 0
     auto-fix-gtid: false
     enable-gtid: false
@@ -105,6 +106,7 @@ syncers:
     batch: 100
     queue-size: 1024
     checkpoint-flush-interval: 30
+    compact: false
     max-retry: 0
     auto-fix-gtid: false
     enable-gtid: true
diff --git a/tests/shardddl1/conf/single-source-no-sharding.yaml b/tests/shardddl1/conf/single-source-no-sharding.yaml
index 90987e1399..9be4424860 100644
--- a/tests/shardddl1/conf/single-source-no-sharding.yaml
+++ b/tests/shardddl1/conf/single-source-no-sharding.yaml
@@ -46,5 +46,7 @@ loaders:

 syncers:
   global:
-    worker-count: 16
-    batch: 100
+    worker-count: 10
+    batch: 10
+    queue-size: 100
+    compact: true
diff --git a/tests/shardddl1/run.sh b/tests/shardddl1/run.sh
index f0c370b387..eaed917367 100644
--- a/tests/shardddl1/run.sh
+++ b/tests/shardddl1/run.sh
@@ -573,6 +573,50 @@ function DM_COLUMN_INDEX() {
         "clean_table" "optimistic"
 }

+function DM_COMPACT_CASE() {
+    END=100
+    for i in $(seq 1 $END); do
+        run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b) values($i,$i)"
+        run_sql_source1 "update ${shardddl1}.${tb1} set c=1 where a=$i"
+        run_sql_source1 "update ${shardddl1}.${tb1} set c=c+1 where a=$i"
+        run_sql_source1 "update ${shardddl1}.${tb1} set b=b+1 where a=$i"
+        run_sql_source1 "update ${shardddl1}.${tb1} set a=a+100 where a=$i"
+        run_sql_source1 "delete from ${shardddl1}.${tb1} where a=$((i + 100))"
+        run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b) values($i,$i)"
+    done
+    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30
+    compactCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "finish to compact" | wc -l)
+    if [[ "$compactCnt" -le 100 ]]; then
+        echo "compacted only $compactCnt dmls, which is less than 100"
+        exit 1
+    fi
+}
+
+function DM_COMPACT() {
+    # mock a downstream with high latency and an upstream with a high workload
+    ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
+    check_port_offline $WORKER1_PORT 20
+    check_port_offline $WORKER2_PORT 20
+    export GO_FAILPOINTS='github.com/pingcap/dm/syncer/BlockExecuteSQLs=return(1)'
+    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+    run_case COMPACT "single-source-no-sharding" \
+        "run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique, c int);\"" \
+        "clean_table" ""
+
+    ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
+    check_port_offline $WORKER1_PORT 20
+    check_port_offline $WORKER2_PORT 20
+    export GO_FAILPOINTS=''
+    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+}
+
 function DM_CAUSALITY_CASE() {
     run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,2)"
     run_sql_source1 "insert into ${shardddl1}.${tb1} values(2,3)"
@@ -594,6 +638,7 @@ function run() {

     init_cluster
     init_database
+    DM_COMPACT
     DM_CAUSALITY
     DM_UpdateBARule
     DM_RENAME_TABLE
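Reviewer note (not part of the patch): a walkthrough of one DM_COMPACT_CASE iteration, assuming all seven statements for row `i` land in the same compactor buffer before a flush (the BlockExecuteSQLs failpoint makes this likely by slowing the downstream):

    insert(a=i)            -> INSERT, key i
    update set c=1         -> INSERT + UPDATE => INSERT, key i
    update set c=c+1       -> INSERT + UPDATE => INSERT, key i
    update set b=b+1       -> INSERT + UPDATE => INSERT, key i
    update set a=a+100     -> identify change: DELETE key i (drops the pending INSERT) + INSERT key i+100
    delete where a=i+100   -> INSERT + DELETE => DELETE, key i+100
    insert(a=i, b=i)       -> DELETE + INSERT => INSERT, key i

Under that assumption only an INSERT for key i and a DELETE for key i+100 remain at flush time, which is consistent with the test expecting well over 100 "finish to compact" log lines across the run.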