Skip to content

Commit

Permalink
backport of hyperledger#1595
Browse files Browse the repository at this point in the history
 - don't infinitely retry for submitted operations
  • Loading branch information
shorsher committed Nov 4, 2024
1 parent cac029c commit 552ad1e
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 10 deletions.
10 changes: 9 additions & 1 deletion internal/batch/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,15 @@ func (bp *batchProcessor) dispatchBatch(state *DispatchState) error {
// Call the dispatcher to do the heavy lifting - will only exit if we're closed
return operations.RunWithOperationContext(bp.ctx, func(ctx context.Context) error {
return bp.retry.Do(ctx, "batch dispatch", func(attempt int) (retry bool, err error) {
return true, bp.conf.dispatch(ctx, state)
err = bp.conf.dispatch(ctx, state)
if err != nil {
conflictErr, conflictTestOk := err.(operations.ConflictError)
if conflictTestOk && conflictErr.IsConflictError() {
// We know that the connector has received our batch, so we shouldn't need to retry
return true, nil
}
}
return true, err
})
})
}
Expand Down
34 changes: 34 additions & 0 deletions internal/blockchain/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"net/http"
"strings"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-common/pkg/ffresty"
"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/i18n"
"github.com/hyperledger/firefly-common/pkg/log"
Expand Down Expand Up @@ -84,6 +87,24 @@ type BlockchainReceiptNotification struct {
ProtocolID string `json:"protocolId,omitempty"`
}

type BlockchainRESTError struct {
Error string `json:"error,omitempty"`
// See https://github.com/hyperledger/firefly-transaction-manager/blob/main/pkg/ffcapi/submission_error.go
SubmissionRejected bool `json:"submissionRejected,omitempty"`
}

type conflictError struct {
err error
}

func (ce *conflictError) Error() string {
return ce.err.Error()
}

func (ce *conflictError) IsConflictError() bool {
return true
}

func NewBlockchainCallbacks() BlockchainCallbacks {
return &callbacks{
handlers: make(map[string]blockchain.Callbacks),
Expand Down Expand Up @@ -320,3 +341,16 @@ func HandleReceipt(ctx context.Context, plugin core.Named, reply *BlockchainRece

return nil
}

func WrapRestError(ctx context.Context, errRes *BlockchainRESTError, res *resty.Response, err error, defMsgKey i18n.ErrorMessageKey) error {
if errRes != nil && errRes.Error != "" {
if res != nil && res.StatusCode() == http.StatusConflict {
return &conflictError{err: i18n.WrapError(ctx, err, coremsgs.MsgBlockchainConnectorRESTErrConflict, errRes.Error)}
}
return i18n.WrapError(ctx, err, defMsgKey, errRes.Error)
}
if res != nil && res.StatusCode() == http.StatusConflict {
return &conflictError{err: ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgBlockchainConnectorRESTErrConflict)}
}
return ffresty.WrapRestErr(ctx, res, err, defMsgKey)
}
4 changes: 2 additions & 2 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,14 +566,14 @@ func (e *Ethereum) invokeContractMethod(ctx context.Context, address, signingKey
if err != nil {
return err
}
var resErr ethError
var resErr common.BlockchainRESTError
res, err := e.client.R().
SetContext(ctx).
SetBody(body).
SetError(&resErr).
Post("/")
if err != nil || !res.IsSuccess() {
return wrapError(ctx, &resErr, res, err)
return common.WrapRestError(ctx, &resErr, res, err, coremsgs.MsgEthConnectorRESTErr)
}
return nil
}
Expand Down
8 changes: 8 additions & 0 deletions internal/coremsgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,4 +278,12 @@ var (
MsgMissingNamespace = ffe("FF10437", "Missing namespace in request", 400)
MsgDeprecatedResetWithAutoReload = ffe("FF10438", "The deprecated reset API cannot be used when dynamic config reload is enabled", 409)
MsgConfigArrayVsRawConfigMismatch = ffe("FF10439", "Error processing configuration - mismatch between raw and processed array lengths")
MsgDefaultChannelNotConfigured = ffe("FF10440", "No default channel configured for this namespace", 400)
MsgNamespaceInitializing = ffe("FF10441", "Namespace '%s' is initializing", 412)
MsgPinsNotAssigned = ffe("FF10442", "Message cannot be sent because pins have not been assigned")
MsgMethodDoesNotSupportPinning = ffe("FF10443", "This method does not support passing a payload for pinning")
MsgOperationNotFoundInTransaction = ffe("FF10444", "No operation of type %s was found in transaction '%s'")
MsgCannotSetParameterWithMessage = ffe("FF10445", "Cannot provide a value for '%s' when pinning a message", 400)
MsgNamespaceNotStarted = ffe("FF10446", "Namespace '%s' is not started", 412)
MsgBlockchainConnectorRESTErrConflict = ffe("FF10447", "Conflict from blockchain connector: %s", 409)
)
31 changes: 24 additions & 7 deletions internal/operations/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ const (
RemainPendingOnFailure RunOperationOption = iota
)

// ConflictError can be implemented by connectors to prevent an operation being overridden to failed
type ConflictError interface {
IsConflictError() bool
}

type operationsManager struct {
ctx context.Context
namespace string
Expand Down Expand Up @@ -127,14 +132,26 @@ func (om *operationsManager) RunOperation(ctx context.Context, op *core.Prepared
log.L(ctx).Tracef("Operation detail: %+v", op)
outputs, complete, err := handler.RunOperation(ctx, op)
if err != nil {
om.SubmitOperationUpdate(&core.OperationUpdate{
NamespacedOpID: op.NamespacedIDString(),
Plugin: op.Plugin,
Status: failState,
ErrorMessage: err.Error(),
Output: outputs,
})
conflictErr, ok := err.(ConflictError)
if ok && conflictErr.IsConflictError() {
// We are now pending - we know the connector has the action we're attempting to submit
//
// The async processing in SubmitOperationUpdate does not allow us to go back to pending, if
// we have progressed to failed through an async event that gets ordered before this update.
// So this is safe
failState = core.OpStatusPending
log.L(ctx).Infof("Setting operation %s operation %s status to %s after conflict", op.Type, op.ID, failState)
} else {
om.SubmitOperationUpdate(&core.OperationUpdate{
NamespacedOpID: op.NamespacedIDString(),
Plugin: op.Plugin,
Status: failState,
ErrorMessage: err.Error(),
Output: outputs,
})
}
} else if complete {

om.SubmitOperationUpdate(&core.OperationUpdate{
NamespacedOpID: op.NamespacedIDString(),
Plugin: op.Plugin,
Expand Down
6 changes: 6 additions & 0 deletions internal/operations/operation_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ func (ou *operationUpdater) SubmitOperationUpdate(ctx context.Context, update *c
}

if ou.conf.workerCount > 0 {
if update.Status == core.OpStatusFailed {
// We do a cache update pre-emptively, as for idempotency checking on an error status we want to
// see the update immediately - even though it's being asynchronously flushed to the storage
ou.manager.updateCachedOperation(id, update.Status, &update.ErrorMessage, update.Output, nil)
}

select {
case ou.pickWorker(ctx, id, update) <- update:
case <-ou.ctx.Done():
Expand Down

0 comments on commit 552ad1e

Please sign in to comment.