diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index ee274850d..c1c4ac12c 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -556,7 +556,15 @@ func (bp *batchProcessor) dispatchBatch(payload *DispatchPayload) error { // Call the dispatcher to do the heavy lifting - will only exit if we're closed return operations.RunWithOperationContext(bp.ctx, func(ctx context.Context) error { return bp.retry.Do(ctx, "batch dispatch", func(attempt int) (retry bool, err error) { - return true, bp.conf.dispatch(ctx, payload) + err = bp.conf.dispatch(ctx, payload) + if err != nil { + conflictErr, conflictTestOk := err.(operations.ConflictError) + if conflictTestOk && conflictErr.IsConflictError() { + // We know that the connector has received our batch, so we shouldn't need to retry + return true, nil + } + } + return true, err }) }) } diff --git a/internal/blockchain/common/common.go b/internal/blockchain/common/common.go index b692bcfc1..44204a429 100644 --- a/internal/blockchain/common/common.go +++ b/internal/blockchain/common/common.go @@ -21,9 +21,12 @@ import ( "encoding/hex" "encoding/json" "fmt" + "net/http" "strings" "sync" + "github.com/go-resty/resty/v2" + "github.com/hyperledger/firefly-common/pkg/ffresty" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly-common/pkg/log" @@ -88,6 +91,24 @@ type BlockchainReceiptNotification struct { ContractLocation *fftypes.JSONAny `json:"contractLocation,omitempty"` } +type BlockchainRESTError struct { + Error string `json:"error,omitempty"` + // See https://github.com/hyperledger/firefly-transaction-manager/blob/main/pkg/ffcapi/submission_error.go + SubmissionRejected bool `json:"submissionRejected,omitempty"` +} + +type conflictError struct { + err error +} + +func (ce *conflictError) Error() string { + return ce.err.Error() +} + +func (ce *conflictError) IsConflictError() bool { + return true +} + func NewBlockchainCallbacks() BlockchainCallbacks { return &callbacks{ handlers: make(map[string]blockchain.Callbacks), @@ -337,3 +358,16 @@ func HandleReceipt(ctx context.Context, plugin core.Named, reply *BlockchainRece return nil } + +func WrapRestError(ctx context.Context, errRes *BlockchainRESTError, res *resty.Response, err error, defMsgKey i18n.ErrorMessageKey) error { + if errRes != nil && errRes.Error != "" { + if res != nil && res.StatusCode() == http.StatusConflict { + return &conflictError{err: i18n.WrapError(ctx, err, coremsgs.MsgBlockchainConnectorRESTErrConflict, errRes.Error)} + } + return i18n.WrapError(ctx, err, defMsgKey, errRes.Error) + } + if res != nil && res.StatusCode() == http.StatusConflict { + return &conflictError{err: ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgBlockchainConnectorRESTErrConflict)} + } + return ffresty.WrapRestErr(ctx, res, err, defMsgKey) +} diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index 6c8d8a334..330e2f8c8 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -574,14 +574,14 @@ func (e *Ethereum) invokeContractMethod(ctx context.Context, address, signingKey if err != nil { return err } - var resErr ethError + var resErr common.BlockchainRESTError res, err := e.client.R(). SetContext(ctx). SetBody(body). SetError(&resErr). Post("/") if err != nil || !res.IsSuccess() { - return wrapError(ctx, &resErr, res, err) + return common.WrapRestError(ctx, &resErr, res, err, coremsgs.MsgEthConnectorRESTErr) } return nil } diff --git a/internal/coremsgs/en_error_messages.go b/internal/coremsgs/en_error_messages.go index 8f788235c..81cdb8d94 100644 --- a/internal/coremsgs/en_error_messages.go +++ b/internal/coremsgs/en_error_messages.go @@ -284,4 +284,5 @@ var ( MsgOperationNotFoundInTransaction = ffe("FF10444", "No operation of type %s was found in transaction '%s'") MsgCannotSetParameterWithMessage = ffe("FF10445", "Cannot provide a value for '%s' when pinning a message", 400) MsgNamespaceNotStarted = ffe("FF10446", "Namespace '%s' is not started", 412) + MsgBlockchainConnectorRESTErrConflict = ffe("FF10447", "Conflict from blockchain connector: %s", 409) ) diff --git a/internal/operations/manager.go b/internal/operations/manager.go index ad0e611fe..e49f80314 100644 --- a/internal/operations/manager.go +++ b/internal/operations/manager.go @@ -59,6 +59,11 @@ const ( RemainPendingOnFailure RunOperationOption = iota ) +// ConflictError can be implemented by connectors to prevent an operation being overridden to failed +type ConflictError interface { + IsConflictError() bool +} + type operationsManager struct { ctx context.Context namespace string @@ -152,13 +157,24 @@ func (om *operationsManager) RunOperation(ctx context.Context, op *core.Prepared log.L(ctx).Tracef("Operation detail: %+v", op) outputs, complete, err := handler.RunOperation(ctx, op) if err != nil { - om.SubmitOperationUpdate(&core.OperationUpdate{ - NamespacedOpID: op.NamespacedIDString(), - Plugin: op.Plugin, - Status: failState, - ErrorMessage: err.Error(), - Output: outputs, - }) + conflictErr, ok := err.(ConflictError) + if ok && conflictErr.IsConflictError() { + // We are now pending - we know the connector has the action we're attempting to submit + // + // The async processing in SubmitOperationUpdate does not allow us to go back to pending, if + // we have progressed to failed through an async event that gets ordered before this update. + // So this is safe + failState = core.OpStatusPending + log.L(ctx).Infof("Setting operation %s operation %s status to %s after conflict", op.Type, op.ID, failState) + } else { + om.SubmitOperationUpdate(&core.OperationUpdate{ + NamespacedOpID: op.NamespacedIDString(), + Plugin: op.Plugin, + Status: failState, + ErrorMessage: err.Error(), + Output: outputs, + }) + } } else { // No error so move us from "Initialized" to "Pending" newState := core.OpStatusPending diff --git a/internal/operations/operation_updater.go b/internal/operations/operation_updater.go index 0365d8fff..9f3b6d3a6 100644 --- a/internal/operations/operation_updater.go +++ b/internal/operations/operation_updater.go @@ -105,6 +105,12 @@ func (ou *operationUpdater) SubmitOperationUpdate(ctx context.Context, update *c } if ou.conf.workerCount > 0 { + if update.Status == core.OpStatusFailed { + // We do a cache update pre-emptively, as for idempotency checking on an error status we want to + // see the update immediately - even though it's being asynchronously flushed to the storage + ou.manager.updateCachedOperation(id, update.Status, &update.ErrorMessage, update.Output, nil) + } + select { case ou.pickWorker(ctx, id, update) <- update: case <-ou.ctx.Done():