diff --git a/internal/batch/batch_processor.go b/internal/batch/batch_processor.go index 2887b8745..7d396b1ce 100644 --- a/internal/batch/batch_processor.go +++ b/internal/batch/batch_processor.go @@ -600,7 +600,15 @@ func (bp *batchProcessor) dispatchBatch(state *DispatchState) error { // Call the dispatcher to do the heavy lifting - will only exit if we're closed return operations.RunWithOperationContext(bp.ctx, func(ctx context.Context) error { return bp.retry.Do(ctx, "batch dispatch", func(attempt int) (retry bool, err error) { - return true, bp.conf.dispatch(ctx, state) + err = bp.conf.dispatch(ctx, state) + if err != nil { + conflictErr, conflictTestOk := err.(operations.ConflictError) + if conflictTestOk && conflictErr.IsConflictError() { + // We know that the connector has received our batch, so we shouldn't need to retry + return true, nil + } + } + return true, err }) }) } diff --git a/internal/blockchain/common/common.go b/internal/blockchain/common/common.go index 0da135b7d..f5181681d 100644 --- a/internal/blockchain/common/common.go +++ b/internal/blockchain/common/common.go @@ -21,8 +21,11 @@ import ( "encoding/hex" "encoding/json" "fmt" + "net/http" "strings" + "github.com/go-resty/resty/v2" + "github.com/hyperledger/firefly-common/pkg/ffresty" "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-common/pkg/i18n" "github.com/hyperledger/firefly-common/pkg/log" @@ -84,6 +87,24 @@ type BlockchainReceiptNotification struct { ProtocolID string `json:"protocolId,omitempty"` } +type BlockchainRESTError struct { + Error string `json:"error,omitempty"` + // See https://github.com/hyperledger/firefly-transaction-manager/blob/main/pkg/ffcapi/submission_error.go + SubmissionRejected bool `json:"submissionRejected,omitempty"` +} + +type conflictError struct { + err error +} + +func (ce *conflictError) Error() string { + return ce.err.Error() +} + +func (ce *conflictError) IsConflictError() bool { + return true +} + func NewBlockchainCallbacks() BlockchainCallbacks { return &callbacks{ handlers: make(map[string]blockchain.Callbacks), @@ -320,3 +341,16 @@ func HandleReceipt(ctx context.Context, plugin core.Named, reply *BlockchainRece return nil } + +func WrapRestError(ctx context.Context, errRes *BlockchainRESTError, res *resty.Response, err error, defMsgKey i18n.ErrorMessageKey) error { + if errRes != nil && errRes.Error != "" { + if res != nil && res.StatusCode() == http.StatusConflict { + return &conflictError{err: i18n.WrapError(ctx, err, coremsgs.MsgBlockchainConnectorRESTErrConflict, errRes.Error)} + } + return i18n.WrapError(ctx, err, defMsgKey, errRes.Error) + } + if res != nil && res.StatusCode() == http.StatusConflict { + return &conflictError{err: ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgBlockchainConnectorRESTErrConflict)} + } + return ffresty.WrapRestErr(ctx, res, err, defMsgKey) +} diff --git a/internal/blockchain/ethereum/ethereum.go b/internal/blockchain/ethereum/ethereum.go index e0fcde5d8..b23d27fdd 100644 --- a/internal/blockchain/ethereum/ethereum.go +++ b/internal/blockchain/ethereum/ethereum.go @@ -566,14 +566,14 @@ func (e *Ethereum) invokeContractMethod(ctx context.Context, address, signingKey if err != nil { return err } - var resErr ethError + var resErr common.BlockchainRESTError res, err := e.client.R(). SetContext(ctx). SetBody(body). SetError(&resErr). Post("/") if err != nil || !res.IsSuccess() { - return wrapError(ctx, &resErr, res, err) + return common.WrapRestError(ctx, &resErr, res, err, coremsgs.MsgEthConnectorRESTErr) } return nil } diff --git a/internal/coremsgs/en_error_messages.go b/internal/coremsgs/en_error_messages.go index a704e98cb..fd85fa67c 100644 --- a/internal/coremsgs/en_error_messages.go +++ b/internal/coremsgs/en_error_messages.go @@ -278,4 +278,12 @@ var ( MsgMissingNamespace = ffe("FF10437", "Missing namespace in request", 400) MsgDeprecatedResetWithAutoReload = ffe("FF10438", "The deprecated reset API cannot be used when dynamic config reload is enabled", 409) MsgConfigArrayVsRawConfigMismatch = ffe("FF10439", "Error processing configuration - mismatch between raw and processed array lengths") + MsgDefaultChannelNotConfigured = ffe("FF10440", "No default channel configured for this namespace", 400) + MsgNamespaceInitializing = ffe("FF10441", "Namespace '%s' is initializing", 412) + MsgPinsNotAssigned = ffe("FF10442", "Message cannot be sent because pins have not been assigned") + MsgMethodDoesNotSupportPinning = ffe("FF10443", "This method does not support passing a payload for pinning") + MsgOperationNotFoundInTransaction = ffe("FF10444", "No operation of type %s was found in transaction '%s'") + MsgCannotSetParameterWithMessage = ffe("FF10445", "Cannot provide a value for '%s' when pinning a message", 400) + MsgNamespaceNotStarted = ffe("FF10446", "Namespace '%s' is not started", 412) + MsgBlockchainConnectorRESTErrConflict = ffe("FF10447", "Conflict from blockchain connector: %s", 409) ) diff --git a/internal/operations/manager.go b/internal/operations/manager.go index 700f204c2..ac260d4e7 100644 --- a/internal/operations/manager.go +++ b/internal/operations/manager.go @@ -58,6 +58,11 @@ const ( RemainPendingOnFailure RunOperationOption = iota ) +// ConflictError can be implemented by connectors to prevent an operation being overridden to failed +type ConflictError interface { + IsConflictError() bool +} + type operationsManager struct { ctx context.Context namespace string @@ -127,14 +132,26 @@ func (om *operationsManager) RunOperation(ctx context.Context, op *core.Prepared log.L(ctx).Tracef("Operation detail: %+v", op) outputs, complete, err := handler.RunOperation(ctx, op) if err != nil { - om.SubmitOperationUpdate(&core.OperationUpdate{ - NamespacedOpID: op.NamespacedIDString(), - Plugin: op.Plugin, - Status: failState, - ErrorMessage: err.Error(), - Output: outputs, - }) + conflictErr, ok := err.(ConflictError) + if ok && conflictErr.IsConflictError() { + // We are now pending - we know the connector has the action we're attempting to submit + // + // The async processing in SubmitOperationUpdate does not allow us to go back to pending, if + // we have progressed to failed through an async event that gets ordered before this update. + // So this is safe + failState = core.OpStatusPending + log.L(ctx).Infof("Setting operation %s operation %s status to %s after conflict", op.Type, op.ID, failState) + } else { + om.SubmitOperationUpdate(&core.OperationUpdate{ + NamespacedOpID: op.NamespacedIDString(), + Plugin: op.Plugin, + Status: failState, + ErrorMessage: err.Error(), + Output: outputs, + }) + } } else if complete { + om.SubmitOperationUpdate(&core.OperationUpdate{ NamespacedOpID: op.NamespacedIDString(), Plugin: op.Plugin, diff --git a/internal/operations/operation_updater.go b/internal/operations/operation_updater.go index 5928eeec2..82677e94d 100644 --- a/internal/operations/operation_updater.go +++ b/internal/operations/operation_updater.go @@ -104,6 +104,12 @@ func (ou *operationUpdater) SubmitOperationUpdate(ctx context.Context, update *c } if ou.conf.workerCount > 0 { + if update.Status == core.OpStatusFailed { + // We do a cache update pre-emptively, as for idempotency checking on an error status we want to + // see the update immediately - even though it's being asynchronously flushed to the storage + ou.manager.updateCachedOperation(id, update.Status, &update.ErrorMessage, update.Output, nil) + } + select { case ou.pickWorker(ctx, id, update) <- update: case <-ou.ctx.Done():