Skip to content

Commit

Permalink
break out of infinite retry for submitted ops
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Shorsher <[email protected]>
  • Loading branch information
shorsher committed Oct 30, 2024
1 parent 1939b67 commit f968a81
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 5 deletions.
10 changes: 8 additions & 2 deletions internal/blockchain/ethereum/ethereum.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ type Ethereum struct {
ethconnectConf config.Section
subs common.FireflySubscriptions
cache cache.CInterface
count int
}

type eventStreamWebsocket struct {
Expand Down Expand Up @@ -147,6 +148,7 @@ func (e *Ethereum) Init(ctx context.Context, cancelCtx context.CancelFunc, conf
e.capabilities = &blockchain.Capabilities{}
e.callbacks = common.NewBlockchainCallbacks()
e.subs = common.NewFireflySubscriptions()
e.count = 0

if addressResolverConf.GetString(AddressResolverURLTemplate) != "" {
// Check if we need to invoke the address resolver (without caching) on every call
Expand Down Expand Up @@ -623,8 +625,12 @@ func (e *Ethereum) invokeContractMethod(ctx context.Context, address, signingKey
SetBody(body).
SetError(&resErr).
Post("/")
if err != nil || !res.IsSuccess() {
return resErr.SubmissionRejected, common.WrapRESTError(ctx, &resErr, res, err, coremsgs.MsgEthConnectorRESTErr)
log.L(ctx).Infof("count %d", e.count)
if err != nil || !res.IsSuccess() || e.count == 0 {
e.count = 1
wrapErr := common.WrapRESTError(ctx, &resErr, res, err, coremsgs.MsgEthConnectorRESTErr)
log.L(ctx).Errorf("forced ethereum invokeContractMethod: %s", wrapErr)
return resErr.SubmissionRejected, wrapErr
}
return false, nil
}
Expand Down
12 changes: 9 additions & 3 deletions internal/operations/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ func (om *operationsManager) RunOperation(ctx context.Context, op *core.Prepared
// So this is safe
failState = core.OpStatusPending
log.L(ctx).Infof("Setting operation %s operation %s status to %s after conflict", op.Type, op.ID, failState)
// clear error so we're not infinitely retrying
err = nil
case phase == core.OpPhaseInitializing && idempotentSubmit:
// We haven't submitted the operation yet - so we will reuse the operation if the user retires with the same idempotency key
failState = core.OpStatusInitialized
Expand All @@ -193,13 +195,17 @@ func (om *operationsManager) RunOperation(ctx context.Context, op *core.Prepared
// Ok, we're failed
failState = core.OpStatusFailed
}
om.SubmitOperationUpdate(&core.OperationUpdate{
opUpdate := &core.OperationUpdate{
NamespacedOpID: op.NamespacedIDString(),
Plugin: op.Plugin,
Status: failState,
ErrorMessage: err.Error(),
Output: outputs,
})
}
if err != nil {
opUpdate.ErrorMessage = err.Error()
}

om.SubmitOperationUpdate(opUpdate)
} else {
// No error so move us from "Initialized" to "Pending"
newState := core.OpStatusPending
Expand Down

0 comments on commit f968a81

Please sign in to comment.