diff --git a/loader/checkpoint.go b/loader/checkpoint.go
index 41a42bbdc3..d56764e58b 100644
--- a/loader/checkpoint.go
+++ b/loader/checkpoint.go
@@ -43,6 +43,9 @@ type CheckPoint interface {
 	// GetAllRestoringFileInfo return all restoring files position
 	GetAllRestoringFileInfo() map[string][]int64
 
+	// IsTableCreated checks if db / table was created. set `table` to "" when checking a db
+	IsTableCreated(db, table string) bool
+
 	// IsTableFinished query if table has finished
 	IsTableFinished(db, table string) bool
 
@@ -66,6 +69,13 @@ type CheckPoint interface {
 
 	// GenSQL generates sql to update checkpoint to DB
 	GenSQL(filename string, offset int64) string
+
+	// UpdateOffset keeps `cp.restoringFiles` in memory the same as the checkpoint in DB,
+	// should be called after the checkpoint is updated in DB
+	UpdateOffset(filename string, offset int64)
+
+	// AllFinished returns `true` when all restoring jobs are finished
+	AllFinished() bool
 }
 
 // RemoteCheckPoint implements CheckPoint by saving status in remote database system, mostly in TiDB.
@@ -80,7 +90,10 @@ type RemoteCheckPoint struct {
 	id             string
 	schema         string
 	tableName      string // tableName contains schema name
-	restoringFiles map[string]map[string]FilePosSet
+	restoringFiles struct {
+		sync.RWMutex
+		pos map[string]map[string]FilePosSet // schema -> table -> FilePosSet(filename -> [cur, end])
+	}
 	finishedTables map[string]struct{}
 	logCtx         *tcontext.Context
 }
@@ -95,12 +108,12 @@ func newRemoteCheckPoint(tctx *tcontext.Context, cfg *config.SubTaskConfig, id s
 		db:             db,
 		conn:           dbConns[0],
 		id:             id,
-		restoringFiles: make(map[string]map[string]FilePosSet),
 		finishedTables: make(map[string]struct{}),
 		schema:         dbutil.ColumnName(cfg.MetaSchema),
 		tableName:      dbutil.TableName(cfg.MetaSchema, fmt.Sprintf("%s_loader_checkpoint", cfg.Name)),
 		logCtx:         tcontext.Background().WithLogger(tctx.L().WithFields(zap.String("component", "remote checkpoint"))),
 	}
+	cp.restoringFiles.pos = make(map[string]map[string]FilePosSet)
 
 	err = cp.prepare(tctx)
 	if err != nil {
@@ -174,17 +187,19 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
 		endPos   int64
 	)
 
-	cp.restoringFiles = make(map[string]map[string]FilePosSet) // reset to empty
+	cp.restoringFiles.Lock()
+	defer cp.restoringFiles.Unlock()
+	cp.restoringFiles.pos = make(map[string]map[string]FilePosSet) // reset to empty
 	for rows.Next() {
 		err := rows.Scan(&filename, &schema, &table, &offset, &endPos)
 		if err != nil {
 			return terror.WithScope(terror.DBErrorAdapt(err, terror.ErrDBDriverError), terror.ScopeDownstream)
 		}
 
-		if _, ok := cp.restoringFiles[schema]; !ok {
-			cp.restoringFiles[schema] = make(map[string]FilePosSet)
+		if _, ok := cp.restoringFiles.pos[schema]; !ok {
+			cp.restoringFiles.pos[schema] = make(map[string]FilePosSet)
 		}
-		tables := cp.restoringFiles[schema]
+		tables := cp.restoringFiles.pos[schema]
 		if _, ok := tables[table]; !ok {
 			tables[table] = make(map[string][]int64)
 		}
@@ -197,27 +212,53 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
 
 // GetRestoringFileInfo implements CheckPoint.GetRestoringFileInfo
 func (cp *RemoteCheckPoint) GetRestoringFileInfo(db, table string) map[string][]int64 {
-	if tables, ok := cp.restoringFiles[db]; ok {
+	cp.restoringFiles.RLock()
+	defer cp.restoringFiles.RUnlock()
+	results := make(map[string][]int64)
+	if tables, ok := cp.restoringFiles.pos[db]; ok {
 		if restoringFiles, ok := tables[table]; ok {
-			return restoringFiles
+			// make a copy of restoringFiles, and its slice value
+			for k, v := range restoringFiles {
+				results[k] = make([]int64, len(v))
+				copy(results[k], v)
+			}
+			return results
 		}
 	}
-	return make(map[string][]int64)
+	return results
 }
 
 // GetAllRestoringFileInfo implements CheckPoint.GetAllRestoringFileInfo
 func (cp *RemoteCheckPoint) GetAllRestoringFileInfo() map[string][]int64 {
+	cp.restoringFiles.RLock()
+	defer cp.restoringFiles.RUnlock()
 	results := make(map[string][]int64)
-	for _, tables := range cp.restoringFiles {
+	for _, tables := range cp.restoringFiles.pos {
 		for _, files := range tables {
 			for file, pos := range files {
-				results[file] = pos
+				results[file] = make([]int64, len(pos))
+				copy(results[file], pos)
 			}
 		}
 	}
 	return results
 }
 
+// IsTableCreated implements CheckPoint.IsTableCreated
+func (cp *RemoteCheckPoint) IsTableCreated(db, table string) bool {
+	cp.restoringFiles.RLock()
+	defer cp.restoringFiles.RUnlock()
+	tables, ok := cp.restoringFiles.pos[db]
+	if !ok {
+		return false
+	}
+	if table == "" {
+		return true
+	}
+	_, ok = tables[table]
+	return ok
+}
+
 // IsTableFinished implements CheckPoint.IsTableFinished
 func (cp *RemoteCheckPoint) IsTableFinished(db, table string) bool {
 	key := strings.Join([]string{db, table}, ".")
@@ -229,8 +270,10 @@ func (cp *RemoteCheckPoint) IsTableFinished(db, table string) bool {
 
 // CalcProgress implements CheckPoint.CalcProgress
 func (cp *RemoteCheckPoint) CalcProgress(allFiles map[string]Tables2DataFiles) error {
+	cp.restoringFiles.RLock()
+	defer cp.restoringFiles.RUnlock()
 	cp.finishedTables = make(map[string]struct{}) // reset to empty
-	for db, tables := range cp.restoringFiles {
+	for db, tables := range cp.restoringFiles.pos {
 		dbTables, ok := allFiles[db]
 		if !ok {
 			return terror.ErrCheckpointDBNotExistInFile.Generate(db)
@@ -257,7 +300,7 @@ func (cp *RemoteCheckPoint) CalcProgress(allFiles map[string]Tables2DataFiles) e
 		}
 	}
 
-	cp.logCtx.L().Info("calculate checkpoint finished.", zap.Reflect("finished tables", cp.finishedTables))
+	cp.logCtx.L().Info("calculate checkpoint finished.", zap.Any("finished tables", cp.finishedTables))
 	return nil
 }
 
@@ -274,20 +317,27 @@ func (cp *RemoteCheckPoint) allFilesFinished(files map[string][]int64) bool {
 	return true
 }
 
+// AllFinished implements CheckPoint.AllFinished
+func (cp *RemoteCheckPoint) AllFinished() bool {
+	cp.restoringFiles.RLock()
+	defer cp.restoringFiles.RUnlock()
+	for _, tables := range cp.restoringFiles.pos {
+		for _, restoringFiles := range tables {
+			if !cp.allFilesFinished(restoringFiles) {
+				return false
+			}
+		}
+	}
+	return true
+}
+
 // Init implements CheckPoint.Init
 func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos int64) error {
-	idx := strings.LastIndex(filename, ".sql")
-	if idx < 0 {
-		return terror.ErrCheckpointInvalidTableFile.Generate(filename)
-	}
-	fname := filename[:idx]
-	fields := strings.Split(fname, ".")
-	if len(fields) != 2 && len(fields) != 3 {
+	// fields[0] -> db name, fields[1] -> table name
+	schema, table, err := getDBAndTableFromFilename(filename)
+	if err != nil {
 		return terror.ErrCheckpointInvalidTableFile.Generate(filename)
 	}
-
-	// fields[0] -> db name, fields[1] -> table name
-	schema, table := fields[0], fields[1]
 	sql2 := fmt.Sprintf("INSERT INTO %s (`id`, `filename`, `cp_schema`, `cp_table`, `offset`, `end_pos`) VALUES(?,?,?,?,?,?)", cp.tableName)
 	cp.logCtx.L().Info("initial checkpoint record",
 		zap.String("sql", sql2),
@@ -299,20 +349,22 @@ func (cp *RemoteCheckPoint) Init(tctx *tcontext.Context, filename string, endPos
 		zap.Int64("end position", endPos))
 	args := []interface{}{cp.id, filename, schema, table, 0, endPos}
 	cp.connMutex.Lock()
-	err := cp.conn.executeSQL(tctx, []string{sql2}, args)
+	err = cp.conn.executeSQL(tctx, []string{sql2}, args)
 	cp.connMutex.Unlock()
 	if err != nil {
 		if isErrDupEntry(err) {
 			cp.logCtx.L().Info("checkpoint record already exists, skip it.", zap.String("id", cp.id), zap.String("filename", filename))
 			return nil
 		}
 		return terror.WithScope(terror.Annotate(err, "initialize checkpoint"), terror.ScopeDownstream)
 	}
 	// checkpoint not exists and no error, cache endPos in memory
-	if _, ok := cp.restoringFiles[schema]; !ok {
-		cp.restoringFiles[schema] = make(map[string]FilePosSet)
+	cp.restoringFiles.Lock()
+	defer cp.restoringFiles.Unlock()
+	if _, ok := cp.restoringFiles.pos[schema]; !ok {
+		cp.restoringFiles.pos[schema] = make(map[string]FilePosSet)
 	}
-	tables := cp.restoringFiles[schema]
+	tables := cp.restoringFiles.pos[schema]
 	if _, ok := tables[table]; !ok {
 		tables[table] = make(map[string][]int64)
 	}
@@ -345,6 +397,18 @@ func (cp *RemoteCheckPoint) GenSQL(filename string, offset int64) string {
 	return sql
 }
 
+// UpdateOffset implements CheckPoint.UpdateOffset
+func (cp *RemoteCheckPoint) UpdateOffset(filename string, offset int64) {
+	cp.restoringFiles.Lock()
+	defer cp.restoringFiles.Unlock()
+	db, table, err := getDBAndTableFromFilename(filename)
+	if err != nil {
+		cp.logCtx.L().Error("error in checkpoint UpdateOffset", zap.Error(err))
+		return
+	}
+	cp.restoringFiles.pos[db][table][filename][0] = offset
+}
+
 // Clear implements CheckPoint.Clear
 func (cp *RemoteCheckPoint) Clear(tctx *tcontext.Context) error {
 	sql2 := fmt.Sprintf("DELETE FROM %s WHERE `id` = '%s'", cp.tableName, cp.id)
@@ -379,17 +443,10 @@ func (cp *RemoteCheckPoint) Count(tctx *tcontext.Context) (int, error) {
 }
 
 func (cp *RemoteCheckPoint) String() string {
-	// `String` is often used to log something, it's not a big problem even fail,
-	// so 1min should be enough.
-	tctx2, cancel := cp.logCtx.WithTimeout(time.Minute)
-	defer cancel()
-
-	if err := cp.Load(tctx2); err != nil {
-		return err.Error()
-	}
-
+	cp.restoringFiles.RLock()
+	defer cp.restoringFiles.RUnlock()
 	result := make(map[string][]int64)
-	for _, tables := range cp.restoringFiles {
+	for _, tables := range cp.restoringFiles.pos {
 		for _, files := range tables {
 			for file, set := range files {
 				result[file] = set
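Reviewer note (annotation, not part of the patch): the checkpoint changes above follow the usual Go pattern of embedding a sync.RWMutex next to the map it guards and handing out deep copies, so a caller can never observe mutations made under the lock after the snapshot was taken. A minimal, self-contained sketch of that pattern with hypothetical names (this is not the dm API):

package main

import (
	"fmt"
	"sync"
)

// positions mirrors the shape of restoringFiles: a guarded map plus its RWMutex.
type positions struct {
	sync.RWMutex
	pos map[string][]int64 // filename -> [cur, end]
}

// snapshot returns a deep copy: both the map and its slice values are duplicated,
// so later writes under the lock cannot change what the caller already received.
func (p *positions) snapshot() map[string][]int64 {
	p.RLock()
	defer p.RUnlock()
	out := make(map[string][]int64, len(p.pos))
	for k, v := range p.pos {
		c := make([]int64, len(v))
		copy(c, v)
		out[k] = c
	}
	return out
}

func main() {
	p := positions{pos: map[string][]int64{"db.tbl.sql": {0, 100}}}
	snap := p.snapshot()
	p.Lock()
	p.pos["db.tbl.sql"][0] = 50 // mutate after the snapshot was taken
	p.Unlock()
	fmt.Println(snap["db.tbl.sql"][0]) // still 0
}

Without the inner copy of the slice values, the snapshot and the checkpoint would share the same backing arrays, which is exactly the aliasing that the new TestDeepCopy below guards against.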
diff --git a/loader/checkpoint_test.go b/loader/checkpoint_test.go
index 46bd040228..797c4e067e 100644
--- a/loader/checkpoint_test.go
+++ b/loader/checkpoint_test.go
@@ -70,6 +70,14 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
 		{"db1.tbl3.sql", 789},
 	}
 
+	allFiles := map[string]Tables2DataFiles{
+		"db1": {
+			"tbl1": {cases[0].filename},
+			"tbl2": {cases[1].filename},
+			"tbl3": {cases[2].filename},
+		},
+	}
+
 	id := "test_for_db"
 	tctx := tcontext.Background()
 	cp, err := newRemoteCheckPoint(tctx, t.cfg, id)
@@ -89,12 +97,26 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(count, Equals, 0)
 
+	c.Assert(cp.IsTableCreated("db1", ""), IsFalse)
+	c.Assert(cp.IsTableCreated("db1", "tbl1"), IsFalse)
+	c.Assert(cp.CalcProgress(allFiles), IsNil)
+	c.Assert(cp.IsTableFinished("db1", "tbl1"), IsFalse)
+
 	// insert default checkpoints
 	for _, cs := range cases {
 		err = cp.Init(tctx, cs.filename, cs.endPos)
 		c.Assert(err, IsNil)
 	}
 
+	c.Assert(cp.IsTableCreated("db1", ""), IsTrue)
+	c.Assert(cp.IsTableCreated("db1", "tbl1"), IsTrue)
+	c.Assert(cp.CalcProgress(allFiles), IsNil)
+	c.Assert(cp.IsTableFinished("db1", "tbl1"), IsFalse)
+
+	info := cp.GetRestoringFileInfo("db1", "tbl1")
+	c.Assert(info, HasLen, 1)
+	c.Assert(info[cases[0].filename], DeepEquals, []int64{0, cases[0].endPos})
+
 	err = cp.Load(tctx)
 	c.Assert(err, IsNil)
 
@@ -129,6 +151,15 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
 	err = cp.Load(tctx)
 	c.Assert(err, IsNil)
 
+	c.Assert(cp.IsTableCreated("db1", ""), IsTrue)
+	c.Assert(cp.IsTableCreated("db1", "tbl1"), IsTrue)
+	c.Assert(cp.CalcProgress(allFiles), IsNil)
+	c.Assert(cp.IsTableFinished("db1", "tbl1"), IsTrue)
+
+	info = cp.GetRestoringFileInfo("db1", "tbl1")
+	c.Assert(info, HasLen, 1)
+	c.Assert(info[cases[0].filename], DeepEquals, []int64{cases[0].endPos, cases[0].endPos})
+
 	infos = cp.GetAllRestoringFileInfo()
 	c.Assert(len(infos), Equals, len(cases))
 	for _, cs := range cases {
@@ -150,6 +181,11 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
 	err = cp.Load(tctx)
 	c.Assert(err, IsNil)
 
+	c.Assert(cp.IsTableCreated("db1", ""), IsFalse)
+	c.Assert(cp.IsTableCreated("db1", "tbl1"), IsFalse)
+	c.Assert(cp.CalcProgress(allFiles), IsNil)
+	c.Assert(cp.IsTableFinished("db1", "tbl1"), IsFalse)
+
 	infos = cp.GetAllRestoringFileInfo()
 	c.Assert(len(infos), Equals, 0)
 
@@ -158,3 +194,21 @@ func (t *testCheckPointSuite) TestForDB(c *C) {
 	c.Assert(err, IsNil)
 	c.Assert(count, Equals, 0)
 }
+
+func (t *testCheckPointSuite) TestDeepCopy(c *C) {
+	cp := RemoteCheckPoint{}
+	cp.restoringFiles.pos = make(map[string]map[string]FilePosSet)
+	cp.restoringFiles.pos["db"] = make(map[string]FilePosSet)
+	cp.restoringFiles.pos["db"]["table"] = make(map[string][]int64)
+	cp.restoringFiles.pos["db"]["table"]["file"] = []int64{0, 100}
+
+	ret := cp.GetRestoringFileInfo("db", "table")
+	cp.restoringFiles.pos["db"]["table"]["file"][0] = 10
+	cp.restoringFiles.pos["db"]["table"]["file2"] = []int64{0, 100}
+	c.Assert(ret, DeepEquals, map[string][]int64{"file": {0, 100}})
+
+	ret = cp.GetAllRestoringFileInfo()
+	cp.restoringFiles.pos["db"]["table"]["file"][0] = 20
+	cp.restoringFiles.pos["db"]["table"]["file3"] = []int64{0, 100}
+	c.Assert(ret, DeepEquals, map[string][]int64{"file": {10, 100}, "file2": {0, 100}})
+}
diff --git a/loader/loader.go b/loader/loader.go
index 16d97e0587..6510d55d49 100644
--- a/loader/loader.go
+++ b/loader/loader.go
@@ -45,7 +45,8 @@ import (
 )
 
 const (
-	jobCount = 1000
+	jobCount            = 1000
+	uninitializedOffset = -1
 )
 
 // FilePosSet represents a set in mathematics.
@@ -181,21 +182,8 @@ func (w *Worker) run(ctx context.Context, fileJobQueue chan *fileJob, runFatalCh
 					}
 					return
 				}
+				w.loader.checkPoint.UpdateOffset(job.file, job.offset)
 				w.loader.finishedDataSize.Add(job.offset - job.lastOffset)
-
-				if w.cfg.CleanDumpFile {
-					fileInfos := w.checkPoint.GetRestoringFileInfo(job.schema, job.table)
-					if pos, ok := fileInfos[job.file]; ok {
-						if job.offset == pos[1] {
-							w.tctx.L().Info("try to remove loaded dump file", zap.String("data file", job.file))
-							if err := os.Remove(job.absPath); err != nil {
-								w.tctx.L().Warn("error when remove loaded dump file", zap.String("data file", job.file), zap.Error(err))
-							}
-						}
-					} else {
-						w.tctx.L().Warn("file not recorded in checkpoint", zap.String("data file", job.file))
-					}
-				}
 			}
 		}
 	}
@@ -258,22 +246,28 @@ func (w *Worker) dispatchSQL(ctx context.Context, file string, offset int64, tab
 		cur int64
 	)
 
+	baseFile := filepath.Base(file)
+
 	f, err = os.Open(file)
 	if err != nil {
 		return terror.ErrLoadUnitDispatchSQLFromFile.Delegate(err)
 	}
 	defer f.Close()
 
-	finfo, err := f.Stat()
-	if err != nil {
-		return terror.ErrLoadUnitDispatchSQLFromFile.Delegate(err)
-	}
+	// file was not found in checkpoint
+	if offset == uninitializedOffset {
+		offset = 0
 
-	baseFile := filepath.Base(file)
-	err = w.checkPoint.Init(w.tctx.WithContext(ctx), baseFile, finfo.Size())
-	if err != nil {
-		w.tctx.L().Error("fail to initial checkpoint", zap.String("data file", file), zap.Int64("offset", offset), log.ShortError(err))
-		return err
+		finfo, err2 := f.Stat()
+		if err2 != nil {
+			return terror.ErrLoadUnitDispatchSQLFromFile.Delegate(err2)
+		}
+
+		err2 = w.checkPoint.Init(w.tctx.WithContext(ctx), baseFile, finfo.Size())
+		if err2 != nil {
+			w.tctx.L().Error("fail to initial checkpoint", zap.String("data file", file), zap.Int64("offset", offset), log.ShortError(err2))
+			return err2
+		}
 	}
 
 	cur, err = f.Seek(offset, io.SeekStart)
@@ -617,25 +611,6 @@ func (l *Loader) Restore(ctx context.Context) error {
 
 	if err == nil {
 		l.logCtx.L().Info("all data files have been finished", zap.Duration("cost time", time.Since(begin)))
-		if l.cfg.CleanDumpFile {
-			files := CollectDirFiles(l.cfg.Dir)
-			hasDatafile := false
-			for file := range files {
-				if strings.HasSuffix(file, ".sql") &&
-					!strings.HasSuffix(file, "-schema.sql") &&
-					!strings.HasSuffix(file, "-schema-create.sql") &&
-					!strings.Contains(file, "-schema-view.sql") &&
-					!strings.Contains(file, "-schema-triggers.sql") &&
-					!strings.Contains(file, "-schema-post.sql") {
-					hasDatafile = true
-				}
-			}
-
-			if !hasDatafile {
-				l.logCtx.L().Info("clean dump files after importing all files")
-				l.cleanDumpFiles()
-			}
-		}
 	} else if errors.Cause(err) != context.Canceled {
 		return err
 	}
@@ -664,6 +639,9 @@ func (l *Loader) Close() {
 	if err != nil {
 		l.logCtx.L().Error("close downstream DB error", log.ShortError(err))
 	}
+	if l.cfg.CleanDumpFile && l.checkPoint.AllFinished() {
+		l.cleanDumpFiles()
+	}
 	l.checkPoint.Close()
 	l.removeLabelValuesWithTaskInMetrics(l.cfg.Name)
 	l.closed.Set(true)
@@ -913,15 +891,11 @@ func (l *Loader) prepareDataFiles(files map[string]struct{}) error {
 			continue
 		}
 
-		idx := strings.LastIndex(file, ".sql")
-		name := file[:idx]
-		fields := strings.Split(name, ".")
-		if len(fields) != 2 && len(fields) != 3 {
-			l.logCtx.L().Warn("invalid db table sql file", zap.String("file", file))
+		db, table, err := getDBAndTableFromFilename(file)
+		if err != nil {
+			l.logCtx.L().Warn("invalid db table sql file", zap.String("file", file), zap.Error(err))
 			continue
 		}
-
-		db, table := fields[0], fields[1]
 		if l.skipSchemaAndTable(&filter.Table{Schema: db, Name: table}) {
 			l.logCtx.L().Warn("ignore data file", zap.String("data file", file))
 			continue
@@ -1004,6 +978,10 @@ func (l *Loader) prepare() error {
 
 // restoreSchema creates schema
 func (l *Loader) restoreSchema(ctx context.Context, conn *DBConn, sqlFile, schema string) error {
+	if l.checkPoint.IsTableCreated(schema, "") {
+		l.logCtx.L().Info("database already exists in checkpoint, skip creating it", zap.String("schema", schema), zap.String("db schema file", sqlFile))
+		return nil
+	}
 	err := l.restoreStructure(ctx, conn, sqlFile, schema, "")
 	if err != nil {
 		if isErrDBExists(err) {
@@ -1017,6 +995,10 @@ func (l *Loader) restoreSchema(ctx context.Context, conn *DBConn, sqlFile, schem
 
 // restoreTable creates table
 func (l *Loader) restoreTable(ctx context.Context, conn *DBConn, sqlFile, schema, table string) error {
+	if l.checkPoint.IsTableCreated(schema, table) {
+		l.logCtx.L().Info("table already exists in checkpoint, skip creating it", zap.String("schema", schema), zap.String("table", table), zap.String("db schema file", sqlFile))
+		return nil
+	}
 	err := l.restoreStructure(ctx, conn, sqlFile, schema, table)
 	if err != nil {
 		if isErrTableExists(err) {
@@ -1077,7 +1059,6 @@ func (l *Loader) restoreStructure(ctx context.Context, conn *DBConn, sqlFile str
 				return terror.WithScope(err, terror.ScopeDownstream)
 			}
 		}
-
 	}
 	return nil
 }
@@ -1195,7 +1176,7 @@ func (l *Loader) restoreData(ctx context.Context) error {
 
 			l.logCtx.L().Debug("dispatch data file", zap.String("schema", db), zap.String("table", table), zap.String("data file", file))
 
-			var offset int64
+			offset := int64(uninitializedOffset)
 			posSet, ok := restoringFiles[file]
 			if ok {
 				offset = posSet[0]
@@ -1255,24 +1236,23 @@ func (l *Loader) getMydumpMetadata() error {
 
 // cleanDumpFiles is called when finish restoring data, to clean useless files
 func (l *Loader) cleanDumpFiles() {
+	l.logCtx.L().Info("clean dump files")
 	if l.cfg.Mode == config.ModeFull {
 		// in full-mode all files won't be need in the future
 		if err := os.RemoveAll(l.cfg.Dir); err != nil {
 			l.logCtx.L().Warn("error when remove loaded dump folder", zap.String("data folder", l.cfg.Dir), zap.Error(err))
 		}
 	} else {
-		// leave metadata file, only delete structure files
-		for db, tables := range l.db2Tables {
-			dbFile := fmt.Sprintf("%s/%s-schema-create.sql", l.cfg.Dir, db)
-			if err := os.Remove(dbFile); err != nil {
-				l.logCtx.L().Warn("error when remove loaded dump file", zap.String("data file", dbFile), zap.Error(err))
-			}
-			for table := range tables {
-				tableFile := fmt.Sprintf("%s/%s.%s-schema.sql", l.cfg.Dir, db, table)
-				if err := os.Remove(tableFile); err != nil {
-					l.logCtx.L().Warn("error when remove loaded dump file", zap.String("data file", tableFile), zap.Error(err))
-				}
+		// leave metadata file, only delete sql files
+		files := CollectDirFiles(l.cfg.Dir)
+		var lastErr error
+		for f := range files {
+			if strings.HasSuffix(f, ".sql") {
+				lastErr = os.Remove(filepath.Join(l.cfg.Dir, f))
 			}
 		}
+		if lastErr != nil {
+			l.logCtx.L().Warn("show last error when removing loaded dump sql files", zap.String("data folder", l.cfg.Dir), zap.Error(lastErr))
+		}
 	}
 }
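Reviewer note (annotation, not part of the patch): with this change the loader passes -1 (uninitializedOffset) for any data file that has no checkpoint record yet, and dispatchSQL only stats the file and inserts the initial record in that case; files already recorded simply resume from the stored offset, instead of calling checkPoint.Init unconditionally on every restart. A rough standalone sketch of that flow with hypothetical helper names (not the real dm functions):

package main

import "fmt"

// uninitializedOffset plays the same role as the sentinel added above: it marks a
// data file that has no checkpoint record yet.
const uninitializedOffset int64 = -1

// resolveOffset returns the saved position for a file, or the sentinel if the
// checkpoint has never seen it.
func resolveOffset(restoring map[string][]int64, file string) int64 {
	if pos, ok := restoring[file]; ok {
		return pos[0]
	}
	return uninitializedOffset
}

func main() {
	restoring := map[string][]int64{"db1.tbl1.sql": {512, 1024}}

	for _, f := range []string{"db1.tbl1.sql", "db1.tbl2.sql"} {
		offset := resolveOffset(restoring, f)
		if offset == uninitializedOffset {
			// only now would the loader stat the file, insert the initial
			// checkpoint record, and start reading from the beginning
			offset = 0
		}
		fmt.Printf("%s: start at offset %d\n", f, offset)
	}
}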
diff --git a/loader/util.go b/loader/util.go
index 21f1d4a1be..6d8d80dcc8 100644
--- a/loader/util.go
+++ b/loader/util.go
@@ -91,3 +91,16 @@ func generateSchemaCreateFile(dir string, schema string) error {
 func escapeName(name string) string {
 	return strings.Replace(name, "`", "``", -1)
 }
+
+// input filename is like `all_mode.t1.0.sql` or `all_mode.t1.sql`
+func getDBAndTableFromFilename(filename string) (string, string, error) {
+	idx := strings.LastIndex(filename, ".sql")
+	if idx < 0 {
+		return "", "", fmt.Errorf("%s doesn't have a `.sql` suffix", filename)
+	}
+	fields := strings.Split(filename[:idx], ".")
+	if len(fields) != 2 && len(fields) != 3 {
+		return "", "", fmt.Errorf("%s doesn't have a correct `.` separator", filename)
+	}
+	return fields[0], fields[1], nil
+}
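Reviewer note (annotation, not part of the patch): a quick standalone illustration of the filename convention the new helper accepts; the function body is copied from loader/util.go above so the snippet compiles on its own:

package main

import (
	"fmt"
	"strings"
)

// copied from loader/util.go above so this snippet is self-contained
func getDBAndTableFromFilename(filename string) (string, string, error) {
	idx := strings.LastIndex(filename, ".sql")
	if idx < 0 {
		return "", "", fmt.Errorf("%s doesn't have a `.sql` suffix", filename)
	}
	fields := strings.Split(filename[:idx], ".")
	if len(fields) != 2 && len(fields) != 3 {
		return "", "", fmt.Errorf("%s doesn't have a correct `.` separator", filename)
	}
	return fields[0], fields[1], nil
}

func main() {
	for _, name := range []string{"all_mode.t1.0.sql", "all_mode.t1.sql", "metadata"} {
		db, table, err := getDBAndTableFromFilename(name)
		fmt.Println(name, "->", db, table, err)
	}
}

Both `all_mode.t1.0.sql` and `all_mode.t1.sql` resolve to ("all_mode", "t1"); anything without a `.sql` suffix or with the wrong number of `.`-separated fields is rejected, which is what prepareDataFiles, Init and UpdateOffset rely on.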
"github.com/pingcap/dm/loader/dispatchError=return(1)" ) export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})" run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml