Skip to content

Commit

Permalink
backport of hyperledger#1595
Browse files Browse the repository at this point in the history
 - don't infinitely retry for submitted operations
  • Loading branch information
shorsher committed Nov 4, 2024
1 parent 6625d24 commit b2f48cc
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 10 deletions.
10 changes: 9 additions & 1 deletion internal/batch/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,15 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error {
// Call the dispatcher to do the heavy lifting - will only exit if we're closed
return operations.RunWithOperationContext(bp.ctx, func(ctx context.Context) error {
return bp.retry.Do(ctx, "batch dispatch", func(attempt int) (retry bool, err error) {
return true, bp.conf.dispatch(ctx, payload)
err = bp.conf.dispatch(ctx, payload)
if err != nil {
conflictErr, conflictTestOk := err.(operations.ConflictError)
if conflictTestOk && conflictErr.IsConflictError() {
// We know that the connector has received our batch, so we shouldn't need to retry
return true, nil
}
}
return true, err
})
})
}
Expand Down
34 changes: 34 additions & 0 deletions internal/blockchain/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-common/pkg/ffresty"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
Expand Down Expand Up @@ -88,6 +91,24 @@ type BlockchainReceiptNotification struct {
ContractLocation *fftypes.JSONAny `json:"contractLocation,omitempty"`
}

type BlockchainRESTError struct {
Error string `json:"error,omitempty"`
// See https://github.com/hyperledger/firefly-transaction-manager/blob/main/pkg/ffcapi/submission_error.go
SubmissionRejected bool `json:"submissionRejected,omitempty"`
}

type conflictError struct {
err error
}

func (ce *conflictError) Error() string {
return ce.err.Error()
}

func (ce *conflictError) IsConflictError() bool {
return true
}

func NewBlockchainCallbacks() BlockchainCallbacks {
return &callbacks{
handlers: make(map[string]blockchain.Callbacks),
Expand Down Expand Up @@ -337,3 +358,16 @@ func HandleReceipt(ctx context.Context, plugin core.Named, reply *BlockchainRece

return nil
}

func WrapRestError(ctx context.Context, errRes *BlockchainRESTError, res *resty.Response, err error, defMsgKey i18n.ErrorMessageKey) error {
if errRes != nil && errRes.Error != "" {
if res != nil && res.StatusCode() == http.StatusConflict {
return &conflictError{err: i18n.WrapError(ctx, err, coremsgs.MsgBlockchainConnectorRESTErrConflict, errRes.Error)}
}
return i18n.WrapError(ctx, err, defMsgKey, errRes.Error)
}
if res != nil && res.StatusCode() == http.StatusConflict {
return &conflictError{err: ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgBlockchainConnectorRESTErrConflict)}
}
return ffresty.WrapRestErr(ctx, res, err, defMsgKey)
}
4 changes: 2 additions & 2 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,14 +574,14 @@ func (e *Ethereum) invokeContractMethod(ctx context.Context, address, signingKey
if err != nil {
return err
}
var resErr ethError
var resErr common.BlockchainRESTError
res, err := e.client.R().
SetContext(ctx).
SetBody(body).
SetError(&resErr).
Post("/")
if err != nil || !res.IsSuccess() {
return wrapError(ctx, &resErr, res, err)
return common.WrapRestError(ctx, &resErr, res, err, coremsgs.MsgEthConnectorRESTErr)
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions internal/coremsgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,4 +284,5 @@ var (
MsgOperationNotFoundInTransaction = ffe("FF10444", "No operation of type %s was found in transaction '%s'")
MsgCannotSetParameterWithMessage = ffe("FF10445", "Cannot provide a value for '%s' when pinning a message", 400)
MsgNamespaceNotStarted = ffe("FF10446", "Namespace '%s' is not started", 412)
MsgBlockchainConnectorRESTErrConflict = ffe("FF10447", "Conflict from blockchain connector: %s", 409)
)
30 changes: 23 additions & 7 deletions internal/operations/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ const (
RemainPendingOnFailure RunOperationOption = iota
)

// ConflictError can be implemented by connectors to prevent an operation being overridden to failed
type ConflictError interface {
IsConflictError() bool
}

type operationsManager struct {
ctx context.Context
namespace string
Expand Down Expand Up @@ -152,13 +157,24 @@ func (om *operationsManager) RunOperation(ctx context.Context, op *core.Prepared
log.L(ctx).Tracef("Operation detail: %+v", op)
outputs, complete, err := handler.RunOperation(ctx, op)
if err != nil {
om.SubmitOperationUpdate(&core.OperationUpdate{
NamespacedOpID: op.NamespacedIDString(),
Plugin: op.Plugin,
Status: failState,
ErrorMessage: err.Error(),
Output: outputs,
})
conflictErr, ok := err.(ConflictError)
if ok && conflictErr.IsConflictError() {
// We are now pending - we know the connector has the action we're attempting to submit
//
// The async processing in SubmitOperationUpdate does not allow us to go back to pending, if
// we have progressed to failed through an async event that gets ordered before this update.
// So this is safe
failState = core.OpStatusPending
log.L(ctx).Infof("Setting operation %s operation %s status to %s after conflict", op.Type, op.ID, failState)
} else {
om.SubmitOperationUpdate(&core.OperationUpdate{
NamespacedOpID: op.NamespacedIDString(),
Plugin: op.Plugin,
Status: failState,
ErrorMessage: err.Error(),
Output: outputs,
})
}
} else {
// No error so move us from "Initialized" to "Pending"
newState := core.OpStatusPending
Expand Down
6 changes: 6 additions & 0 deletions internal/operations/operation_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ func (ou *operationUpdater) SubmitOperationUpdate(ctx context.Context, update *c
}

if ou.conf.workerCount > 0 {
if update.Status == core.OpStatusFailed {
// We do a cache update pre-emptively, as for idempotency checking on an error status we want to
// see the update immediately - even though it's being asynchronously flushed to the storage
ou.manager.updateCachedOperation(id, update.Status, &update.ErrorMessage, update.Output, nil)
}

select {
case ou.pickWorker(ctx, id, update) <- update:
case <-ou.ctx.Done():
Expand Down

0 comments on commit b2f48cc

Please sign in to comment.