From e43a231f98d61f8410af1db112b1a671f28ee958 Mon Sep 17 00:00:00 2001 From: ffffwh Date: Tue, 10 Jan 2023 19:19:07 +0800 Subject: [PATCH] api: save dump progress on consul extractor: send dump entry to applier anyway for row numbers --- api/handler/v2/job.go | 2 +- driver/common/store.go | 37 ++++++++++++++--- driver/mysql/applier.go | 21 ++++++++++ driver/mysql/extractor.go | 85 +++++++++++++++++---------------------- 4 files changed, 91 insertions(+), 54 deletions(-) diff --git a/api/handler/v2/job.go b/api/handler/v2/job.go index 68bccd2e6..73e360d42 100644 --- a/api/handler/v2/job.go +++ b/api/handler/v2/job.go @@ -754,7 +754,7 @@ func getTaskProgress(logger g.LoggerType, header http.Header, taskLogs []models. return nil, 0, "" } - execRowCount, totalRowCount, err := storeManager.GetFullProgress(jobId) + execRowCount, totalRowCount, err := storeManager.GetDumpProgress(jobId) progress := &models.DumpProgress{ ExecRowCount: execRowCount, TotalRowCount: totalRowCount, diff --git a/driver/common/store.go b/driver/common/store.go index 5bfe65a14..a1931dd21 100644 --- a/driver/common/store.go +++ b/driver/common/store.go @@ -621,12 +621,6 @@ func (sm *StoreManager) WatchTree(dir string, stopCh <-chan struct{}) (<-chan [] return sm.consulStore.WatchTree(dir, stopCh) } -// return: ExecRowCount, TotalRowCount -func (sm *StoreManager) GetFullProgress(jobName string) (int64, int64, error) { - // TODO - return 42, 42, nil -} - func (sm *StoreManager) PutJobStage(jobName string, stage string) error { key := fmt.Sprintf("dtle/%v/JobStage", jobName) return sm.consulStore.Put(key, []byte(stage), nil) @@ -644,6 +638,37 @@ func (sm *StoreManager) GetJobStage(jobName string) (string, error) { return string(kv.Value), nil } +func (sm *StoreManager) PutDumpProgress(jobName string, exec int64, total int64) error { + key := fmt.Sprintf("dtle/%v/DumpProgress", jobName) + bs, err := json.Marshal([]int64{exec, total}) + if err != nil { + return err + } + return sm.consulStore.Put(key, bs, nil) +} + +// return: ExecRowCount, TotalRowCount +func (sm *StoreManager) GetDumpProgress(jobName string) (int64, int64, error) { + key := fmt.Sprintf("dtle/%v/DumpProgress", jobName) + kv, err:= sm.consulStore.Get(key) + if err == store.ErrKeyNotFound { + return 0, 0, nil + } else if err != nil { + return 0, 0, err + } + + var r []int64 + err = json.Unmarshal(kv.Value, &r) + if err != nil { + return 0, 0, err + } + if len(r) != 2 { + return 0, 0, fmt.Errorf("unexpected len for %v. found %v", key, len(r)) + } + + return r[0], r[1], nil +} + // consul store item func NewDefaultRole(tenant string) *Role { diff --git a/driver/mysql/applier.go b/driver/mysql/applier.go index af03fa407..a3eb94c4b 100644 --- a/driver/mysql/applier.go +++ b/driver/mysql/applier.go @@ -377,6 +377,7 @@ func (a *Applier) Run() { } a.ai.OnError = a.onError + go a.updateDumpProgressLoop() if sourceType == "mysql" { go a.updateGtidLoop() } @@ -1187,3 +1188,23 @@ func (a *Applier) enableForeignKeyChecks() error { } return nil } + +func (a *Applier) updateDumpProgressLoop() { + var err error + interval := 10 + a.logger.Debug("updateDumpProgressLoop", "interval", interval) + + for { + if a.shutdown { + return + } + + err = a.storeManager.PutDumpProgress(a.subject, a.TotalRowsReplayed, a.mysqlContext.RowsEstimate) + if err != nil { + a.onError(common.TaskStateDead, err) + return + } + + time.Sleep(time.Duration(interval) * time.Second) + } +} diff --git a/driver/mysql/extractor.go b/driver/mysql/extractor.go index 28938ca6a..f1865b114 100644 --- a/driver/mysql/extractor.go +++ b/driver/mysql/extractor.go @@ -1303,51 +1303,48 @@ func (e *Extractor) mysqlDump() (retErr error) { e.gotCoordinateCh <- struct{}{} - // Transform the current schema so that it reflects the *current* state of the MySQL server's contents. - // First, get the DROP TABLE and CREATE TABLE statement (with keys and constraint definitions) for our tables ... - if !e.mysqlContext.SkipCreateDbTable { - e.logger.Info("generating DROP and CREATE statements to reflect current database schemas", - "replicateDoDb", e.replicateDoDb) - - for _, db := range e.replicateDoDb { - var dbSQL string - if strings.ToLower(db.TableSchema) != "mysql" { - if db.TableSchemaRename != "" { - dbSQL, err = base.RenameCreateSchemaAddINE(db.CreateSchemaString, db.TableSchemaRename) - if err != nil { - return errors.Wrap(err, "RenameCreateSchemaAddINE") - } - } else { - dbSQL = db.CreateSchemaString + // Go through all tables to get DDL and row numbers. + for _, db := range e.replicateDoDb { + if strings.ToLower(db.TableSchema) == "mysql" { + continue + } + + // Create the schema. + entry := &common.DumpEntry{} + if !e.mysqlContext.SkipCreateDbTable { + if db.TableSchemaRename != "" { + entry.DbSQL, err = base.RenameCreateSchemaAddINE(db.CreateSchemaString, db.TableSchemaRename) + if err != nil { + return errors.Wrap(err, "RenameCreateSchemaAddINE") } + } else { + entry.DbSQL = db.CreateSchemaString + } + } + if err := e.encodeAndSendDumpEntry(entry); err != nil { + return errors.Wrap(err, "encodeAndSendDumpEntry. create schema entry") + } + // Create the tables. + for _, tbCtx := range db.TableMap { + tb := tbCtx.Table + tb.Counter, err = e.CountTableRows(tb) + if err != nil { + return errors.Wrapf(err, "CountTableRows %v.%v", tb.TableSchema, tb.TableName) } + e.logger.Info("count table", "schema", db.TableSchema, "table", tb.TableName, "rows", tb.Counter) + entry := &common.DumpEntry{ - DbSQL: dbSQL, - } - atomic.AddInt64(&e.mysqlContext.RowsEstimate, 1) - atomic.AddInt64(&e.TotalRowsCopied, 1) - if err := e.encodeAndSendDumpEntry(entry); err != nil { - return errors.Wrap(err, "encodeAndSendDumpEntry. create schema entry") + TbSQL: []string{}, + TotalCount: tb.Counter, } - - for _, tbCtx := range db.TableMap { - tb := tbCtx.Table - if tb.TableSchema != db.TableSchema { - continue - } - total, err := e.CountTableRows(tb) - if err != nil { - return errors.Wrapf(err, "CountTableRows %v.%v", tb.TableSchema, tb.TableName) - } - tb.Counter = total - var tbSQL []string + if !e.mysqlContext.SkipCreateDbTable { if strings.ToLower(tb.TableType) == "view" { /*tbSQL, err = base.ShowCreateView(e.singletonDB, tb.TableSchema, tb.TableName, e.mysqlContext.DropTableIfExists) if err != nil { return err }*/ - } else if strings.ToLower(tb.TableSchema) != "mysql" { + } else { ctStmt, err := base.ShowCreateTable(e.singletonDB, tb.TableSchema, tb.TableName) if err != nil { return err @@ -1362,23 +1359,17 @@ func (e *Extractor) mysqlDump() (retErr error) { } if e.mysqlContext.DropTableIfExists { - tbSQL = append(tbSQL, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", + entry.TbSQL = append(entry.TbSQL, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", mysqlconfig.EscapeName(targetSchema), mysqlconfig.EscapeName(targetTable))) } - tbSQL = append(tbSQL, ctStmt) - } - entry := &common.DumpEntry{ - TbSQL: tbSQL, - TotalCount: tb.Counter, - } - atomic.AddInt64(&e.mysqlContext.RowsEstimate, 1) - atomic.AddInt64(&e.TotalRowsCopied, 1) - if err := e.encodeAndSendDumpEntry(entry); err != nil { - return errors.Wrap(err, "encodeAndSendDumpEntry. create table") + entry.TbSQL = append(entry.TbSQL, ctStmt) } } - e.tableCount += len(db.TableMap) + if err := e.encodeAndSendDumpEntry(entry); err != nil { + return errors.Wrap(err, "encodeAndSendDumpEntry. create table") + } } + e.tableCount += len(db.TableMap) } step++