Skip to content

Commit

Permalink
add some missing returns on error
Browse files Browse the repository at this point in the history
  • Loading branch information
ffffwh committed Jan 10, 2023
1 parent be0a3a2 commit 1906d6e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
3 changes: 3 additions & 0 deletions driver/mysql/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ func (a *Applier) Run() {
a.revExtractor, err = NewExtractor(execCtx2, &cfg2, a.logger, a.storeManager, a.waitCh, a.ctx)
if err != nil {
a.onError(common.TaskStateDead, errors.Wrap(err, "reversed Extractor"))
return
}
go a.revExtractor.Run()
}
Expand Down Expand Up @@ -364,11 +365,13 @@ func (a *Applier) Run() {
}).Marshal(nil)
if err != nil {
a.onError(common.TaskStateDead, errors.Wrap(err, "bigtx_ack. Marshal"))
return
}
_, err = a.natsConn.Request(fmt.Sprintf("%s_bigtx_ack", a.subject),
bs, 1*time.Minute)
if err != nil {
a.onError(common.TaskStateDead, errors.Wrap(err, "bigtx_ack. Request"))
return
}
}
}
Expand Down
26 changes: 12 additions & 14 deletions driver/mysql/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ func (e *Extractor) Run() {
err = e.storeManager.SaveGtidForJob(e.subject, e.mysqlContext.Gtid)
if err != nil {
e.onError(common.TaskStateDead, err)
return
}
}

Expand Down Expand Up @@ -650,6 +651,7 @@ func (e *Extractor) initNatsPubClient(natsAddr string) (err error) {
err := e.natsConn.Publish(m.Reply, nil)
if err != nil {
e.onError(common.TaskStateDead, errors.Wrap(err, "bigtx_ack. reply"))
return
}

ack := &common.BigTxAck{}
Expand Down Expand Up @@ -1144,7 +1146,7 @@ func (e *Extractor) sendSysVarAndSqlMode() error {
}

//Perform the snapshot using the same logic as the "mysqldump" utility.
func (e *Extractor) mysqlDump() error {
func (e *Extractor) mysqlDump() (retErr error) {
defer e.singletonDB.Close()
var tx sql.QueryAble
var err error
Expand Down Expand Up @@ -1263,9 +1265,7 @@ func (e *Extractor) mysqlDump() error {
}
step++*/
e.logger.Info("Step: committing transaction", "n", step)
if err := realTx.Commit(); err != nil {
e.onError(common.TaskStateDead, err)
}
retErr = realTx.Commit()
}()
} else {
e.logger.Warn("Failed to get a consistenct TX with GTID. Will retry.", "gtidMatchRound", gtidMatchRound)
Expand Down Expand Up @@ -1315,7 +1315,7 @@ func (e *Extractor) mysqlDump() error {
if db.TableSchemaRename != "" {
dbSQL, err = base.RenameCreateSchemaAddINE(db.CreateSchemaString, db.TableSchemaRename)
if err != nil {
e.onError(common.TaskStateDead, err)
return errors.Wrap(err, "RenameCreateSchemaAddINE")
}
} else {
dbSQL = db.CreateSchemaString
Expand All @@ -1328,7 +1328,7 @@ func (e *Extractor) mysqlDump() error {
atomic.AddInt64(&e.mysqlContext.RowsEstimate, 1)
atomic.AddInt64(&e.TotalRowsCopied, 1)
if err := e.encodeAndSendDumpEntry(entry); err != nil {
e.onError(common.TaskStateDead, err)
return errors.Wrap(err, "encodeAndSendDumpEntry. create schema entry")
}

for _, tbCtx := range db.TableMap {
Expand All @@ -1338,7 +1338,7 @@ func (e *Extractor) mysqlDump() error {
}
total, err := e.CountTableRows(tb)
if err != nil {
return err
return errors.Wrapf(err, "CountTableRows %v.%v", tb.TableSchema, tb.TableName)
}
tb.Counter = total
var tbSQL []string
Expand Down Expand Up @@ -1374,7 +1374,7 @@ func (e *Extractor) mysqlDump() error {
atomic.AddInt64(&e.mysqlContext.RowsEstimate, 1)
atomic.AddInt64(&e.TotalRowsCopied, 1)
if err := e.encodeAndSendDumpEntry(entry); err != nil {
e.onError(common.TaskStateDead, err)
return errors.Wrap(err, "encodeAndSendDumpEntry. create table")
}
}
e.tableCount += len(db.TableMap)
Expand All @@ -1401,7 +1401,7 @@ func (e *Extractor) mysqlDump() error {
d := NewDumper(e.ctx, tx, t, e.mysqlContext.ChunkSize, e.logger.ResetNamed("dumper"), e.memory1,
e.mysqlContext.DumpEntryLimit)
if err := d.Dump(); err != nil {
e.onError(common.TaskStateDead, err)
return errors.Wrapf(err, "d.Dump %v.%v", t.TableSchema, t.TableName)
}
e.dumpers = append(e.dumpers, d)
// Scan the rows in the table ...
Expand All @@ -1410,22 +1410,20 @@ func (e *Extractor) mysqlDump() error {
if !d.sentTableDef {
tableBs, err := common.EncodeTable(d.Table)
if err != nil {
err = errors.Wrap(err, "full copy: EncodeTable")
e.onError(common.TaskStateDead, err)
return err
return errors.Wrap(err, "full copy: EncodeTable")
} else {
entry.Table = tableBs
d.sentTableDef = true
}
}
if err = e.encodeAndSendDumpEntry(entry); err != nil {
e.onError(common.TaskStateDead, err)
return errors.Wrap(err, "encodeAndSendDumpEntry. dump")
}
atomic.AddInt64(&e.TotalRowsCopied, int64(len(entry.ValuesX)))
atomic.AddInt64(d.Memory, -memSize)
}
if d.Err != nil {
e.onError(common.TaskStateDead, d.Err)
return errors.Wrap(err, "d.Err")
}
}
}
Expand Down

0 comments on commit 1906d6e

Please sign in to comment.