Skip to content

Commit

Permalink
api: save dump progress on consul
Browse files Browse the repository at this point in the history
extractor: send dump entry to applier anyway for row numbers
  • Loading branch information
ffffwh committed Jan 10, 2023
1 parent 1906d6e commit e43a231
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 54 deletions.
2 changes: 1 addition & 1 deletion api/handler/v2/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -754,7 +754,7 @@ func getTaskProgress(logger g.LoggerType, header http.Header, taskLogs []models.
return nil, 0, ""
}

execRowCount, totalRowCount, err := storeManager.GetFullProgress(jobId)
execRowCount, totalRowCount, err := storeManager.GetDumpProgress(jobId)
progress := &models.DumpProgress{
ExecRowCount: execRowCount,
TotalRowCount: totalRowCount,
Expand Down
37 changes: 31 additions & 6 deletions driver/common/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,12 +621,6 @@ func (sm *StoreManager) WatchTree(dir string, stopCh <-chan struct{}) (<-chan []
return sm.consulStore.WatchTree(dir, stopCh)
}

// return: ExecRowCount, TotalRowCount
func (sm *StoreManager) GetFullProgress(jobName string) (int64, int64, error) {
// TODO
return 42, 42, nil
}

func (sm *StoreManager) PutJobStage(jobName string, stage string) error {
key := fmt.Sprintf("dtle/%v/JobStage", jobName)
return sm.consulStore.Put(key, []byte(stage), nil)
Expand All @@ -644,6 +638,37 @@ func (sm *StoreManager) GetJobStage(jobName string) (string, error) {
return string(kv.Value), nil
}

func (sm *StoreManager) PutDumpProgress(jobName string, exec int64, total int64) error {
key := fmt.Sprintf("dtle/%v/DumpProgress", jobName)
bs, err := json.Marshal([]int64{exec, total})
if err != nil {
return err
}
return sm.consulStore.Put(key, bs, nil)
}

// return: ExecRowCount, TotalRowCount
func (sm *StoreManager) GetDumpProgress(jobName string) (int64, int64, error) {
key := fmt.Sprintf("dtle/%v/DumpProgress", jobName)
kv, err:= sm.consulStore.Get(key)
if err == store.ErrKeyNotFound {
return 0, 0, nil
} else if err != nil {
return 0, 0, err
}

var r []int64
err = json.Unmarshal(kv.Value, &r)
if err != nil {
return 0, 0, err
}
if len(r) != 2 {
return 0, 0, fmt.Errorf("unexpected len for %v. found %v", key, len(r))
}

return r[0], r[1], nil
}

// consul store item

func NewDefaultRole(tenant string) *Role {
Expand Down
21 changes: 21 additions & 0 deletions driver/mysql/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,7 @@ func (a *Applier) Run() {
}
a.ai.OnError = a.onError

go a.updateDumpProgressLoop()
if sourceType == "mysql" {
go a.updateGtidLoop()
}
Expand Down Expand Up @@ -1187,3 +1188,23 @@ func (a *Applier) enableForeignKeyChecks() error {
}
return nil
}

func (a *Applier) updateDumpProgressLoop() {
var err error
interval := 10
a.logger.Debug("updateDumpProgressLoop", "interval", interval)

for {
if a.shutdown {
return
}

err = a.storeManager.PutDumpProgress(a.subject, a.TotalRowsReplayed, a.mysqlContext.RowsEstimate)
if err != nil {
a.onError(common.TaskStateDead, err)
return
}

time.Sleep(time.Duration(interval) * time.Second)
}
}
85 changes: 38 additions & 47 deletions driver/mysql/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,51 +1303,48 @@ func (e *Extractor) mysqlDump() (retErr error) {

e.gotCoordinateCh <- struct{}{}

// Transform the current schema so that it reflects the *current* state of the MySQL server's contents.
// First, get the DROP TABLE and CREATE TABLE statement (with keys and constraint definitions) for our tables ...
if !e.mysqlContext.SkipCreateDbTable {
e.logger.Info("generating DROP and CREATE statements to reflect current database schemas",
"replicateDoDb", e.replicateDoDb)

for _, db := range e.replicateDoDb {
var dbSQL string
if strings.ToLower(db.TableSchema) != "mysql" {
if db.TableSchemaRename != "" {
dbSQL, err = base.RenameCreateSchemaAddINE(db.CreateSchemaString, db.TableSchemaRename)
if err != nil {
return errors.Wrap(err, "RenameCreateSchemaAddINE")
}
} else {
dbSQL = db.CreateSchemaString
// Go through all tables to get DDL and row numbers.
for _, db := range e.replicateDoDb {
if strings.ToLower(db.TableSchema) == "mysql" {
continue
}

// Create the schema.
entry := &common.DumpEntry{}
if !e.mysqlContext.SkipCreateDbTable {
if db.TableSchemaRename != "" {
entry.DbSQL, err = base.RenameCreateSchemaAddINE(db.CreateSchemaString, db.TableSchemaRename)
if err != nil {
return errors.Wrap(err, "RenameCreateSchemaAddINE")
}
} else {
entry.DbSQL = db.CreateSchemaString
}
}
if err := e.encodeAndSendDumpEntry(entry); err != nil {
return errors.Wrap(err, "encodeAndSendDumpEntry. create schema entry")
}

// Create the tables.
for _, tbCtx := range db.TableMap {
tb := tbCtx.Table
tb.Counter, err = e.CountTableRows(tb)
if err != nil {
return errors.Wrapf(err, "CountTableRows %v.%v", tb.TableSchema, tb.TableName)
}
e.logger.Info("count table", "schema", db.TableSchema, "table", tb.TableName, "rows", tb.Counter)

entry := &common.DumpEntry{
DbSQL: dbSQL,
}
atomic.AddInt64(&e.mysqlContext.RowsEstimate, 1)
atomic.AddInt64(&e.TotalRowsCopied, 1)
if err := e.encodeAndSendDumpEntry(entry); err != nil {
return errors.Wrap(err, "encodeAndSendDumpEntry. create schema entry")
TbSQL: []string{},
TotalCount: tb.Counter,
}

for _, tbCtx := range db.TableMap {
tb := tbCtx.Table
if tb.TableSchema != db.TableSchema {
continue
}
total, err := e.CountTableRows(tb)
if err != nil {
return errors.Wrapf(err, "CountTableRows %v.%v", tb.TableSchema, tb.TableName)
}
tb.Counter = total
var tbSQL []string
if !e.mysqlContext.SkipCreateDbTable {
if strings.ToLower(tb.TableType) == "view" {
/*tbSQL, err = base.ShowCreateView(e.singletonDB, tb.TableSchema, tb.TableName, e.mysqlContext.DropTableIfExists)
if err != nil {
return err
}*/
} else if strings.ToLower(tb.TableSchema) != "mysql" {
} else {
ctStmt, err := base.ShowCreateTable(e.singletonDB, tb.TableSchema, tb.TableName)
if err != nil {
return err
Expand All @@ -1362,23 +1359,17 @@ func (e *Extractor) mysqlDump() (retErr error) {
}

if e.mysqlContext.DropTableIfExists {
tbSQL = append(tbSQL, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s",
entry.TbSQL = append(entry.TbSQL, fmt.Sprintf("DROP TABLE IF EXISTS %s.%s",
mysqlconfig.EscapeName(targetSchema), mysqlconfig.EscapeName(targetTable)))
}
tbSQL = append(tbSQL, ctStmt)
}
entry := &common.DumpEntry{
TbSQL: tbSQL,
TotalCount: tb.Counter,
}
atomic.AddInt64(&e.mysqlContext.RowsEstimate, 1)
atomic.AddInt64(&e.TotalRowsCopied, 1)
if err := e.encodeAndSendDumpEntry(entry); err != nil {
return errors.Wrap(err, "encodeAndSendDumpEntry. create table")
entry.TbSQL = append(entry.TbSQL, ctStmt)
}
}
e.tableCount += len(db.TableMap)
if err := e.encodeAndSendDumpEntry(entry); err != nil {
return errors.Wrap(err, "encodeAndSendDumpEntry. create table")
}
}
e.tableCount += len(db.TableMap)
}
step++

Expand Down

0 comments on commit e43a231

Please sign in to comment.