diff --git a/pkg/rm/tcc/fence/fence_api.go b/pkg/rm/tcc/fence/fence_api.go index 501a3733c..6e7967fe8 100644 --- a/pkg/rm/tcc/fence/fence_api.go +++ b/pkg/rm/tcc/fence/fence_api.go @@ -29,10 +29,11 @@ import ( // WithFence Execute the fence database operation first and then call back the business method func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err error) { - if err = DoFence(ctx, tx); err != nil { + if skip, err := DoFence(ctx, tx); err != nil { return err + } else if skip { + return nil } - if err := callback(); err != nil { return fmt.Errorf("the business method error msg of: %p, [%w]", callback, err) } @@ -47,20 +48,20 @@ func WithFence(ctx context.Context, tx *sql.Tx, callback func() error) (err erro // case 3: if fencePhase is FencePhaseCommit, will do commit fence operation. // case 4: if fencePhase is FencePhaseRollback, will do rollback fence operation. // case 5: if fencePhase not in above case, will return a fence phase illegal error. -func DoFence(ctx context.Context, tx *sql.Tx) error { +func DoFence(ctx context.Context, tx *sql.Tx) (bool, error) { hd := handler.GetFenceHandler() phase := tm.GetFencePhase(ctx) switch phase { case enum.FencePhaseNotExist: - return fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx)) + return false, fmt.Errorf("xid %s, tx name %s, fence phase not exist", tm.GetXID(ctx), tm.GetTxName(ctx)) case enum.FencePhasePrepare: - return hd.PrepareFence(ctx, tx) + return false, hd.PrepareFence(ctx, tx) case enum.FencePhaseCommit: - return hd.CommitFence(ctx, tx) + return false, hd.CommitFence(ctx, tx) case enum.FencePhaseRollback: return hd.RollbackFence(ctx, tx) } - return fmt.Errorf("fence phase: %v illegal", phase) + return false, fmt.Errorf("fence phase: %v illegal", phase) } diff --git a/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go b/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go index d719f01e1..96280a195 100644 --- a/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go +++ b/pkg/rm/tcc/fence/handler/tcc_fence_wrapper_handler.go @@ -110,37 +110,37 @@ func (handler *tccFenceWrapperHandler) CommitFence(ctx context.Context, tx *sql. return handler.updateFenceStatus(tx, xid, branchId, enum.StatusCommitted) } -func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx) error { +func (handler *tccFenceWrapperHandler) RollbackFence(ctx context.Context, tx *sql.Tx) (bool, error) { xid := tm.GetBusinessActionContext(ctx).Xid branchId := tm.GetBusinessActionContext(ctx).BranchId actionName := tm.GetBusinessActionContext(ctx).ActionName fenceDo, err := handler.tccFenceDao.QueryTCCFenceDO(tx, xid, branchId) if err != nil { - return fmt.Errorf("rollback fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) + return false, fmt.Errorf("rollback fence method failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) } // record is null, mean the need suspend if fenceDo == nil { err = handler.insertTCCFenceLog(tx, xid, branchId, actionName, enum.StatusSuspended) if err != nil { - return fmt.Errorf("insert tcc fence record errors, rollback fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) + return false, fmt.Errorf("insert tcc fence record errors, rollback fence failed. xid= %s, branchId= %d, [%w]", xid, branchId, err) } log.Infof("Insert tcc fence suspend record xid: %s, branchId: %d", xid, branchId) - return nil + return true, tx.Commit() } // have rollbacked or suspended if fenceDo.Status == enum.StatusRollbacked || fenceDo.Status == enum.StatusSuspended { // enable warn level log.Infof("Branch transaction had already rollbacked before, idempotency rejected. xid: %s, branchId: %d, status: %s", xid, branchId, fenceDo.Status) - return nil + return true, tx.Commit() } if fenceDo.Status == enum.StatusCommitted { log.Warnf("Branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) - return fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) + return false, fmt.Errorf("branch transaction status is unexpected. xid: %s, branchId: %d, status: %d", xid, branchId, fenceDo.Status) } - return handler.updateFenceStatus(tx, xid, branchId, enum.StatusRollbacked) + return false, handler.updateFenceStatus(tx, xid, branchId, enum.StatusRollbacked) } func (handler *tccFenceWrapperHandler) insertTCCFenceLog(tx *sql.Tx, xid string, branchId int64, actionName string, status enum.FenceStatus) error { diff --git a/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go b/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go index 5da7ad7a6..7bf75cd5a 100644 --- a/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go +++ b/pkg/rm/tcc/fence/store/db/dao/tcc_fence_db.go @@ -75,7 +75,7 @@ func (t *TccFenceStoreDatabaseMapper) QueryTCCFenceDO(tx *sql.Tx, xid string, br if err = result.Scan(&xid, &branchId, &actionName, &status, &gmtCreate, &gmtModify); err != nil { // will return error, if rows is empty if err.Error() == "sql: no rows in result set" { - return nil, fmt.Errorf("query tcc fence get scan row,no rows in result set, [%w]", err) + return nil, nil } else { return nil, fmt.Errorf("query tcc fence get scan row failed, [%w]", err) }